Skip to main content

rns_net/interface/
rnode.rs

1//! RNode LoRa radio interface.
2//!
3//! Manages serial connection to RNode device, detects firmware,
4//! configures radio parameters per subinterface. Supports single-radio
5//! (RNodeInterface) and multi-radio (RNodeMultiInterface) devices.
6//!
7//! Matches Python `RNodeInterface.py` and `RNodeMultiInterface.py`.
8
9use std::io::{self, Read, Write};
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::InterfaceId;
15
16use crate::event::{Event, EventSender};
17use crate::interface::Writer;
18use crate::rnode_kiss;
19use crate::serial::{Parity, SerialConfig, SerialPort};
20
21/// Validation limits matching Python RNodeInterface.
22pub const FREQ_MIN: u32 = 137_000_000;
23pub const FREQ_MAX: u32 = 3_000_000_000;
24pub const BW_MIN: u32 = 7_800;
25pub const BW_MAX: u32 = 1_625_000;
26pub const SF_MIN: u8 = 5;
27pub const SF_MAX: u8 = 12;
28pub const CR_MIN: u8 = 5;
29pub const CR_MAX: u8 = 8;
30pub const TXPOWER_MIN: i8 = 0;
31pub const TXPOWER_MAX: i8 = 37;
32pub const HW_MTU: u32 = 508;
33
34/// Configuration for one RNode subinterface (radio).
35#[derive(Debug, Clone)]
36pub struct RNodeSubConfig {
37    pub name: String,
38    pub frequency: u32,
39    pub bandwidth: u32,
40    pub txpower: i8,
41    pub spreading_factor: u8,
42    pub coding_rate: u8,
43    pub flow_control: bool,
44    pub st_alock: Option<f32>,
45    pub lt_alock: Option<f32>,
46}
47
48/// Configuration for the RNode device.
49#[derive(Debug, Clone)]
50pub struct RNodeConfig {
51    pub name: String,
52    pub port: String,
53    pub speed: u32,
54    pub subinterfaces: Vec<RNodeSubConfig>,
55    pub id_interval: Option<u32>,
56    pub id_callsign: Option<Vec<u8>>,
57    pub base_interface_id: InterfaceId,
58}
59
60impl Default for RNodeConfig {
61    fn default() -> Self {
62        RNodeConfig {
63            name: String::new(),
64            port: String::new(),
65            speed: 115200,
66            subinterfaces: Vec::new(),
67            id_interval: None,
68            id_callsign: None,
69            base_interface_id: InterfaceId(0),
70        }
71    }
72}
73
74/// Validate subinterface configuration. Returns error message if invalid.
75pub fn validate_sub_config(sub: &RNodeSubConfig) -> Option<String> {
76    if sub.frequency < FREQ_MIN || sub.frequency > FREQ_MAX {
77        return Some(format!("Invalid frequency {} for {}", sub.frequency, sub.name));
78    }
79    if sub.bandwidth < BW_MIN || sub.bandwidth > BW_MAX {
80        return Some(format!("Invalid bandwidth {} for {}", sub.bandwidth, sub.name));
81    }
82    if sub.spreading_factor < SF_MIN || sub.spreading_factor > SF_MAX {
83        return Some(format!("Invalid SF {} for {}", sub.spreading_factor, sub.name));
84    }
85    if sub.coding_rate < CR_MIN || sub.coding_rate > CR_MAX {
86        return Some(format!("Invalid CR {} for {}", sub.coding_rate, sub.name));
87    }
88    if sub.txpower < TXPOWER_MIN || sub.txpower > TXPOWER_MAX {
89        return Some(format!("Invalid TX power {} for {}", sub.txpower, sub.name));
90    }
91    if let Some(st) = sub.st_alock {
92        if st < 0.0 || st > 100.0 {
93            return Some(format!("Invalid ST airtime limit {} for {}", st, sub.name));
94        }
95    }
96    if let Some(lt) = sub.lt_alock {
97        if lt < 0.0 || lt > 100.0 {
98            return Some(format!("Invalid LT airtime limit {} for {}", lt, sub.name));
99        }
100    }
101    None
102}
103
104/// Writer for a specific RNode subinterface.
105/// Wraps a shared serial writer with subinterface-specific data framing.
106struct RNodeSubWriter {
107    writer: Arc<Mutex<std::fs::File>>,
108    index: u8,
109    flow_control: bool,
110    flow_state: Arc<Mutex<SubFlowState>>,
111}
112
113struct SubFlowState {
114    ready: bool,
115    queue: std::collections::VecDeque<Vec<u8>>,
116}
117
118impl Writer for RNodeSubWriter {
119    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
120        let frame = rnode_kiss::rnode_data_frame(self.index, data);
121        if self.flow_control {
122            let mut state = self.flow_state.lock().unwrap();
123            if state.ready {
124                state.ready = false;
125                drop(state);
126                self.writer.lock().unwrap().write_all(&frame)
127            } else {
128                state.queue.push_back(data.to_vec());
129                Ok(())
130            }
131        } else {
132            self.writer.lock().unwrap().write_all(&frame)
133        }
134    }
135}
136
137/// Start the RNode interface.
138///
139/// Opens serial port, spawns reader thread which performs detect+configure,
140/// then enters data relay mode.
141///
142/// Returns one `(InterfaceId, Box<dyn Writer>)` per subinterface.
143pub fn start(
144    config: RNodeConfig,
145    tx: EventSender,
146) -> io::Result<Vec<(InterfaceId, Box<dyn Writer>)>> {
147    // Validate all subinterface configs upfront
148    for sub in &config.subinterfaces {
149        if let Some(err) = validate_sub_config(sub) {
150            return Err(io::Error::new(io::ErrorKind::InvalidInput, err));
151        }
152    }
153
154    if config.subinterfaces.is_empty() {
155        return Err(io::Error::new(
156            io::ErrorKind::InvalidInput,
157            "No subinterfaces configured",
158        ));
159    }
160
161    let serial_config = SerialConfig {
162        path: config.port.clone(),
163        baud: config.speed,
164        data_bits: 8,
165        parity: Parity::None,
166        stop_bits: 1,
167    };
168
169    let port = SerialPort::open(&serial_config)?;
170    let reader_file = port.reader()?;
171    let config_writer = port.writer()?;
172    let shared_writer = Arc::new(Mutex::new(config_writer));
173
174    // Build per-subinterface writers and IDs
175    let num_subs = config.subinterfaces.len();
176    let mut writers: Vec<(InterfaceId, Box<dyn Writer>)> = Vec::with_capacity(num_subs);
177    let mut flow_states: Vec<Arc<Mutex<SubFlowState>>> = Vec::with_capacity(num_subs);
178
179    for (i, sub) in config.subinterfaces.iter().enumerate() {
180        let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
181        let flow_state = Arc::new(Mutex::new(SubFlowState {
182            ready: true,
183            queue: std::collections::VecDeque::new(),
184        }));
185        flow_states.push(flow_state.clone());
186
187        let sub_writer: Box<dyn Writer> = Box::new(RNodeSubWriter {
188            writer: shared_writer.clone(),
189            index: i as u8,
190            flow_control: sub.flow_control,
191            flow_state,
192        });
193        writers.push((sub_id, sub_writer));
194    }
195
196    // Spawn reader thread
197    let reader_shared_writer = shared_writer.clone();
198    let reader_config = config.clone();
199    let reader_flow_states = flow_states;
200    thread::Builder::new()
201        .name(format!("rnode-reader-{}", config.base_interface_id.0))
202        .spawn(move || {
203            reader_loop(
204                reader_file,
205                reader_shared_writer,
206                reader_config,
207                tx,
208                reader_flow_states,
209            );
210        })?;
211
212    Ok(writers)
213}
214
215/// Reader loop: detect device, configure radios, then relay data frames.
216fn reader_loop(
217    mut reader: std::fs::File,
218    writer: Arc<Mutex<std::fs::File>>,
219    config: RNodeConfig,
220    tx: EventSender,
221    flow_states: Vec<Arc<Mutex<SubFlowState>>>,
222) {
223    // Initial delay for hardware init (matches Python: sleep(2.0))
224    thread::sleep(Duration::from_secs(2));
225
226    // Send detect request
227    {
228        let detect_cmd = rnode_kiss::detect_request();
229        // Also request FW version, platform, MCU (matches Python detect())
230        let mut cmd = detect_cmd;
231        cmd.extend_from_slice(&rnode_kiss::rnode_command(rnode_kiss::CMD_FW_VERSION, &[0x00]));
232        cmd.extend_from_slice(&rnode_kiss::rnode_command(rnode_kiss::CMD_PLATFORM, &[0x00]));
233        cmd.extend_from_slice(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x00]));
234
235        if let Err(e) = writer.lock().unwrap().write_all(&cmd) {
236            log::error!("[{}] failed to send detect: {}", config.name, e);
237            return;
238        }
239    }
240
241    let mut decoder = rnode_kiss::RNodeDecoder::new();
242    let mut buf = [0u8; 4096];
243    let mut detected = false;
244
245    // Detection phase: read until we get Detected or timeout
246    let detect_start = std::time::Instant::now();
247    let detect_timeout = Duration::from_secs(5);
248
249    while !detected && detect_start.elapsed() < detect_timeout {
250        match reader.read(&mut buf) {
251            Ok(0) => {
252                log::warn!("[{}] serial port closed during detect", config.name);
253                return;
254            }
255            Ok(n) => {
256                for event in decoder.feed(&buf[..n]) {
257                    match event {
258                        rnode_kiss::RNodeEvent::Detected(true) => {
259                            detected = true;
260                            log::info!("[{}] RNode device detected", config.name);
261                        }
262                        rnode_kiss::RNodeEvent::FirmwareVersion { major, minor } => {
263                            log::info!(
264                                "[{}] firmware version {}.{}",
265                                config.name,
266                                major,
267                                minor
268                            );
269                        }
270                        rnode_kiss::RNodeEvent::Platform(p) => {
271                            log::info!("[{}] platform: 0x{:02X}", config.name, p);
272                        }
273                        rnode_kiss::RNodeEvent::Mcu(m) => {
274                            log::info!("[{}] MCU: 0x{:02X}", config.name, m);
275                        }
276                        _ => {}
277                    }
278                }
279            }
280            Err(e) => {
281                log::error!("[{}] serial read error during detect: {}", config.name, e);
282                return;
283            }
284        }
285    }
286
287    if !detected {
288        log::error!("[{}] RNode detection timed out", config.name);
289        return;
290    }
291
292    // Configure each subinterface
293    for (i, sub) in config.subinterfaces.iter().enumerate() {
294        if let Err(e) = configure_subinterface(&writer, i as u8, sub, config.subinterfaces.len() > 1) {
295            log::error!("[{}] failed to configure subinterface {}: {}", config.name, i, e);
296            return;
297        }
298    }
299
300    // Brief delay for configuration to settle (matches Python: sleep(0.3))
301    thread::sleep(Duration::from_millis(300));
302
303    // Signal all subinterfaces as online
304    for i in 0..config.subinterfaces.len() {
305        let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
306        let _ = tx.send(Event::InterfaceUp(sub_id, None, None));
307    }
308
309    log::info!(
310        "[{}] RNode configured with {} subinterface(s)",
311        config.name,
312        config.subinterfaces.len()
313    );
314
315    // Data relay loop
316    loop {
317        match reader.read(&mut buf) {
318            Ok(0) => {
319                log::warn!("[{}] serial port closed", config.name);
320                for i in 0..config.subinterfaces.len() {
321                    let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
322                    let _ = tx.send(Event::InterfaceDown(sub_id));
323                }
324                // TODO: reconnect logic
325                return;
326            }
327            Ok(n) => {
328                for event in decoder.feed(&buf[..n]) {
329                    match event {
330                        rnode_kiss::RNodeEvent::DataFrame { index, data } => {
331                            let sub_id = InterfaceId(config.base_interface_id.0 + index as u64);
332                            if tx
333                                .send(Event::Frame {
334                                    interface_id: sub_id,
335                                    data,
336                                })
337                                .is_err()
338                            {
339                                return;
340                            }
341                        }
342                        rnode_kiss::RNodeEvent::Ready => {
343                            // Flow control: unlock all subinterfaces that have flow_control
344                            for (i, fs) in flow_states.iter().enumerate() {
345                                if config.subinterfaces[i].flow_control {
346                                    process_flow_queue(fs, &writer, i as u8);
347                                }
348                            }
349                        }
350                        rnode_kiss::RNodeEvent::Error(code) => {
351                            log::error!("[{}] RNode error: 0x{:02X}", config.name, code);
352                        }
353                        _ => {
354                            // Status updates logged but not acted on
355                        }
356                    }
357                }
358            }
359            Err(e) => {
360                log::error!("[{}] serial read error: {}", config.name, e);
361                for i in 0..config.subinterfaces.len() {
362                    let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
363                    let _ = tx.send(Event::InterfaceDown(sub_id));
364                }
365                return;
366            }
367        }
368    }
369}
370
371/// Configure a single subinterface on the RNode device.
372fn configure_subinterface(
373    writer: &Arc<Mutex<std::fs::File>>,
374    index: u8,
375    sub: &RNodeSubConfig,
376    multi: bool,
377) -> io::Result<()> {
378    let mut w = writer.lock().unwrap();
379
380    // For multi-radio, send select command before each parameter
381    let freq_bytes = [
382        (sub.frequency >> 24) as u8,
383        (sub.frequency >> 16) as u8,
384        (sub.frequency >> 8) as u8,
385        (sub.frequency & 0xFF) as u8,
386    ];
387    let bw_bytes = [
388        (sub.bandwidth >> 24) as u8,
389        (sub.bandwidth >> 16) as u8,
390        (sub.bandwidth >> 8) as u8,
391        (sub.bandwidth & 0xFF) as u8,
392    ];
393    let txp = if sub.txpower < 0 {
394        (sub.txpower as i16 + 256) as u8
395    } else {
396        sub.txpower as u8
397    };
398
399    if multi {
400        w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_FREQUENCY, &freq_bytes))?;
401        w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_BANDWIDTH, &bw_bytes))?;
402        w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_TXPOWER, &[txp]))?;
403        w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_SF, &[sub.spreading_factor]))?;
404        w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_CR, &[sub.coding_rate]))?;
405    } else {
406        w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_FREQUENCY, &freq_bytes))?;
407        w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_BANDWIDTH, &bw_bytes))?;
408        w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_TXPOWER, &[txp]))?;
409        w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_SF, &[sub.spreading_factor]))?;
410        w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_CR, &[sub.coding_rate]))?;
411    }
412
413    // Airtime locks
414    if let Some(st) = sub.st_alock {
415        let st_val = (st * 100.0) as u16;
416        let st_bytes = [(st_val >> 8) as u8, (st_val & 0xFF) as u8];
417        if multi {
418            w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_ST_ALOCK, &st_bytes))?;
419        } else {
420            w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_ST_ALOCK, &st_bytes))?;
421        }
422    }
423    if let Some(lt) = sub.lt_alock {
424        let lt_val = (lt * 100.0) as u16;
425        let lt_bytes = [(lt_val >> 8) as u8, (lt_val & 0xFF) as u8];
426        if multi {
427            w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_LT_ALOCK, &lt_bytes))?;
428        } else {
429            w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_LT_ALOCK, &lt_bytes))?;
430        }
431    }
432
433    // Turn on radio
434    if multi {
435        w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_RADIO_STATE, &[rnode_kiss::RADIO_STATE_ON]))?;
436    } else {
437        w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_RADIO_STATE, &[rnode_kiss::RADIO_STATE_ON]))?;
438    }
439
440    Ok(())
441}
442
443/// Process flow control queue for a subinterface.
444fn process_flow_queue(
445    flow_state: &Arc<Mutex<SubFlowState>>,
446    writer: &Arc<Mutex<std::fs::File>>,
447    index: u8,
448) {
449    let mut state = flow_state.lock().unwrap();
450    if let Some(data) = state.queue.pop_front() {
451        state.ready = false;
452        drop(state);
453        let frame = rnode_kiss::rnode_data_frame(index, &data);
454        let _ = writer.lock().unwrap().write_all(&frame);
455    } else {
456        state.ready = true;
457    }
458}
459
460#[cfg(test)]
461mod tests {
462    use super::*;
463    use crate::kiss;
464    use crate::serial::open_pty_pair;
465    use std::os::unix::io::{AsRawFd, FromRawFd};
466    /// Helper: poll an fd for reading with timeout (ms).
467    fn poll_read(fd: i32, timeout_ms: i32) -> bool {
468        let mut pfd = libc::pollfd {
469            fd,
470            events: libc::POLLIN,
471            revents: 0,
472        };
473        let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
474        ret > 0
475    }
476
477    /// Read all available bytes from an fd.
478    fn read_available(file: &mut std::fs::File) -> Vec<u8> {
479        let mut all = Vec::new();
480        let mut buf = [0u8; 4096];
481        while poll_read(file.as_raw_fd(), 100) {
482            match file.read(&mut buf) {
483                Ok(n) if n > 0 => all.extend_from_slice(&buf[..n]),
484                _ => break,
485            }
486        }
487        all
488    }
489
490    /// Mock RNode: respond to detect with DETECT_RESP, FW version, platform
491    fn mock_respond_detect(master: &mut std::fs::File) {
492        // Respond: DETECT_RESP
493        master
494            .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_DETECT, &[rnode_kiss::DETECT_RESP]))
495            .unwrap();
496        // FW version 1.74
497        master
498            .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_FW_VERSION, &[0x01, 0x4A]))
499            .unwrap();
500        // Platform ESP32
501        master
502            .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_PLATFORM, &[0x80]))
503            .unwrap();
504        // MCU
505        master
506            .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x01]))
507            .unwrap();
508        master.flush().unwrap();
509    }
510
511    #[test]
512    fn rnode_detect_over_pty() {
513        // Test that the RNode decoder can parse detect responses from a PTY
514        let (master_fd, slave_fd) = open_pty_pair().unwrap();
515        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
516        let mut slave = unsafe { std::fs::File::from_raw_fd(slave_fd) };
517
518        // Write detect response to master
519        mock_respond_detect(&mut master);
520
521        // Read from slave with RNode decoder
522        assert!(poll_read(slave.as_raw_fd(), 2000));
523        let mut decoder = rnode_kiss::RNodeDecoder::new();
524        let mut buf = [0u8; 4096];
525        let n = slave.read(&mut buf).unwrap();
526        let events = decoder.feed(&buf[..n]);
527
528        assert!(
529            events.iter().any(|e| matches!(e, rnode_kiss::RNodeEvent::Detected(true))),
530            "should detect device"
531        );
532        assert!(
533            events.iter().any(|e| matches!(e, rnode_kiss::RNodeEvent::FirmwareVersion { .. })),
534            "should get firmware version"
535        );
536    }
537
538    #[test]
539    fn rnode_configure_commands() {
540        let (master_fd, slave_fd) = open_pty_pair().unwrap();
541        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
542        let writer_file = unsafe { std::fs::File::from_raw_fd(libc::dup(slave_fd)) };
543        let writer = Arc::new(Mutex::new(writer_file));
544
545        let sub = RNodeSubConfig {
546            name: "test".into(),
547            frequency: 868_000_000,
548            bandwidth: 125_000,
549            txpower: 7,
550            spreading_factor: 8,
551            coding_rate: 5,
552            flow_control: false,
553            st_alock: None,
554            lt_alock: None,
555        };
556
557        configure_subinterface(&writer, 0, &sub, false).unwrap();
558
559        // Read what was sent
560        let data = read_available(&mut master);
561
562        // Should contain frequency command
563        assert!(data.windows(2).any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_FREQUENCY),
564            "should contain FREQUENCY command");
565        // Should contain bandwidth command
566        assert!(data.windows(2).any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_BANDWIDTH),
567            "should contain BANDWIDTH command");
568        // Should contain RADIO_STATE ON
569        assert!(data.windows(3).any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_RADIO_STATE && w[2] == rnode_kiss::RADIO_STATE_ON),
570            "should contain RADIO_STATE ON command");
571
572        unsafe { libc::close(slave_fd) };
573    }
574
575    #[test]
576    fn rnode_data_roundtrip() {
577        let (master_fd, slave_fd) = open_pty_pair().unwrap();
578        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
579        let slave = unsafe { std::fs::File::from_raw_fd(slave_fd) };
580
581        // Write a data frame (subinterface 0) to master
582        let payload = vec![0x01, 0x02, 0x03, 0x04, 0x05];
583        let frame = rnode_kiss::rnode_data_frame(0, &payload);
584        master.write_all(&frame).unwrap();
585        master.flush().unwrap();
586
587        // Read from slave with RNode decoder
588        assert!(poll_read(slave.as_raw_fd(), 2000));
589        let mut decoder = rnode_kiss::RNodeDecoder::new();
590        let mut buf = [0u8; 4096];
591        let mut slave_file = slave;
592        let n = slave_file.read(&mut buf).unwrap();
593        let events = decoder.feed(&buf[..n]);
594
595        assert_eq!(events.len(), 1);
596        match &events[0] {
597            rnode_kiss::RNodeEvent::DataFrame { index, data } => {
598                assert_eq!(*index, 0);
599                assert_eq!(data, &payload);
600            }
601            other => panic!("expected DataFrame, got {:?}", other),
602        }
603    }
604
605    #[test]
606    fn rnode_flow_control() {
607        let (master_fd, slave_fd) = open_pty_pair().unwrap();
608        let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
609        let shared_writer = Arc::new(Mutex::new(writer_file));
610
611        let flow_state = Arc::new(Mutex::new(SubFlowState {
612            ready: true,
613            queue: std::collections::VecDeque::new(),
614        }));
615
616        let mut sub_writer = RNodeSubWriter {
617            writer: shared_writer.clone(),
618            index: 0,
619            flow_control: true,
620            flow_state: flow_state.clone(),
621        };
622
623        // First send: should go through (ready=true) and set ready=false
624        sub_writer.send_frame(b"hello").unwrap();
625        assert!(!flow_state.lock().unwrap().ready);
626
627        // Second send: should be queued
628        sub_writer.send_frame(b"world").unwrap();
629        assert_eq!(flow_state.lock().unwrap().queue.len(), 1);
630
631        // Process flow queue (simulates CMD_READY)
632        process_flow_queue(&flow_state, &shared_writer, 0);
633        assert_eq!(flow_state.lock().unwrap().queue.len(), 0);
634        assert!(!flow_state.lock().unwrap().ready); // sent queued, still locked
635
636        // Process again with empty queue: sets ready=true
637        process_flow_queue(&flow_state, &shared_writer, 0);
638        assert!(flow_state.lock().unwrap().ready);
639
640        unsafe { libc::close(master_fd) };
641    }
642
643    #[test]
644    fn rnode_sub_writer_format() {
645        let (master_fd, slave_fd) = open_pty_pair().unwrap();
646        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
647        let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
648        let shared_writer = Arc::new(Mutex::new(writer_file));
649
650        let flow_state = Arc::new(Mutex::new(SubFlowState {
651            ready: true,
652            queue: std::collections::VecDeque::new(),
653        }));
654
655        let mut sub_writer = RNodeSubWriter {
656            writer: shared_writer,
657            index: 1, // subinterface 1
658            flow_control: false,
659            flow_state,
660        };
661
662        let payload = vec![0xAA, 0xBB, 0xCC];
663        sub_writer.send_frame(&payload).unwrap();
664
665        // Read from master
666        assert!(poll_read(master.as_raw_fd(), 2000));
667        let mut buf = [0u8; 256];
668        let n = master.read(&mut buf).unwrap();
669
670        // Should start with FEND, CMD_INT1_DATA (0x10)
671        assert_eq!(buf[0], kiss::FEND);
672        assert_eq!(buf[1], 0x10); // CMD_INT1_DATA
673        assert_eq!(buf[n - 1], kiss::FEND);
674    }
675
676    #[test]
677    fn rnode_multi_sub_routing() {
678        // Test that data frames on different CMD_INTn route to different indices
679        let mut decoder = rnode_kiss::RNodeDecoder::new();
680
681        let payload0 = vec![0x01, 0x02];
682        let frame0 = rnode_kiss::rnode_data_frame(0, &payload0);
683        let events0 = decoder.feed(&frame0);
684        assert_eq!(events0.len(), 1);
685        assert_eq!(events0[0], rnode_kiss::RNodeEvent::DataFrame { index: 0, data: payload0 });
686
687        let payload1 = vec![0x03, 0x04];
688        let frame1 = rnode_kiss::rnode_data_frame(1, &payload1);
689        let events1 = decoder.feed(&frame1);
690        assert_eq!(events1.len(), 1);
691        assert_eq!(events1[0], rnode_kiss::RNodeEvent::DataFrame { index: 1, data: payload1 });
692    }
693
694    #[test]
695    fn rnode_error_handling() {
696        // NOTE: CMD_ERROR (0x90) == CMD_INT5_DATA (0x90) — they share the same
697        // byte value. In multi-radio mode, 0x90 is treated as INT5_DATA.
698        // Error events only appear on single-radio devices where there's no
699        // INT5_DATA. The decoder treats 0x90 as INT5_DATA (data wins).
700        // For real error detection, we rely on the data frame being invalid
701        // or the device sending a different error indicator.
702        // Test that cmd 0x90 is correctly handled as a data frame.
703        let mut decoder = rnode_kiss::RNodeDecoder::new();
704        let frame = rnode_kiss::rnode_command(rnode_kiss::CMD_ERROR, &[0x02]);
705        let events = decoder.feed(&frame);
706        assert_eq!(events.len(), 1);
707        // 0x90 = CMD_INT5_DATA, so it's a DataFrame with index 5
708        assert_eq!(
709            events[0],
710            rnode_kiss::RNodeEvent::DataFrame {
711                index: 5,
712                data: vec![0x02]
713            }
714        );
715    }
716
717    #[test]
718    fn rnode_config_validation() {
719        let good = RNodeSubConfig {
720            name: "test".into(),
721            frequency: 868_000_000,
722            bandwidth: 125_000,
723            txpower: 7,
724            spreading_factor: 8,
725            coding_rate: 5,
726            flow_control: false,
727            st_alock: None,
728            lt_alock: None,
729        };
730        assert!(validate_sub_config(&good).is_none());
731
732        // Bad frequency
733        let mut bad = good.clone();
734        bad.frequency = 100;
735        assert!(validate_sub_config(&bad).is_some());
736
737        // Bad SF
738        bad = good.clone();
739        bad.spreading_factor = 13;
740        assert!(validate_sub_config(&bad).is_some());
741
742        // Bad CR
743        bad = good.clone();
744        bad.coding_rate = 9;
745        assert!(validate_sub_config(&bad).is_some());
746
747        // Bad BW
748        bad = good.clone();
749        bad.bandwidth = 5;
750        assert!(validate_sub_config(&bad).is_some());
751
752        // Bad TX power
753        bad = good.clone();
754        bad.txpower = 50;
755        assert!(validate_sub_config(&bad).is_some());
756    }
757}