Skip to main content

rns_net/interface/
rnode.rs

1//! RNode LoRa radio interface.
2//!
3//! Manages serial connection to RNode device, detects firmware,
4//! configures radio parameters per subinterface. Supports single-radio
5//! (RNodeInterface) and multi-radio (RNodeMultiInterface) devices.
6//!
7//! Matches Python `RNodeInterface.py` and `RNodeMultiInterface.py`.
8
9use std::io::{self, Read, Write};
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::InterfaceId;
15
16use crate::event::{Event, EventSender};
17use crate::interface::Writer;
18use crate::rnode_kiss;
19use crate::serial::{Parity, SerialConfig, SerialPort};
20
21/// Validation limits matching Python RNodeInterface.
22pub const FREQ_MIN: u32 = 137_000_000;
23pub const FREQ_MAX: u32 = 3_000_000_000;
24pub const BW_MIN: u32 = 7_800;
25pub const BW_MAX: u32 = 1_625_000;
26pub const SF_MIN: u8 = 5;
27pub const SF_MAX: u8 = 12;
28pub const CR_MIN: u8 = 5;
29pub const CR_MAX: u8 = 8;
30pub const TXPOWER_MIN: i8 = 0;
31pub const TXPOWER_MAX: i8 = 37;
32pub const HW_MTU: u32 = 508;
33
34/// Configuration for one RNode subinterface (radio).
35#[derive(Debug, Clone)]
36pub struct RNodeSubConfig {
37    pub name: String,
38    pub frequency: u32,
39    pub bandwidth: u32,
40    pub txpower: i8,
41    pub spreading_factor: u8,
42    pub coding_rate: u8,
43    pub flow_control: bool,
44    pub st_alock: Option<f32>,
45    pub lt_alock: Option<f32>,
46}
47
48/// Configuration for the RNode device.
49#[derive(Debug, Clone)]
50pub struct RNodeConfig {
51    pub name: String,
52    pub port: String,
53    pub speed: u32,
54    pub subinterfaces: Vec<RNodeSubConfig>,
55    pub id_interval: Option<u32>,
56    pub id_callsign: Option<Vec<u8>>,
57    pub base_interface_id: InterfaceId,
58    /// Pre-opened file descriptor (e.g. from a USB bridge socketpair on Android).
59    /// When set, `start()` uses this fd directly instead of opening `port`.
60    pub pre_opened_fd: Option<i32>,
61}
62
63impl Default for RNodeConfig {
64    fn default() -> Self {
65        RNodeConfig {
66            name: String::new(),
67            port: String::new(),
68            speed: 115200,
69            subinterfaces: Vec::new(),
70            id_interval: None,
71            id_callsign: None,
72            base_interface_id: InterfaceId(0),
73            pre_opened_fd: None,
74        }
75    }
76}
77
78/// Validate subinterface configuration. Returns error message if invalid.
79pub fn validate_sub_config(sub: &RNodeSubConfig) -> Option<String> {
80    if sub.frequency < FREQ_MIN || sub.frequency > FREQ_MAX {
81        return Some(format!(
82            "Invalid frequency {} for {}",
83            sub.frequency, sub.name
84        ));
85    }
86    if sub.bandwidth < BW_MIN || sub.bandwidth > BW_MAX {
87        return Some(format!(
88            "Invalid bandwidth {} for {}",
89            sub.bandwidth, sub.name
90        ));
91    }
92    if sub.spreading_factor < SF_MIN || sub.spreading_factor > SF_MAX {
93        return Some(format!(
94            "Invalid SF {} for {}",
95            sub.spreading_factor, sub.name
96        ));
97    }
98    if sub.coding_rate < CR_MIN || sub.coding_rate > CR_MAX {
99        return Some(format!("Invalid CR {} for {}", sub.coding_rate, sub.name));
100    }
101    if sub.txpower < TXPOWER_MIN || sub.txpower > TXPOWER_MAX {
102        return Some(format!("Invalid TX power {} for {}", sub.txpower, sub.name));
103    }
104    if let Some(st) = sub.st_alock {
105        if st < 0.0 || st > 100.0 {
106            return Some(format!("Invalid ST airtime limit {} for {}", st, sub.name));
107        }
108    }
109    if let Some(lt) = sub.lt_alock {
110        if lt < 0.0 || lt > 100.0 {
111            return Some(format!("Invalid LT airtime limit {} for {}", lt, sub.name));
112        }
113    }
114    None
115}
116
117/// Writer for a specific RNode subinterface.
118/// Wraps a shared serial writer with subinterface-specific data framing.
119struct RNodeSubWriter {
120    writer: Arc<Mutex<std::fs::File>>,
121    index: u8,
122    flow_control: bool,
123    flow_state: Arc<Mutex<SubFlowState>>,
124}
125
126struct SubFlowState {
127    ready: bool,
128    queue: std::collections::VecDeque<Vec<u8>>,
129}
130
131impl Writer for RNodeSubWriter {
132    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
133        let frame = rnode_kiss::rnode_data_frame(self.index, data);
134        if self.flow_control {
135            let mut state = self.flow_state.lock().unwrap();
136            if state.ready {
137                state.ready = false;
138                drop(state);
139                self.writer.lock().unwrap().write_all(&frame)
140            } else {
141                state.queue.push_back(data.to_vec());
142                Ok(())
143            }
144        } else {
145            self.writer.lock().unwrap().write_all(&frame)
146        }
147    }
148}
149
150/// Start the RNode interface.
151///
152/// Opens serial port, spawns reader thread which performs detect+configure,
153/// then enters data relay mode.
154///
155/// Returns one `(InterfaceId, Box<dyn Writer>)` per subinterface.
156pub fn start(
157    config: RNodeConfig,
158    tx: EventSender,
159) -> io::Result<Vec<(InterfaceId, Box<dyn Writer>)>> {
160    // Validate all subinterface configs upfront
161    for sub in &config.subinterfaces {
162        if let Some(err) = validate_sub_config(sub) {
163            return Err(io::Error::new(io::ErrorKind::InvalidInput, err));
164        }
165    }
166
167    if config.subinterfaces.is_empty() {
168        return Err(io::Error::new(
169            io::ErrorKind::InvalidInput,
170            "No subinterfaces configured",
171        ));
172    }
173
174    let (reader_file, shared_writer) = if let Some(fd) = config.pre_opened_fd {
175        // Pre-opened fd from USB bridge — dup for independent reader/writer handles
176        let port = SerialPort::from_raw_fd(fd);
177        let r = port.reader()?;
178        let w = port.writer()?;
179        std::mem::forget(port); // don't close the original fd — bridge owns it
180        (r, Arc::new(Mutex::new(w)))
181    } else {
182        let serial_config = SerialConfig {
183            path: config.port.clone(),
184            baud: config.speed,
185            data_bits: 8,
186            parity: Parity::None,
187            stop_bits: 1,
188        };
189        let port = SerialPort::open(&serial_config)?;
190        let r = port.reader()?;
191        let w = port.writer()?;
192        (r, Arc::new(Mutex::new(w)))
193    };
194
195    // Build per-subinterface writers and IDs
196    let num_subs = config.subinterfaces.len();
197    let mut writers: Vec<(InterfaceId, Box<dyn Writer>)> = Vec::with_capacity(num_subs);
198    let mut flow_states: Vec<Arc<Mutex<SubFlowState>>> = Vec::with_capacity(num_subs);
199
200    for (i, sub) in config.subinterfaces.iter().enumerate() {
201        let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
202        let flow_state = Arc::new(Mutex::new(SubFlowState {
203            ready: true,
204            queue: std::collections::VecDeque::new(),
205        }));
206        flow_states.push(flow_state.clone());
207
208        let sub_writer: Box<dyn Writer> = Box::new(RNodeSubWriter {
209            writer: shared_writer.clone(),
210            index: i as u8,
211            flow_control: sub.flow_control,
212            flow_state,
213        });
214        writers.push((sub_id, sub_writer));
215    }
216
217    // Spawn reader thread
218    let reader_shared_writer = shared_writer.clone();
219    let reader_config = config.clone();
220    let reader_flow_states = flow_states;
221    thread::Builder::new()
222        .name(format!("rnode-reader-{}", config.base_interface_id.0))
223        .spawn(move || {
224            reader_loop(
225                reader_file,
226                reader_shared_writer,
227                reader_config,
228                tx,
229                reader_flow_states,
230            );
231        })?;
232
233    // Spawn keepalive thread — sends periodic DETECT to prevent firmware
234    // bridge idle timeout (ESP32 RNode reverts to standalone after 30s idle).
235    let keepalive_writer = shared_writer.clone();
236    let keepalive_name = config.name.clone();
237    thread::Builder::new()
238        .name(format!("rnode-keepalive-{}", config.base_interface_id.0))
239        .spawn(move || {
240            let detect = rnode_kiss::detect_request();
241            loop {
242                thread::sleep(Duration::from_secs(15));
243                if let Err(e) = keepalive_writer.lock().unwrap().write_all(&detect) {
244                    log::debug!("[{}] keepalive write failed: {}", keepalive_name, e);
245                    return;
246                }
247            }
248        })?;
249
250    Ok(writers)
251}
252
253/// Reader loop: detect device, configure radios, then relay data frames.
254fn reader_loop(
255    mut reader: std::fs::File,
256    writer: Arc<Mutex<std::fs::File>>,
257    config: RNodeConfig,
258    tx: EventSender,
259    flow_states: Vec<Arc<Mutex<SubFlowState>>>,
260) {
261    // Initial delay for hardware init (matches Python: sleep(2.0))
262    thread::sleep(Duration::from_secs(2));
263
264    // Send detect request
265    {
266        let detect_cmd = rnode_kiss::detect_request();
267        // Also request FW version, platform, MCU (matches Python detect())
268        let mut cmd = detect_cmd;
269        cmd.extend_from_slice(&rnode_kiss::rnode_command(
270            rnode_kiss::CMD_FW_VERSION,
271            &[0x00],
272        ));
273        cmd.extend_from_slice(&rnode_kiss::rnode_command(
274            rnode_kiss::CMD_PLATFORM,
275            &[0x00],
276        ));
277        cmd.extend_from_slice(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x00]));
278
279        if let Err(e) = writer.lock().unwrap().write_all(&cmd) {
280            log::error!("[{}] failed to send detect: {}", config.name, e);
281            return;
282        }
283    }
284
285    let mut decoder = rnode_kiss::RNodeDecoder::new();
286    let mut buf = [0u8; 4096];
287    let mut detected = false;
288
289    // Detection phase: read until we get Detected or timeout
290    let detect_start = std::time::Instant::now();
291    let detect_timeout = Duration::from_secs(5);
292
293    while !detected && detect_start.elapsed() < detect_timeout {
294        match reader.read(&mut buf) {
295            Ok(0) => {
296                log::warn!("[{}] serial port closed during detect", config.name);
297                return;
298            }
299            Ok(n) => {
300                for event in decoder.feed(&buf[..n]) {
301                    match event {
302                        rnode_kiss::RNodeEvent::Detected(true) => {
303                            detected = true;
304                            log::info!("[{}] RNode device detected", config.name);
305                        }
306                        rnode_kiss::RNodeEvent::FirmwareVersion { major, minor } => {
307                            log::info!("[{}] firmware version {}.{}", config.name, major, minor);
308                        }
309                        rnode_kiss::RNodeEvent::Platform(p) => {
310                            log::info!("[{}] platform: 0x{:02X}", config.name, p);
311                        }
312                        rnode_kiss::RNodeEvent::Mcu(m) => {
313                            log::info!("[{}] MCU: 0x{:02X}", config.name, m);
314                        }
315                        _ => {}
316                    }
317                }
318            }
319            Err(e) => {
320                log::error!("[{}] serial read error during detect: {}", config.name, e);
321                return;
322            }
323        }
324    }
325
326    if !detected {
327        log::error!("[{}] RNode detection timed out", config.name);
328        return;
329    }
330
331    // Configure each subinterface
332    for (i, sub) in config.subinterfaces.iter().enumerate() {
333        if let Err(e) =
334            configure_subinterface(&writer, i as u8, sub, config.subinterfaces.len() > 1)
335        {
336            log::error!(
337                "[{}] failed to configure subinterface {}: {}",
338                config.name,
339                i,
340                e
341            );
342            return;
343        }
344    }
345
346    // Brief delay for configuration to settle (matches Python: sleep(0.3))
347    thread::sleep(Duration::from_millis(300));
348
349    // Signal all subinterfaces as online
350    for i in 0..config.subinterfaces.len() {
351        let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
352        let _ = tx.send(Event::InterfaceUp(sub_id, None, None));
353    }
354
355    log::info!(
356        "[{}] RNode configured with {} subinterface(s)",
357        config.name,
358        config.subinterfaces.len()
359    );
360
361    // Data relay loop
362    loop {
363        match reader.read(&mut buf) {
364            Ok(0) => {
365                log::warn!("[{}] serial port closed", config.name);
366                for i in 0..config.subinterfaces.len() {
367                    let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
368                    let _ = tx.send(Event::InterfaceDown(sub_id));
369                }
370                // TODO: reconnect logic
371                return;
372            }
373            Ok(n) => {
374                for event in decoder.feed(&buf[..n]) {
375                    match event {
376                        rnode_kiss::RNodeEvent::DataFrame { index, data } => {
377                            let sub_id = InterfaceId(config.base_interface_id.0 + index as u64);
378                            if tx
379                                .send(Event::Frame {
380                                    interface_id: sub_id,
381                                    data,
382                                })
383                                .is_err()
384                            {
385                                return;
386                            }
387                        }
388                        rnode_kiss::RNodeEvent::Ready => {
389                            // Flow control: unlock all subinterfaces that have flow_control
390                            for (i, fs) in flow_states.iter().enumerate() {
391                                if config.subinterfaces[i].flow_control {
392                                    process_flow_queue(fs, &writer, i as u8);
393                                }
394                            }
395                        }
396                        rnode_kiss::RNodeEvent::Error(code) => {
397                            log::error!("[{}] RNode error: 0x{:02X}", config.name, code);
398                        }
399                        _ => {
400                            // Status updates logged but not acted on
401                        }
402                    }
403                }
404            }
405            Err(e) => {
406                log::error!("[{}] serial read error: {}", config.name, e);
407                for i in 0..config.subinterfaces.len() {
408                    let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
409                    let _ = tx.send(Event::InterfaceDown(sub_id));
410                }
411                return;
412            }
413        }
414    }
415}
416
417/// Configure a single subinterface on the RNode device.
418fn configure_subinterface(
419    writer: &Arc<Mutex<std::fs::File>>,
420    index: u8,
421    sub: &RNodeSubConfig,
422    multi: bool,
423) -> io::Result<()> {
424    let mut w = writer.lock().unwrap();
425
426    // For multi-radio, send select command before each parameter
427    let freq_bytes = [
428        (sub.frequency >> 24) as u8,
429        (sub.frequency >> 16) as u8,
430        (sub.frequency >> 8) as u8,
431        (sub.frequency & 0xFF) as u8,
432    ];
433    let bw_bytes = [
434        (sub.bandwidth >> 24) as u8,
435        (sub.bandwidth >> 16) as u8,
436        (sub.bandwidth >> 8) as u8,
437        (sub.bandwidth & 0xFF) as u8,
438    ];
439    let txp = if sub.txpower < 0 {
440        (sub.txpower as i16 + 256) as u8
441    } else {
442        sub.txpower as u8
443    };
444
445    if multi {
446        w.write_all(&rnode_kiss::rnode_select_command(
447            index,
448            rnode_kiss::CMD_FREQUENCY,
449            &freq_bytes,
450        ))?;
451        w.write_all(&rnode_kiss::rnode_select_command(
452            index,
453            rnode_kiss::CMD_BANDWIDTH,
454            &bw_bytes,
455        ))?;
456        w.write_all(&rnode_kiss::rnode_select_command(
457            index,
458            rnode_kiss::CMD_TXPOWER,
459            &[txp],
460        ))?;
461        w.write_all(&rnode_kiss::rnode_select_command(
462            index,
463            rnode_kiss::CMD_SF,
464            &[sub.spreading_factor],
465        ))?;
466        w.write_all(&rnode_kiss::rnode_select_command(
467            index,
468            rnode_kiss::CMD_CR,
469            &[sub.coding_rate],
470        ))?;
471    } else {
472        w.write_all(&rnode_kiss::rnode_command(
473            rnode_kiss::CMD_FREQUENCY,
474            &freq_bytes,
475        ))?;
476        w.write_all(&rnode_kiss::rnode_command(
477            rnode_kiss::CMD_BANDWIDTH,
478            &bw_bytes,
479        ))?;
480        w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_TXPOWER, &[txp]))?;
481        w.write_all(&rnode_kiss::rnode_command(
482            rnode_kiss::CMD_SF,
483            &[sub.spreading_factor],
484        ))?;
485        w.write_all(&rnode_kiss::rnode_command(
486            rnode_kiss::CMD_CR,
487            &[sub.coding_rate],
488        ))?;
489    }
490
491    // Airtime locks
492    if let Some(st) = sub.st_alock {
493        let st_val = (st * 100.0) as u16;
494        let st_bytes = [(st_val >> 8) as u8, (st_val & 0xFF) as u8];
495        if multi {
496            w.write_all(&rnode_kiss::rnode_select_command(
497                index,
498                rnode_kiss::CMD_ST_ALOCK,
499                &st_bytes,
500            ))?;
501        } else {
502            w.write_all(&rnode_kiss::rnode_command(
503                rnode_kiss::CMD_ST_ALOCK,
504                &st_bytes,
505            ))?;
506        }
507    }
508    if let Some(lt) = sub.lt_alock {
509        let lt_val = (lt * 100.0) as u16;
510        let lt_bytes = [(lt_val >> 8) as u8, (lt_val & 0xFF) as u8];
511        if multi {
512            w.write_all(&rnode_kiss::rnode_select_command(
513                index,
514                rnode_kiss::CMD_LT_ALOCK,
515                &lt_bytes,
516            ))?;
517        } else {
518            w.write_all(&rnode_kiss::rnode_command(
519                rnode_kiss::CMD_LT_ALOCK,
520                &lt_bytes,
521            ))?;
522        }
523    }
524
525    // Turn on radio
526    if multi {
527        w.write_all(&rnode_kiss::rnode_select_command(
528            index,
529            rnode_kiss::CMD_RADIO_STATE,
530            &[rnode_kiss::RADIO_STATE_ON],
531        ))?;
532    } else {
533        w.write_all(&rnode_kiss::rnode_command(
534            rnode_kiss::CMD_RADIO_STATE,
535            &[rnode_kiss::RADIO_STATE_ON],
536        ))?;
537    }
538
539    Ok(())
540}
541
542/// Process flow control queue for a subinterface.
543fn process_flow_queue(
544    flow_state: &Arc<Mutex<SubFlowState>>,
545    writer: &Arc<Mutex<std::fs::File>>,
546    index: u8,
547) {
548    let mut state = flow_state.lock().unwrap();
549    if let Some(data) = state.queue.pop_front() {
550        state.ready = false;
551        drop(state);
552        let frame = rnode_kiss::rnode_data_frame(index, &data);
553        let _ = writer.lock().unwrap().write_all(&frame);
554    } else {
555        state.ready = true;
556    }
557}
558
559// --- Factory implementation ---
560
561use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, SubInterface};
562use rns_core::transport::types::InterfaceInfo;
563use std::collections::HashMap;
564
565/// Factory for `RNodeInterface`.
566pub struct RNodeFactory;
567
568impl InterfaceFactory for RNodeFactory {
569    fn type_name(&self) -> &str {
570        "RNodeInterface"
571    }
572
573    fn default_ifac_size(&self) -> usize {
574        8
575    }
576
577    fn parse_config(
578        &self,
579        name: &str,
580        id: InterfaceId,
581        params: &HashMap<String, String>,
582    ) -> Result<Box<dyn InterfaceConfigData>, String> {
583        let pre_opened_fd = params.get("fd").and_then(|v| v.parse::<i32>().ok());
584
585        let port = params
586            .get("port")
587            .cloned()
588            .or_else(|| pre_opened_fd.map(|_| "usb-bridge".to_string()))
589            .ok_or_else(|| "RNodeInterface requires 'port' or 'fd'".to_string())?;
590
591        let speed = params
592            .get("speed")
593            .and_then(|v| v.parse().ok())
594            .unwrap_or(115200u32);
595
596        let frequency = params
597            .get("frequency")
598            .and_then(|v| v.parse().ok())
599            .unwrap_or(868_000_000u32);
600
601        let bandwidth = params
602            .get("bandwidth")
603            .and_then(|v| v.parse().ok())
604            .unwrap_or(125_000u32);
605
606        let txpower = params
607            .get("txpower")
608            .and_then(|v| v.parse().ok())
609            .unwrap_or(7i8);
610
611        let spreading_factor = params
612            .get("spreadingfactor")
613            .or_else(|| params.get("spreading_factor"))
614            .and_then(|v| v.parse().ok())
615            .unwrap_or(8u8);
616
617        let coding_rate = params
618            .get("codingrate")
619            .or_else(|| params.get("coding_rate"))
620            .and_then(|v| v.parse().ok())
621            .unwrap_or(5u8);
622
623        let flow_control = params
624            .get("flow_control")
625            .and_then(|v| crate::config::parse_bool_pub(v))
626            .unwrap_or(false);
627
628        let st_alock = params.get("st_alock").and_then(|v| v.parse().ok());
629
630        let lt_alock = params.get("lt_alock").and_then(|v| v.parse().ok());
631
632        let id_interval = params.get("id_interval").and_then(|v| v.parse().ok());
633
634        let id_callsign = params.get("id_callsign").map(|v| v.as_bytes().to_vec());
635
636        let sub = RNodeSubConfig {
637            name: name.to_string(),
638            frequency,
639            bandwidth,
640            txpower,
641            spreading_factor,
642            coding_rate,
643            flow_control,
644            st_alock,
645            lt_alock,
646        };
647
648        Ok(Box::new(RNodeConfig {
649            name: name.to_string(),
650            port,
651            speed,
652            subinterfaces: vec![sub],
653            id_interval,
654            id_callsign,
655            base_interface_id: id,
656            pre_opened_fd,
657        }))
658    }
659
660    fn start(
661        &self,
662        config: Box<dyn InterfaceConfigData>,
663        ctx: StartContext,
664    ) -> std::io::Result<StartResult> {
665        let rnode_config = *config.into_any().downcast::<RNodeConfig>().map_err(|_| {
666            std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
667        })?;
668
669        let name = rnode_config.name.clone();
670
671        let pairs = start(rnode_config, ctx.tx)?;
672
673        let mut subs = Vec::with_capacity(pairs.len());
674        for (index, (sub_id, writer)) in pairs.into_iter().enumerate() {
675            let sub_name = if index == 0 {
676                name.clone()
677            } else {
678                format!("{}/{}", name, index)
679            };
680
681            let info = InterfaceInfo {
682                id: sub_id,
683                name: sub_name,
684                mode: ctx.mode,
685                out_capable: true,
686                in_capable: true,
687                bitrate: None,
688                announce_rate_target: None,
689                announce_rate_grace: 0,
690                announce_rate_penalty: 0.0,
691                announce_cap: rns_core::constants::ANNOUNCE_CAP,
692                is_local_client: false,
693                wants_tunnel: false,
694                tunnel_id: None,
695                mtu: rns_core::constants::MTU as u32,
696                ingress_control: false,
697                ia_freq: 0.0,
698                started: crate::time::now(),
699            };
700
701            subs.push(SubInterface {
702                id: sub_id,
703                info,
704                writer,
705                interface_type_name: "RNodeInterface".to_string(),
706            });
707        }
708
709        Ok(StartResult::Multi(subs))
710    }
711}
712
713#[cfg(test)]
714mod tests {
715    use super::*;
716    use crate::kiss;
717    use crate::serial::open_pty_pair;
718    use std::os::unix::io::{AsRawFd, FromRawFd};
719    /// Helper: poll an fd for reading with timeout (ms).
720    fn poll_read(fd: i32, timeout_ms: i32) -> bool {
721        let mut pfd = libc::pollfd {
722            fd,
723            events: libc::POLLIN,
724            revents: 0,
725        };
726        let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
727        ret > 0
728    }
729
730    /// Read all available bytes from an fd.
731    fn read_available(file: &mut std::fs::File) -> Vec<u8> {
732        let mut all = Vec::new();
733        let mut buf = [0u8; 4096];
734        while poll_read(file.as_raw_fd(), 100) {
735            match file.read(&mut buf) {
736                Ok(n) if n > 0 => all.extend_from_slice(&buf[..n]),
737                _ => break,
738            }
739        }
740        all
741    }
742
743    /// Mock RNode: respond to detect with DETECT_RESP, FW version, platform
744    fn mock_respond_detect(master: &mut std::fs::File) {
745        // Respond: DETECT_RESP
746        master
747            .write_all(&rnode_kiss::rnode_command(
748                rnode_kiss::CMD_DETECT,
749                &[rnode_kiss::DETECT_RESP],
750            ))
751            .unwrap();
752        // FW version 1.74
753        master
754            .write_all(&rnode_kiss::rnode_command(
755                rnode_kiss::CMD_FW_VERSION,
756                &[0x01, 0x4A],
757            ))
758            .unwrap();
759        // Platform ESP32
760        master
761            .write_all(&rnode_kiss::rnode_command(
762                rnode_kiss::CMD_PLATFORM,
763                &[0x80],
764            ))
765            .unwrap();
766        // MCU
767        master
768            .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x01]))
769            .unwrap();
770        master.flush().unwrap();
771    }
772
773    #[test]
774    fn rnode_detect_over_pty() {
775        // Test that the RNode decoder can parse detect responses from a PTY
776        let (master_fd, slave_fd) = open_pty_pair().unwrap();
777        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
778        let mut slave = unsafe { std::fs::File::from_raw_fd(slave_fd) };
779
780        // Write detect response to master
781        mock_respond_detect(&mut master);
782
783        // Read from slave with RNode decoder
784        assert!(poll_read(slave.as_raw_fd(), 2000));
785        let mut decoder = rnode_kiss::RNodeDecoder::new();
786        let mut buf = [0u8; 4096];
787        let n = slave.read(&mut buf).unwrap();
788        let events = decoder.feed(&buf[..n]);
789
790        assert!(
791            events
792                .iter()
793                .any(|e| matches!(e, rnode_kiss::RNodeEvent::Detected(true))),
794            "should detect device"
795        );
796        assert!(
797            events
798                .iter()
799                .any(|e| matches!(e, rnode_kiss::RNodeEvent::FirmwareVersion { .. })),
800            "should get firmware version"
801        );
802    }
803
804    #[test]
805    fn rnode_configure_commands() {
806        let (master_fd, slave_fd) = open_pty_pair().unwrap();
807        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
808        let writer_file = unsafe { std::fs::File::from_raw_fd(libc::dup(slave_fd)) };
809        let writer = Arc::new(Mutex::new(writer_file));
810
811        let sub = RNodeSubConfig {
812            name: "test".into(),
813            frequency: 868_000_000,
814            bandwidth: 125_000,
815            txpower: 7,
816            spreading_factor: 8,
817            coding_rate: 5,
818            flow_control: false,
819            st_alock: None,
820            lt_alock: None,
821        };
822
823        configure_subinterface(&writer, 0, &sub, false).unwrap();
824
825        // Read what was sent
826        let data = read_available(&mut master);
827
828        // Should contain frequency command
829        assert!(
830            data.windows(2)
831                .any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_FREQUENCY),
832            "should contain FREQUENCY command"
833        );
834        // Should contain bandwidth command
835        assert!(
836            data.windows(2)
837                .any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_BANDWIDTH),
838            "should contain BANDWIDTH command"
839        );
840        // Should contain RADIO_STATE ON
841        assert!(
842            data.windows(3).any(|w| w[0] == kiss::FEND
843                && w[1] == rnode_kiss::CMD_RADIO_STATE
844                && w[2] == rnode_kiss::RADIO_STATE_ON),
845            "should contain RADIO_STATE ON command"
846        );
847
848        unsafe { libc::close(slave_fd) };
849    }
850
851    #[test]
852    fn rnode_data_roundtrip() {
853        let (master_fd, slave_fd) = open_pty_pair().unwrap();
854        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
855        let slave = unsafe { std::fs::File::from_raw_fd(slave_fd) };
856
857        // Write a data frame (subinterface 0) to master
858        let payload = vec![0x01, 0x02, 0x03, 0x04, 0x05];
859        let frame = rnode_kiss::rnode_data_frame(0, &payload);
860        master.write_all(&frame).unwrap();
861        master.flush().unwrap();
862
863        // Read from slave with RNode decoder
864        assert!(poll_read(slave.as_raw_fd(), 2000));
865        let mut decoder = rnode_kiss::RNodeDecoder::new();
866        let mut buf = [0u8; 4096];
867        let mut slave_file = slave;
868        let n = slave_file.read(&mut buf).unwrap();
869        let events = decoder.feed(&buf[..n]);
870
871        assert_eq!(events.len(), 1);
872        match &events[0] {
873            rnode_kiss::RNodeEvent::DataFrame { index, data } => {
874                assert_eq!(*index, 0);
875                assert_eq!(data, &payload);
876            }
877            other => panic!("expected DataFrame, got {:?}", other),
878        }
879    }
880
881    #[test]
882    fn rnode_flow_control() {
883        let (master_fd, slave_fd) = open_pty_pair().unwrap();
884        let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
885        let shared_writer = Arc::new(Mutex::new(writer_file));
886
887        let flow_state = Arc::new(Mutex::new(SubFlowState {
888            ready: true,
889            queue: std::collections::VecDeque::new(),
890        }));
891
892        let mut sub_writer = RNodeSubWriter {
893            writer: shared_writer.clone(),
894            index: 0,
895            flow_control: true,
896            flow_state: flow_state.clone(),
897        };
898
899        // First send: should go through (ready=true) and set ready=false
900        sub_writer.send_frame(b"hello").unwrap();
901        assert!(!flow_state.lock().unwrap().ready);
902
903        // Second send: should be queued
904        sub_writer.send_frame(b"world").unwrap();
905        assert_eq!(flow_state.lock().unwrap().queue.len(), 1);
906
907        // Process flow queue (simulates CMD_READY)
908        process_flow_queue(&flow_state, &shared_writer, 0);
909        assert_eq!(flow_state.lock().unwrap().queue.len(), 0);
910        assert!(!flow_state.lock().unwrap().ready); // sent queued, still locked
911
912        // Process again with empty queue: sets ready=true
913        process_flow_queue(&flow_state, &shared_writer, 0);
914        assert!(flow_state.lock().unwrap().ready);
915
916        unsafe { libc::close(master_fd) };
917    }
918
919    #[test]
920    fn rnode_sub_writer_format() {
921        let (master_fd, slave_fd) = open_pty_pair().unwrap();
922        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
923        let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
924        let shared_writer = Arc::new(Mutex::new(writer_file));
925
926        let flow_state = Arc::new(Mutex::new(SubFlowState {
927            ready: true,
928            queue: std::collections::VecDeque::new(),
929        }));
930
931        let mut sub_writer = RNodeSubWriter {
932            writer: shared_writer,
933            index: 1, // subinterface 1
934            flow_control: false,
935            flow_state,
936        };
937
938        let payload = vec![0xAA, 0xBB, 0xCC];
939        sub_writer.send_frame(&payload).unwrap();
940
941        // Read from master
942        assert!(poll_read(master.as_raw_fd(), 2000));
943        let mut buf = [0u8; 256];
944        let n = master.read(&mut buf).unwrap();
945
946        // Should start with FEND, CMD_INT1_DATA (0x10)
947        assert_eq!(buf[0], kiss::FEND);
948        assert_eq!(buf[1], 0x10); // CMD_INT1_DATA
949        assert_eq!(buf[n - 1], kiss::FEND);
950    }
951
952    #[test]
953    fn rnode_multi_sub_routing() {
954        // Test that data frames on different CMD_INTn route to different indices
955        let mut decoder = rnode_kiss::RNodeDecoder::new();
956
957        let payload0 = vec![0x01, 0x02];
958        let frame0 = rnode_kiss::rnode_data_frame(0, &payload0);
959        let events0 = decoder.feed(&frame0);
960        assert_eq!(events0.len(), 1);
961        assert_eq!(
962            events0[0],
963            rnode_kiss::RNodeEvent::DataFrame {
964                index: 0,
965                data: payload0
966            }
967        );
968
969        let payload1 = vec![0x03, 0x04];
970        let frame1 = rnode_kiss::rnode_data_frame(1, &payload1);
971        let events1 = decoder.feed(&frame1);
972        assert_eq!(events1.len(), 1);
973        assert_eq!(
974            events1[0],
975            rnode_kiss::RNodeEvent::DataFrame {
976                index: 1,
977                data: payload1
978            }
979        );
980    }
981
982    #[test]
983    fn rnode_error_handling() {
984        // NOTE: CMD_ERROR (0x90) == CMD_INT5_DATA (0x90) — they share the same
985        // byte value. In multi-radio mode, 0x90 is treated as INT5_DATA.
986        // Error events only appear on single-radio devices where there's no
987        // INT5_DATA. The decoder treats 0x90 as INT5_DATA (data wins).
988        // For real error detection, we rely on the data frame being invalid
989        // or the device sending a different error indicator.
990        // Test that cmd 0x90 is correctly handled as a data frame.
991        let mut decoder = rnode_kiss::RNodeDecoder::new();
992        let frame = rnode_kiss::rnode_command(rnode_kiss::CMD_ERROR, &[0x02]);
993        let events = decoder.feed(&frame);
994        assert_eq!(events.len(), 1);
995        // 0x90 = CMD_INT5_DATA, so it's a DataFrame with index 5
996        assert_eq!(
997            events[0],
998            rnode_kiss::RNodeEvent::DataFrame {
999                index: 5,
1000                data: vec![0x02]
1001            }
1002        );
1003    }
1004
1005    #[test]
1006    fn rnode_config_validation() {
1007        let good = RNodeSubConfig {
1008            name: "test".into(),
1009            frequency: 868_000_000,
1010            bandwidth: 125_000,
1011            txpower: 7,
1012            spreading_factor: 8,
1013            coding_rate: 5,
1014            flow_control: false,
1015            st_alock: None,
1016            lt_alock: None,
1017        };
1018        assert!(validate_sub_config(&good).is_none());
1019
1020        // Bad frequency
1021        let mut bad = good.clone();
1022        bad.frequency = 100;
1023        assert!(validate_sub_config(&bad).is_some());
1024
1025        // Bad SF
1026        bad = good.clone();
1027        bad.spreading_factor = 13;
1028        assert!(validate_sub_config(&bad).is_some());
1029
1030        // Bad CR
1031        bad = good.clone();
1032        bad.coding_rate = 9;
1033        assert!(validate_sub_config(&bad).is_some());
1034
1035        // Bad BW
1036        bad = good.clone();
1037        bad.bandwidth = 5;
1038        assert!(validate_sub_config(&bad).is_some());
1039
1040        // Bad TX power
1041        bad = good.clone();
1042        bad.txpower = 50;
1043        assert!(validate_sub_config(&bad).is_some());
1044    }
1045}