Skip to main content

rns_net/interface/
rnode.rs

1//! RNode LoRa radio interface.
2//!
3//! Manages serial connection to RNode device, detects firmware,
4//! configures radio parameters per subinterface. Supports single-radio
5//! (RNodeInterface) and multi-radio (RNodeMultiInterface) devices.
6//!
7//! Matches Python `RNodeInterface.py` and `RNodeMultiInterface.py`.
8
9use std::io::{self, Read, Write};
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::InterfaceId;
15
16use crate::event::{Event, EventSender};
17use crate::interface::Writer;
18use crate::rnode_kiss;
19use crate::serial::{Parity, SerialConfig, SerialPort};
20
21/// Validation limits matching Python RNodeInterface.
22pub const FREQ_MIN: u32 = 137_000_000;
23pub const FREQ_MAX: u32 = 3_000_000_000;
24pub const BW_MIN: u32 = 7_800;
25pub const BW_MAX: u32 = 1_625_000;
26pub const SF_MIN: u8 = 5;
27pub const SF_MAX: u8 = 12;
28pub const CR_MIN: u8 = 5;
29pub const CR_MAX: u8 = 8;
30pub const TXPOWER_MIN: i8 = 0;
31pub const TXPOWER_MAX: i8 = 37;
32pub const HW_MTU: u32 = 508;
33
34/// Configuration for one RNode subinterface (radio).
35#[derive(Debug, Clone)]
36pub struct RNodeSubConfig {
37    pub name: String,
38    pub frequency: u32,
39    pub bandwidth: u32,
40    pub txpower: i8,
41    pub spreading_factor: u8,
42    pub coding_rate: u8,
43    pub flow_control: bool,
44    pub st_alock: Option<f32>,
45    pub lt_alock: Option<f32>,
46}
47
48/// Configuration for the RNode device.
49#[derive(Debug, Clone)]
50pub struct RNodeConfig {
51    pub name: String,
52    pub port: String,
53    pub speed: u32,
54    pub subinterfaces: Vec<RNodeSubConfig>,
55    pub id_interval: Option<u32>,
56    pub id_callsign: Option<Vec<u8>>,
57    pub base_interface_id: InterfaceId,
58    /// Pre-opened file descriptor (e.g. from a USB bridge socketpair on Android).
59    /// When set, `start()` uses this fd directly instead of opening `port`.
60    pub pre_opened_fd: Option<i32>,
61    pub runtime: Arc<Mutex<RNodeRuntime>>,
62}
63
64#[derive(Debug, Clone)]
65pub struct RNodeRuntime {
66    pub sub: RNodeSubConfig,
67    pub writer: Option<Arc<Mutex<std::fs::File>>>,
68}
69
70impl RNodeRuntime {
71    pub fn from_config(config: &RNodeConfig) -> Self {
72        Self {
73            sub: config
74                .subinterfaces
75                .first()
76                .cloned()
77                .unwrap_or_else(|| RNodeSubConfig {
78                    name: config.name.clone(),
79                    frequency: 868_000_000,
80                    bandwidth: 125_000,
81                    txpower: 7,
82                    spreading_factor: 8,
83                    coding_rate: 5,
84                    flow_control: false,
85                    st_alock: None,
86                    lt_alock: None,
87                }),
88            writer: None,
89        }
90    }
91}
92
93#[derive(Debug, Clone)]
94pub struct RNodeRuntimeConfigHandle {
95    pub interface_name: String,
96    pub runtime: Arc<Mutex<RNodeRuntime>>,
97    pub startup: RNodeRuntime,
98}
99
100impl Default for RNodeConfig {
101    fn default() -> Self {
102        let mut config = RNodeConfig {
103            name: String::new(),
104            port: String::new(),
105            speed: 115200,
106            subinterfaces: Vec::new(),
107            id_interval: None,
108            id_callsign: None,
109            base_interface_id: InterfaceId(0),
110            pre_opened_fd: None,
111            runtime: Arc::new(Mutex::new(RNodeRuntime {
112                sub: RNodeSubConfig {
113                    name: String::new(),
114                    frequency: 868_000_000,
115                    bandwidth: 125_000,
116                    txpower: 7,
117                    spreading_factor: 8,
118                    coding_rate: 5,
119                    flow_control: false,
120                    st_alock: None,
121                    lt_alock: None,
122                },
123                writer: None,
124            })),
125        };
126        let startup = RNodeRuntime::from_config(&config);
127        config.runtime = Arc::new(Mutex::new(startup));
128        config
129    }
130}
131
132/// Validate subinterface configuration. Returns error message if invalid.
133pub fn validate_sub_config(sub: &RNodeSubConfig) -> Option<String> {
134    if sub.frequency < FREQ_MIN || sub.frequency > FREQ_MAX {
135        return Some(format!(
136            "Invalid frequency {} for {}",
137            sub.frequency, sub.name
138        ));
139    }
140    if sub.bandwidth < BW_MIN || sub.bandwidth > BW_MAX {
141        return Some(format!(
142            "Invalid bandwidth {} for {}",
143            sub.bandwidth, sub.name
144        ));
145    }
146    if sub.spreading_factor < SF_MIN || sub.spreading_factor > SF_MAX {
147        return Some(format!(
148            "Invalid SF {} for {}",
149            sub.spreading_factor, sub.name
150        ));
151    }
152    if sub.coding_rate < CR_MIN || sub.coding_rate > CR_MAX {
153        return Some(format!("Invalid CR {} for {}", sub.coding_rate, sub.name));
154    }
155    if sub.txpower < TXPOWER_MIN || sub.txpower > TXPOWER_MAX {
156        return Some(format!("Invalid TX power {} for {}", sub.txpower, sub.name));
157    }
158    if let Some(st) = sub.st_alock {
159        if st < 0.0 || st > 100.0 {
160            return Some(format!("Invalid ST airtime limit {} for {}", st, sub.name));
161        }
162    }
163    if let Some(lt) = sub.lt_alock {
164        if lt < 0.0 || lt > 100.0 {
165            return Some(format!("Invalid LT airtime limit {} for {}", lt, sub.name));
166        }
167    }
168    None
169}
170
171/// Writer for a specific RNode subinterface.
172/// Wraps a shared serial writer with subinterface-specific data framing.
173struct RNodeSubWriter {
174    writer: Arc<Mutex<std::fs::File>>,
175    index: u8,
176    flow_control: bool,
177    flow_state: Arc<Mutex<SubFlowState>>,
178}
179
180struct SubFlowState {
181    ready: bool,
182    queue: std::collections::VecDeque<Vec<u8>>,
183}
184
185fn make_sub_writer(
186    writer: Arc<Mutex<std::fs::File>>,
187    index: u8,
188    flow_control: bool,
189    flow_state: Arc<Mutex<SubFlowState>>,
190) -> Box<dyn Writer> {
191    Box::new(RNodeSubWriter {
192        writer,
193        index,
194        flow_control,
195        flow_state,
196    })
197}
198
199impl Writer for RNodeSubWriter {
200    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
201        let frame = rnode_kiss::rnode_data_frame(self.index, data);
202        if self.flow_control {
203            let mut state = self.flow_state.lock().unwrap();
204            if state.ready {
205                state.ready = false;
206                drop(state);
207                self.writer.lock().unwrap().write_all(&frame)
208            } else {
209                state.queue.push_back(data.to_vec());
210                Ok(())
211            }
212        } else {
213            self.writer.lock().unwrap().write_all(&frame)
214        }
215    }
216}
217
218/// Start the RNode interface.
219///
220/// Opens serial port, spawns reader thread which performs detect+configure,
221/// then enters data relay mode.
222///
223/// Returns one `(InterfaceId, Box<dyn Writer>)` per subinterface.
224pub fn start(
225    config: RNodeConfig,
226    tx: EventSender,
227) -> io::Result<Vec<(InterfaceId, Box<dyn Writer>)>> {
228    // Validate all subinterface configs upfront
229    for sub in &config.subinterfaces {
230        if let Some(err) = validate_sub_config(sub) {
231            return Err(io::Error::new(io::ErrorKind::InvalidInput, err));
232        }
233    }
234
235    if config.subinterfaces.is_empty() {
236        return Err(io::Error::new(
237            io::ErrorKind::InvalidInput,
238            "No subinterfaces configured",
239        ));
240    }
241
242    let (reader_file, shared_writer) = if let Some(fd) = config.pre_opened_fd {
243        // Pre-opened fd from USB bridge — dup for independent reader/writer handles
244        let port = SerialPort::from_raw_fd(fd);
245        let r = port.reader()?;
246        let w = port.writer()?;
247        std::mem::forget(port); // don't close the original fd — bridge owns it
248        (r, Arc::new(Mutex::new(w)))
249    } else {
250        let serial_config = SerialConfig {
251            path: config.port.clone(),
252            baud: config.speed,
253            data_bits: 8,
254            parity: Parity::None,
255            stop_bits: 1,
256        };
257        let port = SerialPort::open(&serial_config)?;
258        let r = port.reader()?;
259        let w = port.writer()?;
260        (r, Arc::new(Mutex::new(w)))
261    };
262
263    // Build per-subinterface writers and IDs
264    let num_subs = config.subinterfaces.len();
265    let mut writers: Vec<(InterfaceId, Box<dyn Writer>)> = Vec::with_capacity(num_subs);
266    let mut flow_states: Vec<Arc<Mutex<SubFlowState>>> = Vec::with_capacity(num_subs);
267
268    for (i, sub) in config.subinterfaces.iter().enumerate() {
269        let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
270        let flow_state = Arc::new(Mutex::new(SubFlowState {
271            ready: true,
272            queue: std::collections::VecDeque::new(),
273        }));
274        flow_states.push(flow_state.clone());
275        let sub_writer =
276            make_sub_writer(shared_writer.clone(), i as u8, sub.flow_control, flow_state);
277        writers.push((sub_id, sub_writer));
278    }
279
280    // Spawn reader thread
281    let reader_shared_writer = shared_writer.clone();
282    {
283        let mut runtime = config.runtime.lock().unwrap();
284        runtime.writer = Some(shared_writer.clone());
285        runtime.sub = config
286            .subinterfaces
287            .first()
288            .cloned()
289            .unwrap_or(runtime.sub.clone());
290    }
291    let reader_config = config.clone();
292    let reader_flow_states = flow_states;
293    thread::Builder::new()
294        .name(format!("rnode-reader-{}", config.base_interface_id.0))
295        .spawn(move || {
296            reader_loop(
297                reader_file,
298                reader_shared_writer,
299                reader_config,
300                tx,
301                reader_flow_states,
302            );
303        })?;
304
305    // Spawn keepalive thread — sends periodic DETECT to prevent firmware
306    // bridge idle timeout (ESP32 RNode reverts to standalone after 30s idle).
307    let keepalive_writer = shared_writer.clone();
308    let keepalive_name = config.name.clone();
309    thread::Builder::new()
310        .name(format!("rnode-keepalive-{}", config.base_interface_id.0))
311        .spawn(move || {
312            let detect = rnode_kiss::detect_request();
313            loop {
314                thread::sleep(Duration::from_secs(15));
315                if let Err(e) = keepalive_writer.lock().unwrap().write_all(&detect) {
316                    log::debug!("[{}] keepalive write failed: {}", keepalive_name, e);
317                }
318            }
319        })?;
320
321    Ok(writers)
322}
323
324/// Reader loop: detect device, configure radios, then relay data frames.
325fn reader_loop(
326    mut reader: std::fs::File,
327    writer: Arc<Mutex<std::fs::File>>,
328    config: RNodeConfig,
329    tx: EventSender,
330    flow_states: Vec<Arc<Mutex<SubFlowState>>>,
331) {
332    const RECONNECT_INITIAL_DELAY: Duration = Duration::from_millis(200);
333    const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(2);
334    // Initial delay for hardware init (matches Python: sleep(2.0))
335    thread::sleep(Duration::from_secs(2));
336    let mut connected_once = false;
337    if let Err(e) = detect_and_configure(&mut reader, &writer, &config) {
338        log::error!("[{}] initial RNode setup failed: {}", config.name, e);
339        return;
340    }
341    signal_interface_up(&tx, &config, &writer, &flow_states, connected_once);
342    connected_once = true;
343    loop {
344        let mut decoder = rnode_kiss::RNodeDecoder::new();
345        let mut buf = [0u8; 4096];
346        let disconnected = loop {
347            match reader.read(&mut buf) {
348                Ok(0) => {
349                    log::warn!("[{}] serial port closed", config.name);
350                    signal_interface_down(&tx, &config);
351                    break true;
352                }
353                Ok(n) => {
354                    for event in decoder.feed(&buf[..n]) {
355                        match event {
356                            rnode_kiss::RNodeEvent::DataFrame { index, data } => {
357                                let sub_id = InterfaceId(config.base_interface_id.0 + index as u64);
358                                if tx
359                                    .send(Event::Frame {
360                                        interface_id: sub_id,
361                                        data,
362                                    })
363                                    .is_err()
364                                {
365                                    return;
366                                }
367                            }
368                            rnode_kiss::RNodeEvent::Ready => {
369                                // Flow control: unlock all subinterfaces that have flow_control
370                                for (i, fs) in flow_states.iter().enumerate() {
371                                    if config.subinterfaces[i].flow_control {
372                                        process_flow_queue(fs, &writer, i as u8);
373                                    }
374                                }
375                            }
376                            rnode_kiss::RNodeEvent::Error(code) => {
377                                log::error!("[{}] RNode error: 0x{:02X}", config.name, code);
378                            }
379                            _ => {
380                                // Status updates logged but not acted on
381                            }
382                        }
383                    }
384                }
385                Err(e) => {
386                    log::error!("[{}] serial read error: {}", config.name, e);
387                    signal_interface_down(&tx, &config);
388                    break true;
389                }
390            }
391        };
392
393        if !disconnected || config.pre_opened_fd.is_some() {
394            return;
395        }
396
397        let mut backoff = RECONNECT_INITIAL_DELAY;
398        loop {
399            match reopen_connection(&config, &writer) {
400                Ok(new_reader) => {
401                    reset_flow_states(&flow_states);
402                    reader = new_reader;
403                    if let Err(e) = detect_and_configure(&mut reader, &writer, &config) {
404                        log::warn!("[{}] reconnect configure failed: {}", config.name, e);
405                        thread::sleep(backoff);
406                        backoff = std::cmp::min(backoff.saturating_mul(2), RECONNECT_MAX_DELAY);
407                        continue;
408                    }
409                    signal_interface_up(&tx, &config, &writer, &flow_states, connected_once);
410                    break;
411                }
412                Err(e) => {
413                    log::warn!("[{}] reconnect open failed: {}", config.name, e);
414                    thread::sleep(backoff);
415                    backoff = std::cmp::min(backoff.saturating_mul(2), RECONNECT_MAX_DELAY);
416                }
417            }
418        }
419    }
420}
421
422fn detect_and_configure(
423    reader: &mut std::fs::File,
424    writer: &Arc<Mutex<std::fs::File>>,
425    config: &RNodeConfig,
426) -> io::Result<()> {
427    let detect_cmd = rnode_kiss::detect_request();
428    let mut cmd = detect_cmd;
429    cmd.extend_from_slice(&rnode_kiss::rnode_command(
430        rnode_kiss::CMD_FW_VERSION,
431        &[0x00],
432    ));
433    cmd.extend_from_slice(&rnode_kiss::rnode_command(
434        rnode_kiss::CMD_FW_DETAIL,
435        &[0x00],
436    ));
437    cmd.extend_from_slice(&rnode_kiss::rnode_command(
438        rnode_kiss::CMD_PLATFORM,
439        &[0x00],
440    ));
441    cmd.extend_from_slice(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x00]));
442
443    writer.lock().unwrap().write_all(&cmd)?;
444
445    let mut decoder = rnode_kiss::RNodeDecoder::new();
446    let mut buf = [0u8; 4096];
447    let mut detected = false;
448    let detect_start = std::time::Instant::now();
449    let detect_timeout = Duration::from_secs(5);
450
451    while !detected && detect_start.elapsed() < detect_timeout {
452        match reader.read(&mut buf) {
453            Ok(0) => {
454                return Err(io::Error::new(
455                    io::ErrorKind::UnexpectedEof,
456                    "serial port closed during detect",
457                ));
458            }
459            Ok(n) => {
460                for event in decoder.feed(&buf[..n]) {
461                    match event {
462                        rnode_kiss::RNodeEvent::Detected(true) => {
463                            detected = true;
464                            log::info!("[{}] RNode device detected", config.name);
465                        }
466                        rnode_kiss::RNodeEvent::FirmwareVersion { major, minor } => {
467                            log::info!("[{}] firmware version {}.{}", config.name, major, minor);
468                        }
469                        rnode_kiss::RNodeEvent::FirmwareDetail(ref detail) => {
470                            log::info!("[{}] firmware detail: {}", config.name, detail);
471                        }
472                        rnode_kiss::RNodeEvent::Platform(p) => {
473                            log::info!("[{}] platform: 0x{:02X}", config.name, p);
474                        }
475                        rnode_kiss::RNodeEvent::Mcu(m) => {
476                            log::info!("[{}] MCU: 0x{:02X}", config.name, m);
477                        }
478                        _ => {}
479                    }
480                }
481            }
482            Err(e) => {
483                return Err(io::Error::new(
484                    e.kind(),
485                    format!("serial read error during detect: {}", e),
486                ));
487            }
488        }
489    }
490
491    if !detected {
492        return Err(io::Error::new(
493            io::ErrorKind::TimedOut,
494            "RNode detection timed out",
495        ));
496    }
497
498    for (i, sub) in config.subinterfaces.iter().enumerate() {
499        configure_subinterface(writer, i as u8, sub, config.subinterfaces.len() > 1)?;
500    }
501
502    thread::sleep(Duration::from_millis(300));
503    log::info!(
504        "[{}] RNode configured with {} subinterface(s)",
505        config.name,
506        config.subinterfaces.len()
507    );
508    Ok(())
509}
510
511fn signal_interface_down(tx: &EventSender, config: &RNodeConfig) {
512    for i in 0..config.subinterfaces.len() {
513        let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
514        let _ = tx.send(Event::InterfaceDown(sub_id));
515    }
516}
517
518fn signal_interface_up(
519    tx: &EventSender,
520    config: &RNodeConfig,
521    writer: &Arc<Mutex<std::fs::File>>,
522    flow_states: &[Arc<Mutex<SubFlowState>>],
523    reconnected: bool,
524) {
525    for (i, flow_state) in flow_states.iter().enumerate() {
526        let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
527        let new_writer = reconnected.then(|| {
528            make_sub_writer(
529                writer.clone(),
530                i as u8,
531                config.subinterfaces[i].flow_control,
532                flow_state.clone(),
533            )
534        });
535        let _ = tx.send(Event::InterfaceUp(sub_id, new_writer, None));
536    }
537}
538
539fn reset_flow_states(flow_states: &[Arc<Mutex<SubFlowState>>]) {
540    for flow_state in flow_states {
541        let mut state = flow_state.lock().unwrap();
542        state.ready = true;
543        state.queue.clear();
544    }
545}
546
547fn reopen_connection(
548    config: &RNodeConfig,
549    writer: &Arc<Mutex<std::fs::File>>,
550) -> io::Result<std::fs::File> {
551    let serial_config = SerialConfig {
552        path: config.port.clone(),
553        baud: config.speed,
554        data_bits: 8,
555        parity: Parity::None,
556        stop_bits: 1,
557    };
558    let port = SerialPort::open(&serial_config)?;
559    let reader = port.reader()?;
560    let new_writer = port.writer()?;
561    *writer.lock().unwrap() = new_writer;
562    Ok(reader)
563}
564
565/// Configure a single subinterface on the RNode device.
566pub(crate) fn configure_subinterface(
567    writer: &Arc<Mutex<std::fs::File>>,
568    index: u8,
569    sub: &RNodeSubConfig,
570    multi: bool,
571) -> io::Result<()> {
572    let mut w = writer.lock().unwrap();
573
574    // For multi-radio, send select command before each parameter
575    let freq_bytes = [
576        (sub.frequency >> 24) as u8,
577        (sub.frequency >> 16) as u8,
578        (sub.frequency >> 8) as u8,
579        (sub.frequency & 0xFF) as u8,
580    ];
581    let bw_bytes = [
582        (sub.bandwidth >> 24) as u8,
583        (sub.bandwidth >> 16) as u8,
584        (sub.bandwidth >> 8) as u8,
585        (sub.bandwidth & 0xFF) as u8,
586    ];
587    let txp = if sub.txpower < 0 {
588        (sub.txpower as i16 + 256) as u8
589    } else {
590        sub.txpower as u8
591    };
592
593    if multi {
594        w.write_all(&rnode_kiss::rnode_select_command(
595            index,
596            rnode_kiss::CMD_FREQUENCY,
597            &freq_bytes,
598        ))?;
599        w.write_all(&rnode_kiss::rnode_select_command(
600            index,
601            rnode_kiss::CMD_BANDWIDTH,
602            &bw_bytes,
603        ))?;
604        w.write_all(&rnode_kiss::rnode_select_command(
605            index,
606            rnode_kiss::CMD_TXPOWER,
607            &[txp],
608        ))?;
609        w.write_all(&rnode_kiss::rnode_select_command(
610            index,
611            rnode_kiss::CMD_SF,
612            &[sub.spreading_factor],
613        ))?;
614        w.write_all(&rnode_kiss::rnode_select_command(
615            index,
616            rnode_kiss::CMD_CR,
617            &[sub.coding_rate],
618        ))?;
619    } else {
620        w.write_all(&rnode_kiss::rnode_command(
621            rnode_kiss::CMD_FREQUENCY,
622            &freq_bytes,
623        ))?;
624        w.write_all(&rnode_kiss::rnode_command(
625            rnode_kiss::CMD_BANDWIDTH,
626            &bw_bytes,
627        ))?;
628        w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_TXPOWER, &[txp]))?;
629        w.write_all(&rnode_kiss::rnode_command(
630            rnode_kiss::CMD_SF,
631            &[sub.spreading_factor],
632        ))?;
633        w.write_all(&rnode_kiss::rnode_command(
634            rnode_kiss::CMD_CR,
635            &[sub.coding_rate],
636        ))?;
637    }
638
639    // Airtime locks
640    if let Some(st) = sub.st_alock {
641        let st_val = (st * 100.0) as u16;
642        let st_bytes = [(st_val >> 8) as u8, (st_val & 0xFF) as u8];
643        if multi {
644            w.write_all(&rnode_kiss::rnode_select_command(
645                index,
646                rnode_kiss::CMD_ST_ALOCK,
647                &st_bytes,
648            ))?;
649        } else {
650            w.write_all(&rnode_kiss::rnode_command(
651                rnode_kiss::CMD_ST_ALOCK,
652                &st_bytes,
653            ))?;
654        }
655    }
656    if let Some(lt) = sub.lt_alock {
657        let lt_val = (lt * 100.0) as u16;
658        let lt_bytes = [(lt_val >> 8) as u8, (lt_val & 0xFF) as u8];
659        if multi {
660            w.write_all(&rnode_kiss::rnode_select_command(
661                index,
662                rnode_kiss::CMD_LT_ALOCK,
663                &lt_bytes,
664            ))?;
665        } else {
666            w.write_all(&rnode_kiss::rnode_command(
667                rnode_kiss::CMD_LT_ALOCK,
668                &lt_bytes,
669            ))?;
670        }
671    }
672
673    // Turn on radio
674    if multi {
675        w.write_all(&rnode_kiss::rnode_select_command(
676            index,
677            rnode_kiss::CMD_RADIO_STATE,
678            &[rnode_kiss::RADIO_STATE_ON],
679        ))?;
680    } else {
681        w.write_all(&rnode_kiss::rnode_command(
682            rnode_kiss::CMD_RADIO_STATE,
683            &[rnode_kiss::RADIO_STATE_ON],
684        ))?;
685    }
686
687    Ok(())
688}
689
690/// Process flow control queue for a subinterface.
691fn process_flow_queue(
692    flow_state: &Arc<Mutex<SubFlowState>>,
693    writer: &Arc<Mutex<std::fs::File>>,
694    index: u8,
695) {
696    let mut state = flow_state.lock().unwrap();
697    if let Some(data) = state.queue.pop_front() {
698        state.ready = false;
699        drop(state);
700        let frame = rnode_kiss::rnode_data_frame(index, &data);
701        let _ = writer.lock().unwrap().write_all(&frame);
702    } else {
703        state.ready = true;
704    }
705}
706
707// --- Factory implementation ---
708
709use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, SubInterface};
710use rns_core::transport::types::InterfaceInfo;
711use std::collections::HashMap;
712
713/// Factory for `RNodeInterface`.
714pub struct RNodeFactory;
715
716impl InterfaceFactory for RNodeFactory {
717    fn type_name(&self) -> &str {
718        "RNodeInterface"
719    }
720
721    fn default_ifac_size(&self) -> usize {
722        8
723    }
724
725    fn parse_config(
726        &self,
727        name: &str,
728        id: InterfaceId,
729        params: &HashMap<String, String>,
730    ) -> Result<Box<dyn InterfaceConfigData>, String> {
731        let pre_opened_fd = params.get("fd").and_then(|v| v.parse::<i32>().ok());
732
733        let port = params
734            .get("port")
735            .cloned()
736            .or_else(|| pre_opened_fd.map(|_| "usb-bridge".to_string()))
737            .ok_or_else(|| "RNodeInterface requires 'port' or 'fd'".to_string())?;
738
739        let speed = params
740            .get("speed")
741            .and_then(|v| v.parse().ok())
742            .unwrap_or(115200u32);
743
744        let frequency = params
745            .get("frequency")
746            .and_then(|v| v.parse().ok())
747            .unwrap_or(868_000_000u32);
748
749        let bandwidth = params
750            .get("bandwidth")
751            .and_then(|v| v.parse().ok())
752            .unwrap_or(125_000u32);
753
754        let txpower = params
755            .get("txpower")
756            .and_then(|v| v.parse().ok())
757            .unwrap_or(7i8);
758
759        let spreading_factor = params
760            .get("spreadingfactor")
761            .or_else(|| params.get("spreading_factor"))
762            .and_then(|v| v.parse().ok())
763            .unwrap_or(8u8);
764
765        let coding_rate = params
766            .get("codingrate")
767            .or_else(|| params.get("coding_rate"))
768            .and_then(|v| v.parse().ok())
769            .unwrap_or(5u8);
770
771        let flow_control = params
772            .get("flow_control")
773            .and_then(|v| crate::config::parse_bool_pub(v))
774            .unwrap_or(false);
775
776        let st_alock = params.get("st_alock").and_then(|v| v.parse().ok());
777
778        let lt_alock = params.get("lt_alock").and_then(|v| v.parse().ok());
779
780        let id_interval = params.get("id_interval").and_then(|v| v.parse().ok());
781
782        let id_callsign = params.get("id_callsign").map(|v| v.as_bytes().to_vec());
783
784        let sub = RNodeSubConfig {
785            name: name.to_string(),
786            frequency,
787            bandwidth,
788            txpower,
789            spreading_factor,
790            coding_rate,
791            flow_control,
792            st_alock,
793            lt_alock,
794        };
795
796        Ok(Box::new(RNodeConfig {
797            name: name.to_string(),
798            port,
799            speed,
800            subinterfaces: vec![sub],
801            id_interval,
802            id_callsign,
803            base_interface_id: id,
804            pre_opened_fd,
805            runtime: Arc::new(Mutex::new(RNodeRuntime {
806                sub: RNodeSubConfig {
807                    name: name.to_string(),
808                    frequency,
809                    bandwidth,
810                    txpower,
811                    spreading_factor,
812                    coding_rate,
813                    flow_control,
814                    st_alock,
815                    lt_alock,
816                },
817                writer: None,
818            })),
819        }))
820    }
821
822    fn start(
823        &self,
824        config: Box<dyn InterfaceConfigData>,
825        ctx: StartContext,
826    ) -> std::io::Result<StartResult> {
827        let rnode_config = *config.into_any().downcast::<RNodeConfig>().map_err(|_| {
828            std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
829        })?;
830
831        let name = rnode_config.name.clone();
832
833        let pairs = start(rnode_config, ctx.tx)?;
834
835        let mut subs = Vec::with_capacity(pairs.len());
836        for (index, (sub_id, writer)) in pairs.into_iter().enumerate() {
837            let sub_name = if index == 0 {
838                name.clone()
839            } else {
840                format!("{}/{}", name, index)
841            };
842
843            let info = InterfaceInfo {
844                id: sub_id,
845                name: sub_name,
846                mode: ctx.mode,
847                out_capable: true,
848                in_capable: true,
849                bitrate: None,
850                announce_rate_target: None,
851                announce_rate_grace: 0,
852                announce_rate_penalty: 0.0,
853                announce_cap: rns_core::constants::ANNOUNCE_CAP,
854                is_local_client: false,
855                wants_tunnel: false,
856                tunnel_id: None,
857                mtu: rns_core::constants::MTU as u32,
858                ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
859                ia_freq: 0.0,
860                started: crate::time::now(),
861            };
862
863            subs.push(SubInterface {
864                id: sub_id,
865                info,
866                writer,
867                interface_type_name: "RNodeInterface".to_string(),
868            });
869        }
870
871        Ok(StartResult::Multi(subs))
872    }
873}
874
875pub(crate) fn rnode_runtime_handle_from_config(config: &RNodeConfig) -> RNodeRuntimeConfigHandle {
876    RNodeRuntimeConfigHandle {
877        interface_name: config.name.clone(),
878        runtime: Arc::clone(&config.runtime),
879        startup: RNodeRuntime::from_config(config),
880    }
881}
882
883#[cfg(test)]
884mod tests {
885    use super::*;
886    use crate::event;
887    use crate::kiss;
888    use crate::serial::open_pty_pair;
889    use std::os::unix::io::{AsRawFd, FromRawFd};
890    use std::path::PathBuf;
891    use std::sync::mpsc::RecvTimeoutError;
892    use tempfile::tempdir;
893    /// Helper: poll an fd for reading with timeout (ms).
894    fn poll_read(fd: i32, timeout_ms: i32) -> bool {
895        let mut pfd = libc::pollfd {
896            fd,
897            events: libc::POLLIN,
898            revents: 0,
899        };
900        let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
901        ret > 0
902    }
903
904    /// Read all available bytes from an fd.
905    fn read_available(file: &mut std::fs::File) -> Vec<u8> {
906        let mut all = Vec::new();
907        let mut buf = [0u8; 4096];
908        while poll_read(file.as_raw_fd(), 100) {
909            match file.read(&mut buf) {
910                Ok(n) if n > 0 => all.extend_from_slice(&buf[..n]),
911                _ => break,
912            }
913        }
914        all
915    }
916
917    fn slave_tty_path(fd: i32) -> PathBuf {
918        let mut buf = [0u8; 256];
919        let rc = unsafe { libc::ttyname_r(fd, buf.as_mut_ptr().cast(), buf.len()) };
920        assert_eq!(rc, 0, "ttyname_r failed for fd {}", fd);
921        let nul = buf.iter().position(|b| *b == 0).unwrap_or(buf.len());
922        PathBuf::from(std::str::from_utf8(&buf[..nul]).unwrap())
923    }
924
925    fn wait_for_interface_event<F>(
926        rx: &std::sync::mpsc::Receiver<Event>,
927        timeout: Duration,
928        predicate: F,
929    ) -> Event
930    where
931        F: Fn(&Event) -> bool,
932    {
933        let deadline = std::time::Instant::now() + timeout;
934        loop {
935            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
936            match rx.recv_timeout(remaining) {
937                Ok(event) if predicate(&event) => return event,
938                Ok(_) => continue,
939                Err(RecvTimeoutError::Timeout) => panic!("timed out waiting for interface event"),
940                Err(RecvTimeoutError::Disconnected) => panic!("event channel disconnected"),
941            }
942        }
943    }
944
945    /// Mock RNode: respond to detect with DETECT_RESP, FW version, platform
946    fn mock_respond_detect(master: &mut std::fs::File) {
947        // Respond: DETECT_RESP
948        master
949            .write_all(&rnode_kiss::rnode_command(
950                rnode_kiss::CMD_DETECT,
951                &[rnode_kiss::DETECT_RESP],
952            ))
953            .unwrap();
954        // FW version 1.74
955        master
956            .write_all(&rnode_kiss::rnode_command(
957                rnode_kiss::CMD_FW_VERSION,
958                &[0x01, 0x4A],
959            ))
960            .unwrap();
961        // Platform ESP32
962        master
963            .write_all(&rnode_kiss::rnode_command(
964                rnode_kiss::CMD_PLATFORM,
965                &[0x80],
966            ))
967            .unwrap();
968        // MCU
969        master
970            .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x01]))
971            .unwrap();
972        master.flush().unwrap();
973    }
974
975    #[test]
976    fn rnode_detect_over_pty() {
977        // Test that the RNode decoder can parse detect responses from a PTY
978        let (master_fd, slave_fd) = open_pty_pair().unwrap();
979        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
980        let mut slave = unsafe { std::fs::File::from_raw_fd(slave_fd) };
981
982        // Write detect response to master
983        mock_respond_detect(&mut master);
984
985        let mut decoder = rnode_kiss::RNodeDecoder::new();
986        let mut events = Vec::new();
987        let deadline = std::time::Instant::now() + Duration::from_secs(2);
988
989        while std::time::Instant::now() < deadline {
990            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
991            let timeout_ms = remaining.as_millis().min(i32::MAX as u128) as i32;
992            if !poll_read(slave.as_raw_fd(), timeout_ms) {
993                break;
994            }
995
996            let chunk = read_available(&mut slave);
997            if chunk.is_empty() {
998                continue;
999            }
1000
1001            events.extend(decoder.feed(&chunk));
1002
1003            let saw_detect = events
1004                .iter()
1005                .any(|e| matches!(e, rnode_kiss::RNodeEvent::Detected(true)));
1006            let saw_firmware = events
1007                .iter()
1008                .any(|e| matches!(e, rnode_kiss::RNodeEvent::FirmwareVersion { .. }));
1009            if saw_detect && saw_firmware {
1010                break;
1011            }
1012        }
1013
1014        assert!(
1015            events
1016                .iter()
1017                .any(|e| matches!(e, rnode_kiss::RNodeEvent::Detected(true))),
1018            "should detect device"
1019        );
1020        assert!(
1021            events
1022                .iter()
1023                .any(|e| matches!(e, rnode_kiss::RNodeEvent::FirmwareVersion { .. })),
1024            "should get firmware version"
1025        );
1026    }
1027
1028    #[test]
1029    fn rnode_configure_commands() {
1030        let (master_fd, slave_fd) = open_pty_pair().unwrap();
1031        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
1032        let writer_file = unsafe { std::fs::File::from_raw_fd(libc::dup(slave_fd)) };
1033        let writer = Arc::new(Mutex::new(writer_file));
1034
1035        let sub = RNodeSubConfig {
1036            name: "test".into(),
1037            frequency: 868_000_000,
1038            bandwidth: 125_000,
1039            txpower: 7,
1040            spreading_factor: 8,
1041            coding_rate: 5,
1042            flow_control: false,
1043            st_alock: None,
1044            lt_alock: None,
1045        };
1046
1047        configure_subinterface(&writer, 0, &sub, false).unwrap();
1048
1049        // Read what was sent
1050        let data = read_available(&mut master);
1051
1052        // Should contain frequency command
1053        assert!(
1054            data.windows(2)
1055                .any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_FREQUENCY),
1056            "should contain FREQUENCY command"
1057        );
1058        // Should contain bandwidth command
1059        assert!(
1060            data.windows(2)
1061                .any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_BANDWIDTH),
1062            "should contain BANDWIDTH command"
1063        );
1064        // Should contain RADIO_STATE ON
1065        assert!(
1066            data.windows(3).any(|w| w[0] == kiss::FEND
1067                && w[1] == rnode_kiss::CMD_RADIO_STATE
1068                && w[2] == rnode_kiss::RADIO_STATE_ON),
1069            "should contain RADIO_STATE ON command"
1070        );
1071
1072        unsafe { libc::close(slave_fd) };
1073    }
1074
1075    #[test]
1076    fn rnode_data_roundtrip() {
1077        let (master_fd, slave_fd) = open_pty_pair().unwrap();
1078        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
1079        let slave = unsafe { std::fs::File::from_raw_fd(slave_fd) };
1080
1081        // Write a data frame (subinterface 0) to master
1082        let payload = vec![0x01, 0x02, 0x03, 0x04, 0x05];
1083        let frame = rnode_kiss::rnode_data_frame(0, &payload);
1084        master.write_all(&frame).unwrap();
1085        master.flush().unwrap();
1086
1087        // Read from slave with RNode decoder
1088        assert!(poll_read(slave.as_raw_fd(), 2000));
1089        let mut decoder = rnode_kiss::RNodeDecoder::new();
1090        let mut buf = [0u8; 4096];
1091        let mut slave_file = slave;
1092        let n = slave_file.read(&mut buf).unwrap();
1093        let events = decoder.feed(&buf[..n]);
1094
1095        assert_eq!(events.len(), 1);
1096        match &events[0] {
1097            rnode_kiss::RNodeEvent::DataFrame { index, data } => {
1098                assert_eq!(*index, 0);
1099                assert_eq!(data, &payload);
1100            }
1101            other => panic!("expected DataFrame, got {:?}", other),
1102        }
1103    }
1104
1105    #[test]
1106    fn rnode_flow_control() {
1107        let (master_fd, slave_fd) = open_pty_pair().unwrap();
1108        let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
1109        let shared_writer = Arc::new(Mutex::new(writer_file));
1110
1111        let flow_state = Arc::new(Mutex::new(SubFlowState {
1112            ready: true,
1113            queue: std::collections::VecDeque::new(),
1114        }));
1115
1116        let mut sub_writer = RNodeSubWriter {
1117            writer: shared_writer.clone(),
1118            index: 0,
1119            flow_control: true,
1120            flow_state: flow_state.clone(),
1121        };
1122
1123        // First send: should go through (ready=true) and set ready=false
1124        sub_writer.send_frame(b"hello").unwrap();
1125        assert!(!flow_state.lock().unwrap().ready);
1126
1127        // Second send: should be queued
1128        sub_writer.send_frame(b"world").unwrap();
1129        assert_eq!(flow_state.lock().unwrap().queue.len(), 1);
1130
1131        // Process flow queue (simulates CMD_READY)
1132        process_flow_queue(&flow_state, &shared_writer, 0);
1133        assert_eq!(flow_state.lock().unwrap().queue.len(), 0);
1134        assert!(!flow_state.lock().unwrap().ready); // sent queued, still locked
1135
1136        // Process again with empty queue: sets ready=true
1137        process_flow_queue(&flow_state, &shared_writer, 0);
1138        assert!(flow_state.lock().unwrap().ready);
1139
1140        unsafe { libc::close(master_fd) };
1141    }
1142
1143    #[test]
1144    fn rnode_sub_writer_format() {
1145        let (master_fd, slave_fd) = open_pty_pair().unwrap();
1146        let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
1147        let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
1148        let shared_writer = Arc::new(Mutex::new(writer_file));
1149
1150        let flow_state = Arc::new(Mutex::new(SubFlowState {
1151            ready: true,
1152            queue: std::collections::VecDeque::new(),
1153        }));
1154
1155        let mut sub_writer = RNodeSubWriter {
1156            writer: shared_writer,
1157            index: 1, // subinterface 1
1158            flow_control: false,
1159            flow_state,
1160        };
1161
1162        let payload = vec![0xAA, 0xBB, 0xCC];
1163        sub_writer.send_frame(&payload).unwrap();
1164
1165        // Read from master
1166        assert!(poll_read(master.as_raw_fd(), 2000));
1167        let mut buf = [0u8; 256];
1168        let n = master.read(&mut buf).unwrap();
1169
1170        // Should start with FEND, CMD_INT1_DATA (0x10)
1171        assert_eq!(buf[0], kiss::FEND);
1172        assert_eq!(buf[1], 0x10); // CMD_INT1_DATA
1173        assert_eq!(buf[n - 1], kiss::FEND);
1174    }
1175
1176    #[test]
1177    fn rnode_multi_sub_routing() {
1178        // Test that data frames on different CMD_INTn route to different indices
1179        let mut decoder = rnode_kiss::RNodeDecoder::new();
1180
1181        let payload0 = vec![0x01, 0x02];
1182        let frame0 = rnode_kiss::rnode_data_frame(0, &payload0);
1183        let events0 = decoder.feed(&frame0);
1184        assert_eq!(events0.len(), 1);
1185        assert_eq!(
1186            events0[0],
1187            rnode_kiss::RNodeEvent::DataFrame {
1188                index: 0,
1189                data: payload0
1190            }
1191        );
1192
1193        let payload1 = vec![0x03, 0x04];
1194        let frame1 = rnode_kiss::rnode_data_frame(1, &payload1);
1195        let events1 = decoder.feed(&frame1);
1196        assert_eq!(events1.len(), 1);
1197        assert_eq!(
1198            events1[0],
1199            rnode_kiss::RNodeEvent::DataFrame {
1200                index: 1,
1201                data: payload1
1202            }
1203        );
1204    }
1205
1206    #[test]
1207    fn rnode_error_handling() {
1208        // NOTE: CMD_ERROR (0x90) == CMD_INT5_DATA (0x90) — they share the same
1209        // byte value. In multi-radio mode, 0x90 is treated as INT5_DATA.
1210        // Error events only appear on single-radio devices where there's no
1211        // INT5_DATA. The decoder treats 0x90 as INT5_DATA (data wins).
1212        // For real error detection, we rely on the data frame being invalid
1213        // or the device sending a different error indicator.
1214        // Test that cmd 0x90 is correctly handled as a data frame.
1215        let mut decoder = rnode_kiss::RNodeDecoder::new();
1216        let frame = rnode_kiss::rnode_command(rnode_kiss::CMD_ERROR, &[0x02]);
1217        let events = decoder.feed(&frame);
1218        assert_eq!(events.len(), 1);
1219        // 0x90 = CMD_INT5_DATA, so it's a DataFrame with index 5
1220        assert_eq!(
1221            events[0],
1222            rnode_kiss::RNodeEvent::DataFrame {
1223                index: 5,
1224                data: vec![0x02]
1225            }
1226        );
1227    }
1228
1229    #[test]
1230    fn rnode_config_validation() {
1231        let good = RNodeSubConfig {
1232            name: "test".into(),
1233            frequency: 868_000_000,
1234            bandwidth: 125_000,
1235            txpower: 7,
1236            spreading_factor: 8,
1237            coding_rate: 5,
1238            flow_control: false,
1239            st_alock: None,
1240            lt_alock: None,
1241        };
1242        assert!(validate_sub_config(&good).is_none());
1243
1244        // Bad frequency
1245        let mut bad = good.clone();
1246        bad.frequency = 100;
1247        assert!(validate_sub_config(&bad).is_some());
1248
1249        // Bad SF
1250        bad = good.clone();
1251        bad.spreading_factor = 13;
1252        assert!(validate_sub_config(&bad).is_some());
1253
1254        // Bad CR
1255        bad = good.clone();
1256        bad.coding_rate = 9;
1257        assert!(validate_sub_config(&bad).is_some());
1258
1259        // Bad BW
1260        bad = good.clone();
1261        bad.bandwidth = 5;
1262        assert!(validate_sub_config(&bad).is_some());
1263
1264        // Bad TX power
1265        bad = good.clone();
1266        bad.txpower = 50;
1267        assert!(validate_sub_config(&bad).is_some());
1268    }
1269
1270    #[test]
1271    fn rnode_reconnects_after_serial_disconnect() {
1272        let tempdir = tempdir().unwrap();
1273        let port_path = tempdir.path().join("rnode-port");
1274
1275        let (master1_fd, slave1_fd) = open_pty_pair().unwrap();
1276        let slave1_path = slave_tty_path(slave1_fd);
1277        std::os::unix::fs::symlink(&slave1_path, &port_path).unwrap();
1278
1279        let mut master1 = unsafe { std::fs::File::from_raw_fd(master1_fd) };
1280        let slave1 = unsafe { std::fs::File::from_raw_fd(slave1_fd) };
1281
1282        let (tx, rx) = event::channel();
1283        let sub = RNodeSubConfig {
1284            name: "test-rnode".into(),
1285            frequency: 868_000_000,
1286            bandwidth: 125_000,
1287            txpower: 7,
1288            spreading_factor: 8,
1289            coding_rate: 5,
1290            flow_control: false,
1291            st_alock: None,
1292            lt_alock: None,
1293        };
1294        let mut config = RNodeConfig {
1295            name: "test-rnode".into(),
1296            port: port_path.display().to_string(),
1297            speed: 115200,
1298            subinterfaces: vec![sub],
1299            id_interval: None,
1300            id_callsign: None,
1301            base_interface_id: InterfaceId(41),
1302            pre_opened_fd: None,
1303            runtime: Arc::new(Mutex::new(RNodeRuntime {
1304                sub: RNodeSubConfig {
1305                    name: String::new(),
1306                    frequency: 868_000_000,
1307                    bandwidth: 125_000,
1308                    txpower: 7,
1309                    spreading_factor: 8,
1310                    coding_rate: 5,
1311                    flow_control: false,
1312                    st_alock: None,
1313                    lt_alock: None,
1314                },
1315                writer: None,
1316            })),
1317        };
1318        config.runtime = Arc::new(Mutex::new(RNodeRuntime::from_config(&config)));
1319
1320        let _writers = start(config, tx).unwrap();
1321
1322        thread::sleep(Duration::from_secs(3));
1323        mock_respond_detect(&mut master1);
1324        let up = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1325            matches!(event, Event::InterfaceUp(InterfaceId(41), _, _))
1326        });
1327        assert!(matches!(
1328            up,
1329            Event::InterfaceUp(InterfaceId(41), None, None)
1330        ));
1331
1332        drop(master1);
1333        drop(slave1);
1334
1335        let down = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1336            matches!(event, Event::InterfaceDown(InterfaceId(41)))
1337        });
1338        assert!(matches!(down, Event::InterfaceDown(InterfaceId(41))));
1339
1340        let (master2_fd, slave2_fd) = open_pty_pair().unwrap();
1341        let slave2_path = slave_tty_path(slave2_fd);
1342        std::fs::remove_file(&port_path).unwrap();
1343        std::os::unix::fs::symlink(&slave2_path, &port_path).unwrap();
1344
1345        let mut master2 = unsafe { std::fs::File::from_raw_fd(master2_fd) };
1346        let _slave2 = unsafe { std::fs::File::from_raw_fd(slave2_fd) };
1347
1348        thread::sleep(Duration::from_secs(3));
1349        mock_respond_detect(&mut master2);
1350        let up = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1351            matches!(event, Event::InterfaceUp(InterfaceId(41), _, _))
1352        });
1353        assert!(matches!(
1354            up,
1355            Event::InterfaceUp(InterfaceId(41), Some(_), None)
1356        ));
1357    }
1358}