1mod transport;
10
11use std::io::{self, Read, Write};
12use std::sync::{Arc, Mutex};
13use std::thread;
14use std::time::Duration;
15use transport::Transport;
16
17use rns_core::transport::types::{AirtimeProfile, InterfaceId};
18
19use crate::event::{Event, EventSender};
20use crate::interface::{lock_or_recover, Writer};
21use crate::rnode_kiss;
22use crate::serial::{Parity, SerialConfig};
23
24pub const FREQ_MIN: u32 = 137_000_000;
26pub const FREQ_MAX: u32 = 3_000_000_000;
27pub const BW_MIN: u32 = 7_800;
28pub const BW_MAX: u32 = 1_625_000;
29pub const SF_MIN: u8 = 5;
30pub const SF_MAX: u8 = 12;
31pub const CR_MIN: u8 = 5;
32pub const CR_MAX: u8 = 8;
33pub const TXPOWER_MIN: i8 = 0;
34pub const TXPOWER_MAX: i8 = 37;
35pub const HW_MTU: u32 = 508;
36pub const LORA_PREAMBLE_SYMBOLS: u16 = 8;
37pub const LORA_EXPLICIT_HEADER: bool = true;
38pub const LORA_CRC: bool = true;
39
40#[derive(Debug, Clone)]
42pub struct RNodeSubConfig {
43 pub name: String,
44 pub frequency: u32,
45 pub bandwidth: u32,
46 pub txpower: i8,
47 pub spreading_factor: u8,
48 pub coding_rate: u8,
49 pub flow_control: bool,
50 pub st_alock: Option<f32>,
51 pub lt_alock: Option<f32>,
52}
53
54#[derive(Debug, Clone)]
56pub struct RNodeConfig {
57 pub name: String,
58 pub port: String,
59 pub speed: u32,
60 pub subinterfaces: Vec<RNodeSubConfig>,
61 pub id_interval: Option<u32>,
62 pub id_callsign: Option<Vec<u8>>,
63 pub base_interface_id: InterfaceId,
64 pub pre_opened_fd: Option<i32>,
67 pub runtime: Arc<Mutex<RNodeRuntime>>,
68}
69
70#[derive(Debug, Clone)]
71pub struct RNodeRuntime {
72 pub sub: RNodeSubConfig,
73 pub writer: Option<Arc<Mutex<Transport>>>,
74}
75
76impl RNodeRuntime {
77 pub fn from_config(config: &RNodeConfig) -> Self {
78 Self {
79 sub: config
80 .subinterfaces
81 .first()
82 .cloned()
83 .unwrap_or_else(|| RNodeSubConfig {
84 name: config.name.clone(),
85 frequency: 868_000_000,
86 bandwidth: 125_000,
87 txpower: 7,
88 spreading_factor: 8,
89 coding_rate: 5,
90 flow_control: false,
91 st_alock: None,
92 lt_alock: None,
93 }),
94 writer: None,
95 }
96 }
97}
98
99#[derive(Debug, Clone)]
100pub struct RNodeRuntimeConfigHandle {
101 pub interface_name: String,
102 pub runtime: Arc<Mutex<RNodeRuntime>>,
103 pub startup: RNodeRuntime,
104}
105
106impl Default for RNodeConfig {
107 fn default() -> Self {
108 let mut config = RNodeConfig {
109 name: String::new(),
110 port: String::new(),
111 speed: 115200,
112 subinterfaces: Vec::new(),
113 id_interval: None,
114 id_callsign: None,
115 base_interface_id: InterfaceId(0),
116 pre_opened_fd: None,
117 runtime: Arc::new(Mutex::new(RNodeRuntime {
118 sub: RNodeSubConfig {
119 name: String::new(),
120 frequency: 868_000_000,
121 bandwidth: 125_000,
122 txpower: 7,
123 spreading_factor: 8,
124 coding_rate: 5,
125 flow_control: false,
126 st_alock: None,
127 lt_alock: None,
128 },
129 writer: None,
130 })),
131 };
132 let startup = RNodeRuntime::from_config(&config);
133 config.runtime = Arc::new(Mutex::new(startup));
134 config
135 }
136}
137
138pub fn validate_sub_config(sub: &RNodeSubConfig) -> Option<String> {
140 if sub.frequency < FREQ_MIN || sub.frequency > FREQ_MAX {
141 return Some(format!(
142 "Invalid frequency {} for {}",
143 sub.frequency, sub.name
144 ));
145 }
146 if sub.bandwidth < BW_MIN || sub.bandwidth > BW_MAX {
147 return Some(format!(
148 "Invalid bandwidth {} for {}",
149 sub.bandwidth, sub.name
150 ));
151 }
152 if sub.spreading_factor < SF_MIN || sub.spreading_factor > SF_MAX {
153 return Some(format!(
154 "Invalid SF {} for {}",
155 sub.spreading_factor, sub.name
156 ));
157 }
158 if sub.coding_rate < CR_MIN || sub.coding_rate > CR_MAX {
159 return Some(format!("Invalid CR {} for {}", sub.coding_rate, sub.name));
160 }
161 if sub.txpower < TXPOWER_MIN || sub.txpower > TXPOWER_MAX {
162 return Some(format!("Invalid TX power {} for {}", sub.txpower, sub.name));
163 }
164 if let Some(st) = sub.st_alock {
165 if st < 0.0 || st > 100.0 {
166 return Some(format!("Invalid ST airtime limit {} for {}", st, sub.name));
167 }
168 }
169 if let Some(lt) = sub.lt_alock {
170 if lt < 0.0 || lt > 100.0 {
171 return Some(format!("Invalid LT airtime limit {} for {}", lt, sub.name));
172 }
173 }
174 None
175}
176
177pub fn estimate_lora_bitrate_bps(sub: &RNodeSubConfig) -> u64 {
181 let symbols_per_second = sub.bandwidth as f64 / (1u64 << sub.spreading_factor) as f64;
182 let bits_per_symbol = sub.spreading_factor as f64 * (4.0 / sub.coding_rate as f64);
183 (symbols_per_second * bits_per_symbol).round().max(1.0) as u64
184}
185
186pub fn lora_airtime_profile(sub: &RNodeSubConfig) -> AirtimeProfile {
187 AirtimeProfile::Lora {
188 bandwidth: sub.bandwidth,
189 spreading_factor: sub.spreading_factor,
190 coding_rate: sub.coding_rate,
191 preamble_symbols: LORA_PREAMBLE_SYMBOLS,
192 explicit_header: LORA_EXPLICIT_HEADER,
193 crc: LORA_CRC,
194 }
195}
196
197struct RNodeSubWriter {
200 writer: Arc<Mutex<Transport>>,
201 index: u8,
202 flow_control: bool,
203 flow_state: Arc<Mutex<SubFlowState>>,
204}
205
206struct SubFlowState {
207 ready: bool,
208 queue: std::collections::VecDeque<Vec<u8>>,
209}
210
211fn make_sub_writer(
212 writer: Arc<Mutex<Transport>>,
213 index: u8,
214 flow_control: bool,
215 flow_state: Arc<Mutex<SubFlowState>>,
216) -> Box<dyn Writer> {
217 Box::new(RNodeSubWriter {
218 writer,
219 index,
220 flow_control,
221 flow_state,
222 })
223}
224
225impl Writer for RNodeSubWriter {
226 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
227 let frame = rnode_kiss::rnode_data_frame(self.index, data);
228 if self.flow_control {
229 let mut state = lock_or_recover(&self.flow_state, "rnode flow state");
230 if state.ready {
231 state.ready = false;
232 drop(state);
233 lock_or_recover(&self.writer, "rnode shared writer").write_all(&frame)
234 } else {
235 state.queue.push_back(data.to_vec());
236 Ok(())
237 }
238 } else {
239 lock_or_recover(&self.writer, "rnode shared writer").write_all(&frame)
240 }
241 }
242}
243
244pub fn start(
251 config: RNodeConfig,
252 tx: EventSender,
253) -> io::Result<Vec<(InterfaceId, Box<dyn Writer>)>> {
254 for sub in &config.subinterfaces {
256 if let Some(err) = validate_sub_config(sub) {
257 return Err(io::Error::new(io::ErrorKind::InvalidInput, err));
258 }
259 }
260
261 if config.subinterfaces.is_empty() {
262 return Err(io::Error::new(
263 io::ErrorKind::InvalidInput,
264 "No subinterfaces configured",
265 ));
266 }
267
268 let (reader_file, shared_writer) = if let Some(fd) = config.pre_opened_fd {
269 let (r, w) = Transport::open_from_fd(fd)?;
271 (r, Arc::new(Mutex::new(w)))
272 } else {
273 let serial_config = SerialConfig {
274 path: config.port.clone(),
275 baud: config.speed,
276 data_bits: 8,
277 parity: Parity::None,
278 stop_bits: 1,
279 };
280 let (r, w) = Transport::open(&serial_config)?;
281 (r, Arc::new(Mutex::new(w)))
282 };
283
284 let num_subs = config.subinterfaces.len();
286 let mut writers: Vec<(InterfaceId, Box<dyn Writer>)> = Vec::with_capacity(num_subs);
287 let mut flow_states: Vec<Arc<Mutex<SubFlowState>>> = Vec::with_capacity(num_subs);
288
289 for (i, sub) in config.subinterfaces.iter().enumerate() {
290 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
291 let flow_state = Arc::new(Mutex::new(SubFlowState {
292 ready: true,
293 queue: std::collections::VecDeque::new(),
294 }));
295 flow_states.push(flow_state.clone());
296 let sub_writer =
297 make_sub_writer(shared_writer.clone(), i as u8, sub.flow_control, flow_state);
298 writers.push((sub_id, sub_writer));
299 }
300
301 let reader_shared_writer = shared_writer.clone();
303 {
304 let mut runtime = lock_or_recover(&config.runtime, "rnode runtime");
305 runtime.writer = Some(shared_writer.clone());
306 runtime.sub = config
307 .subinterfaces
308 .first()
309 .cloned()
310 .unwrap_or(runtime.sub.clone());
311 }
312 let reader_config = config.clone();
313 let reader_flow_states = flow_states;
314 thread::Builder::new()
315 .name(format!("rnode-reader-{}", config.base_interface_id.0))
316 .spawn(move || {
317 reader_loop(
318 reader_file,
319 reader_shared_writer,
320 reader_config,
321 tx,
322 reader_flow_states,
323 );
324 })?;
325
326 let keepalive_writer = shared_writer.clone();
329 let keepalive_name = config.name.clone();
330 thread::Builder::new()
331 .name(format!("rnode-keepalive-{}", config.base_interface_id.0))
332 .spawn(move || {
333 let detect = rnode_kiss::detect_request();
334 loop {
335 thread::sleep(Duration::from_secs(15));
336 if let Err(e) =
337 lock_or_recover(&keepalive_writer, "rnode shared writer").write_all(&detect)
338 {
339 log::debug!("[{}] keepalive write failed: {}", keepalive_name, e);
340 }
341 }
342 })?;
343
344 Ok(writers)
345}
346
347fn reader_loop(
349 mut reader: Transport,
350 writer: Arc<Mutex<Transport>>,
351 config: RNodeConfig,
352 tx: EventSender,
353 flow_states: Vec<Arc<Mutex<SubFlowState>>>,
354) {
355 const RECONNECT_INITIAL_DELAY: Duration = Duration::from_millis(200);
356 const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(2);
357 thread::sleep(Duration::from_secs(2));
359 let mut connected_once = false;
360 let mut last_rssi: Option<i16> = None;
361 let mut last_snr: Option<f32> = None;
362 if let Err(e) = detect_and_configure(&mut reader, &writer, &config) {
363 log::error!("[{}] initial RNode setup failed: {}", config.name, e);
364 return;
365 }
366 signal_interface_up(&tx, &config, &writer, &flow_states, connected_once);
367 connected_once = true;
368 loop {
369 let mut decoder = rnode_kiss::RNodeDecoder::new();
370 let mut buf = [0u8; 4096];
371 let disconnected = loop {
372 match reader.read(&mut buf) {
373 Ok(0) => {
374 log::warn!("[{}] serial port closed", config.name);
375 signal_interface_down(&tx, &config);
376 break true;
377 }
378 Ok(n) => {
379 for event in decoder.feed(&buf[..n]) {
380 match event {
381 rnode_kiss::RNodeEvent::DataFrame { index, data } => {
382 let sub_id = InterfaceId(config.base_interface_id.0 + index as u64);
383 if tx
384 .send(Event::Frame {
385 interface_id: sub_id,
386 data,
387 rssi: last_rssi,
388 snr: last_snr,
389 })
390 .is_err()
391 {
392 return;
393 }
394 last_rssi = None;
395 last_snr = None;
396 }
397 rnode_kiss::RNodeEvent::Ready => {
398 for (i, fs) in flow_states.iter().enumerate() {
400 if config.subinterfaces[i].flow_control {
401 process_flow_queue(fs, &writer, i as u8);
402 }
403 }
404 }
405 rnode_kiss::RNodeEvent::StatRssi(rssi) => {
406 last_rssi = Some(rssi as i16 - 157);
407 }
408 rnode_kiss::RNodeEvent::StatSnr(snr) => {
409 last_snr = Some(snr as f32 * 0.25);
410 }
411 rnode_kiss::RNodeEvent::Error(code) => {
412 log::error!("[{}] RNode error: 0x{:02X}", config.name, code);
413 }
414 _ => {
415 }
417 }
418 }
419 }
420 Err(e) => {
421 log::error!("[{}] serial read error: {}", config.name, e);
422 signal_interface_down(&tx, &config);
423 break true;
424 }
425 }
426 };
427
428 clear_pending_rx_metadata(&mut last_rssi, &mut last_snr);
429
430 if !disconnected || config.pre_opened_fd.is_some() {
431 return;
432 }
433
434 let mut backoff = RECONNECT_INITIAL_DELAY;
435 loop {
436 match reopen_connection(&config, &writer) {
437 Ok(new_reader) => {
438 reset_flow_states(&flow_states);
439 reader = new_reader;
440 if let Err(e) = detect_and_configure(&mut reader, &writer, &config) {
441 log::warn!("[{}] reconnect configure failed: {}", config.name, e);
442 thread::sleep(backoff);
443 backoff = std::cmp::min(backoff.saturating_mul(2), RECONNECT_MAX_DELAY);
444 continue;
445 }
446 signal_interface_up(&tx, &config, &writer, &flow_states, connected_once);
447 break;
448 }
449 Err(e) => {
450 log::warn!("[{}] reconnect open failed: {}", config.name, e);
451 thread::sleep(backoff);
452 backoff = std::cmp::min(backoff.saturating_mul(2), RECONNECT_MAX_DELAY);
453 }
454 }
455 }
456 }
457}
458
459fn detect_and_configure(
460 reader: &mut Transport,
461 writer: &Arc<Mutex<Transport>>,
462 config: &RNodeConfig,
463) -> io::Result<()> {
464 let detect_cmd = rnode_kiss::detect_request();
465 let mut cmd = detect_cmd;
466 cmd.extend_from_slice(&rnode_kiss::rnode_command(
467 rnode_kiss::CMD_FW_VERSION,
468 &[0x00],
469 ));
470 cmd.extend_from_slice(&rnode_kiss::rnode_command(
471 rnode_kiss::CMD_FW_DETAIL,
472 &[0x00],
473 ));
474 cmd.extend_from_slice(&rnode_kiss::rnode_command(
475 rnode_kiss::CMD_PLATFORM,
476 &[0x00],
477 ));
478 cmd.extend_from_slice(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x00]));
479
480 lock_or_recover(writer, "rnode shared writer").write_all(&cmd)?;
481
482 let mut decoder = rnode_kiss::RNodeDecoder::new();
483 let mut buf = [0u8; 4096];
484 let mut detected = false;
485 let detect_start = std::time::Instant::now();
486 let detect_timeout = Duration::from_secs(5);
487
488 while !detected && detect_start.elapsed() < detect_timeout {
489 match reader.read(&mut buf) {
490 Ok(0) => {
491 return Err(io::Error::new(
492 io::ErrorKind::UnexpectedEof,
493 "serial port closed during detect",
494 ));
495 }
496 Ok(n) => {
497 for event in decoder.feed(&buf[..n]) {
498 match event {
499 rnode_kiss::RNodeEvent::Detected(true) => {
500 detected = true;
501 log::info!("[{}] RNode device detected", config.name);
502 }
503 rnode_kiss::RNodeEvent::FirmwareVersion { major, minor } => {
504 log::info!("[{}] firmware version {}.{}", config.name, major, minor);
505 }
506 rnode_kiss::RNodeEvent::FirmwareDetail(ref detail) => {
507 log::info!("[{}] firmware detail: {}", config.name, detail);
508 }
509 rnode_kiss::RNodeEvent::Platform(p) => {
510 log::info!("[{}] platform: 0x{:02X}", config.name, p);
511 }
512 rnode_kiss::RNodeEvent::Mcu(m) => {
513 log::info!("[{}] MCU: 0x{:02X}", config.name, m);
514 }
515 _ => {}
516 }
517 }
518 }
519 Err(e) => {
520 return Err(io::Error::new(
521 e.kind(),
522 format!("serial read error during detect: {}", e),
523 ));
524 }
525 }
526 }
527
528 if !detected {
529 return Err(io::Error::new(
530 io::ErrorKind::TimedOut,
531 "RNode detection timed out",
532 ));
533 }
534
535 for (i, sub) in config.subinterfaces.iter().enumerate() {
536 configure_subinterface(writer, i as u8, sub, config.subinterfaces.len() > 1)?;
537 }
538
539 thread::sleep(Duration::from_millis(300));
540 log::info!(
541 "[{}] RNode configured with {} subinterface(s)",
542 config.name,
543 config.subinterfaces.len()
544 );
545 Ok(())
546}
547
548fn signal_interface_down(tx: &EventSender, config: &RNodeConfig) {
549 for i in 0..config.subinterfaces.len() {
550 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
551 let _ = tx.send(Event::InterfaceDown(sub_id));
552 }
553}
554
555fn signal_interface_up(
556 tx: &EventSender,
557 config: &RNodeConfig,
558 writer: &Arc<Mutex<Transport>>,
559 flow_states: &[Arc<Mutex<SubFlowState>>],
560 reconnected: bool,
561) {
562 for (i, flow_state) in flow_states.iter().enumerate() {
563 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
564 let new_writer = reconnected.then(|| {
565 make_sub_writer(
566 writer.clone(),
567 i as u8,
568 config.subinterfaces[i].flow_control,
569 flow_state.clone(),
570 )
571 });
572 let _ = tx.send(Event::InterfaceUp(sub_id, new_writer, None));
573 }
574}
575
576fn reset_flow_states(flow_states: &[Arc<Mutex<SubFlowState>>]) {
577 for flow_state in flow_states {
578 let mut state = lock_or_recover(flow_state, "rnode flow state");
579 state.ready = true;
580 state.queue.clear();
581 }
582}
583
584fn clear_pending_rx_metadata(last_rssi: &mut Option<i16>, last_snr: &mut Option<f32>) {
585 *last_rssi = None;
586 *last_snr = None;
587}
588
589fn reopen_connection(
590 config: &RNodeConfig,
591 writer: &Arc<Mutex<Transport>>,
592) -> io::Result<Transport> {
593 let serial_config = SerialConfig {
594 path: config.port.clone(),
595 baud: config.speed,
596 data_bits: 8,
597 parity: Parity::None,
598 stop_bits: 1,
599 };
600
601 let (reader, new_writer) = Transport::open(&serial_config)?;
602 *lock_or_recover(writer, "rnode shared writer") = new_writer;
603 Ok(reader)
604}
605
606pub(crate) fn configure_subinterface(
608 writer: &Arc<Mutex<Transport>>,
609 index: u8,
610 sub: &RNodeSubConfig,
611 multi: bool,
612) -> io::Result<()> {
613 let mut w = lock_or_recover(writer, "rnode shared writer");
614
615 let freq_bytes = [
617 (sub.frequency >> 24) as u8,
618 (sub.frequency >> 16) as u8,
619 (sub.frequency >> 8) as u8,
620 (sub.frequency & 0xFF) as u8,
621 ];
622 let bw_bytes = [
623 (sub.bandwidth >> 24) as u8,
624 (sub.bandwidth >> 16) as u8,
625 (sub.bandwidth >> 8) as u8,
626 (sub.bandwidth & 0xFF) as u8,
627 ];
628 let txp = if sub.txpower < 0 {
629 (sub.txpower as i16 + 256) as u8
630 } else {
631 sub.txpower as u8
632 };
633
634 if multi {
635 w.write_all(&rnode_kiss::rnode_select_command(
636 index,
637 rnode_kiss::CMD_FREQUENCY,
638 &freq_bytes,
639 ))?;
640 w.write_all(&rnode_kiss::rnode_select_command(
641 index,
642 rnode_kiss::CMD_BANDWIDTH,
643 &bw_bytes,
644 ))?;
645 w.write_all(&rnode_kiss::rnode_select_command(
646 index,
647 rnode_kiss::CMD_TXPOWER,
648 &[txp],
649 ))?;
650 w.write_all(&rnode_kiss::rnode_select_command(
651 index,
652 rnode_kiss::CMD_SF,
653 &[sub.spreading_factor],
654 ))?;
655 w.write_all(&rnode_kiss::rnode_select_command(
656 index,
657 rnode_kiss::CMD_CR,
658 &[sub.coding_rate],
659 ))?;
660 } else {
661 w.write_all(&rnode_kiss::rnode_command(
662 rnode_kiss::CMD_FREQUENCY,
663 &freq_bytes,
664 ))?;
665 w.write_all(&rnode_kiss::rnode_command(
666 rnode_kiss::CMD_BANDWIDTH,
667 &bw_bytes,
668 ))?;
669 w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_TXPOWER, &[txp]))?;
670 w.write_all(&rnode_kiss::rnode_command(
671 rnode_kiss::CMD_SF,
672 &[sub.spreading_factor],
673 ))?;
674 w.write_all(&rnode_kiss::rnode_command(
675 rnode_kiss::CMD_CR,
676 &[sub.coding_rate],
677 ))?;
678 }
679
680 if let Some(st) = sub.st_alock {
682 let st_val = (st * 100.0) as u16;
683 let st_bytes = [(st_val >> 8) as u8, (st_val & 0xFF) as u8];
684 if multi {
685 w.write_all(&rnode_kiss::rnode_select_command(
686 index,
687 rnode_kiss::CMD_ST_ALOCK,
688 &st_bytes,
689 ))?;
690 } else {
691 w.write_all(&rnode_kiss::rnode_command(
692 rnode_kiss::CMD_ST_ALOCK,
693 &st_bytes,
694 ))?;
695 }
696 }
697 if let Some(lt) = sub.lt_alock {
698 let lt_val = (lt * 100.0) as u16;
699 let lt_bytes = [(lt_val >> 8) as u8, (lt_val & 0xFF) as u8];
700 if multi {
701 w.write_all(&rnode_kiss::rnode_select_command(
702 index,
703 rnode_kiss::CMD_LT_ALOCK,
704 <_bytes,
705 ))?;
706 } else {
707 w.write_all(&rnode_kiss::rnode_command(
708 rnode_kiss::CMD_LT_ALOCK,
709 <_bytes,
710 ))?;
711 }
712 }
713
714 if multi {
716 w.write_all(&rnode_kiss::rnode_select_command(
717 index,
718 rnode_kiss::CMD_RADIO_STATE,
719 &[rnode_kiss::RADIO_STATE_ON],
720 ))?;
721 } else {
722 w.write_all(&rnode_kiss::rnode_command(
723 rnode_kiss::CMD_RADIO_STATE,
724 &[rnode_kiss::RADIO_STATE_ON],
725 ))?;
726 }
727
728 Ok(())
729}
730
731fn process_flow_queue(
733 flow_state: &Arc<Mutex<SubFlowState>>,
734 writer: &Arc<Mutex<Transport>>,
735 index: u8,
736) {
737 let mut state = lock_or_recover(flow_state, "rnode flow state");
738 if let Some(data) = state.queue.pop_front() {
739 state.ready = false;
740 drop(state);
741 let frame = rnode_kiss::rnode_data_frame(index, &data);
742 let _ = lock_or_recover(writer, "rnode shared writer").write_all(&frame);
743 } else {
744 state.ready = true;
745 }
746}
747
748use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, SubInterface};
751use rns_core::transport::types::InterfaceInfo;
752use std::collections::HashMap;
753
754pub struct RNodeFactory;
756
757impl InterfaceFactory for RNodeFactory {
758 fn type_name(&self) -> &str {
759 "RNodeInterface"
760 }
761
762 fn default_ifac_size(&self) -> usize {
763 8
764 }
765
766 fn parse_config(
767 &self,
768 name: &str,
769 id: InterfaceId,
770 params: &HashMap<String, String>,
771 ) -> Result<Box<dyn InterfaceConfigData>, String> {
772 let pre_opened_fd = params.get("fd").and_then(|v| v.parse::<i32>().ok());
773
774 let port = params
775 .get("port")
776 .cloned()
777 .or_else(|| pre_opened_fd.map(|_| "usb-bridge".to_string()))
778 .ok_or_else(|| "RNodeInterface requires 'port' or 'fd'".to_string())?;
779
780 let speed = params
781 .get("speed")
782 .and_then(|v| v.parse().ok())
783 .unwrap_or(115200u32);
784
785 let frequency = params
786 .get("frequency")
787 .and_then(|v| v.parse().ok())
788 .unwrap_or(868_000_000u32);
789
790 let bandwidth = params
791 .get("bandwidth")
792 .and_then(|v| v.parse().ok())
793 .unwrap_or(125_000u32);
794
795 let txpower = params
796 .get("txpower")
797 .and_then(|v| v.parse().ok())
798 .unwrap_or(7i8);
799
800 let spreading_factor = params
801 .get("spreadingfactor")
802 .or_else(|| params.get("spreading_factor"))
803 .and_then(|v| v.parse().ok())
804 .unwrap_or(8u8);
805
806 let coding_rate = params
807 .get("codingrate")
808 .or_else(|| params.get("coding_rate"))
809 .and_then(|v| v.parse().ok())
810 .unwrap_or(5u8);
811
812 let flow_control = params
813 .get("flow_control")
814 .and_then(|v| crate::config::parse_bool_pub(v))
815 .unwrap_or(false);
816
817 let st_alock = params.get("st_alock").and_then(|v| v.parse().ok());
818
819 let lt_alock = params.get("lt_alock").and_then(|v| v.parse().ok());
820
821 let id_interval = params.get("id_interval").and_then(|v| v.parse().ok());
822
823 let id_callsign = params.get("id_callsign").map(|v| v.as_bytes().to_vec());
824
825 let sub = RNodeSubConfig {
826 name: name.to_string(),
827 frequency,
828 bandwidth,
829 txpower,
830 spreading_factor,
831 coding_rate,
832 flow_control,
833 st_alock,
834 lt_alock,
835 };
836
837 Ok(Box::new(RNodeConfig {
838 name: name.to_string(),
839 port,
840 speed,
841 subinterfaces: vec![sub],
842 id_interval,
843 id_callsign,
844 base_interface_id: id,
845 pre_opened_fd,
846 runtime: Arc::new(Mutex::new(RNodeRuntime {
847 sub: RNodeSubConfig {
848 name: name.to_string(),
849 frequency,
850 bandwidth,
851 txpower,
852 spreading_factor,
853 coding_rate,
854 flow_control,
855 st_alock,
856 lt_alock,
857 },
858 writer: None,
859 })),
860 }))
861 }
862
863 fn start(
864 &self,
865 config: Box<dyn InterfaceConfigData>,
866 ctx: StartContext,
867 ) -> std::io::Result<StartResult> {
868 let rnode_config = *config.into_any().downcast::<RNodeConfig>().map_err(|_| {
869 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
870 })?;
871
872 let name = rnode_config.name.clone();
873 let sub_bitrates: Vec<u64> = rnode_config
874 .subinterfaces
875 .iter()
876 .map(estimate_lora_bitrate_bps)
877 .collect();
878 let airtime_profiles: Vec<AirtimeProfile> = rnode_config
879 .subinterfaces
880 .iter()
881 .map(lora_airtime_profile)
882 .collect();
883
884 let pairs = start(rnode_config, ctx.tx)?;
885
886 let mut subs = Vec::with_capacity(pairs.len());
887 for (index, (sub_id, writer)) in pairs.into_iter().enumerate() {
888 let sub_name = if index == 0 {
889 name.clone()
890 } else {
891 format!("{}/{}", name, index)
892 };
893
894 let info = InterfaceInfo {
895 id: sub_id,
896 name: sub_name,
897 mode: ctx.mode,
898 out_capable: true,
899 in_capable: true,
900 bitrate: sub_bitrates.get(index).copied(),
901 airtime_profile: airtime_profiles.get(index).copied(),
902 announce_rate_target: None,
903 announce_rate_grace: 0,
904 announce_rate_penalty: 0.0,
905 announce_cap: rns_core::constants::ANNOUNCE_CAP,
906 is_local_client: false,
907 wants_tunnel: false,
908 tunnel_id: None,
909 mtu: rns_core::constants::MTU as u32,
910 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
911 ia_freq: 0.0,
912 ip_freq: 0.0,
913 op_freq: 0.0,
914 op_samples: 0,
915 started: crate::time::now(),
916 };
917
918 subs.push(SubInterface {
919 id: sub_id,
920 info,
921 writer,
922 interface_type_name: "RNodeInterface".to_string(),
923 });
924 }
925
926 Ok(StartResult::Multi(subs))
927 }
928}
929
930pub(crate) fn rnode_runtime_handle_from_config(config: &RNodeConfig) -> RNodeRuntimeConfigHandle {
931 RNodeRuntimeConfigHandle {
932 interface_name: config.name.clone(),
933 runtime: Arc::clone(&config.runtime),
934 startup: RNodeRuntime::from_config(config),
935 }
936}
937
938#[cfg(test)]
939mod tests {
940 use super::*;
941 use crate::event;
942 use crate::kiss;
943 use crate::serial::open_pty_pair;
944 use std::os::unix::io::{AsRawFd, FromRawFd};
945 use std::path::PathBuf;
946 use std::sync::mpsc::RecvTimeoutError;
947 use tempfile::tempdir;
948 fn poll_read(fd: i32, timeout_ms: i32) -> bool {
950 let mut pfd = libc::pollfd {
951 fd,
952 events: libc::POLLIN,
953 revents: 0,
954 };
955 let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
956 ret > 0
957 }
958
959 fn read_available(file: &mut Transport) -> Vec<u8> {
961 let mut all = Vec::new();
962 let mut buf = [0u8; 4096];
963 while poll_read(file.as_raw_fd(), 100) {
964 match file.read(&mut buf) {
965 Ok(n) if n > 0 => all.extend_from_slice(&buf[..n]),
966 _ => break,
967 }
968 }
969 all
970 }
971
972 fn slave_tty_path(fd: i32) -> PathBuf {
973 let mut buf = [0u8; 256];
974 let rc = unsafe { libc::ttyname_r(fd, buf.as_mut_ptr().cast(), buf.len()) };
975 assert_eq!(rc, 0, "ttyname_r failed for fd {}", fd);
976 let nul = buf.iter().position(|b| *b == 0).unwrap_or(buf.len());
977 PathBuf::from(std::str::from_utf8(&buf[..nul]).unwrap())
978 }
979
980 unsafe fn transport_from_raw_fd(fd: i32) -> Transport {
981 Transport::Serial(std::fs::File::from_raw_fd(fd))
982 }
983
984 fn wait_for_interface_event<F>(
985 rx: &std::sync::mpsc::Receiver<Event>,
986 timeout: Duration,
987 predicate: F,
988 ) -> Event
989 where
990 F: Fn(&Event) -> bool,
991 {
992 let deadline = std::time::Instant::now() + timeout;
993 loop {
994 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
995 match rx.recv_timeout(remaining) {
996 Ok(event) if predicate(&event) => return event,
997 Ok(_) => continue,
998 Err(RecvTimeoutError::Timeout) => panic!("timed out waiting for interface event"),
999 Err(RecvTimeoutError::Disconnected) => panic!("event channel disconnected"),
1000 }
1001 }
1002 }
1003
1004 fn mock_respond_detect(master: &mut Transport) {
1006 master
1008 .write_all(&rnode_kiss::rnode_command(
1009 rnode_kiss::CMD_DETECT,
1010 &[rnode_kiss::DETECT_RESP],
1011 ))
1012 .unwrap();
1013 master
1015 .write_all(&rnode_kiss::rnode_command(
1016 rnode_kiss::CMD_FW_VERSION,
1017 &[0x01, 0x4A],
1018 ))
1019 .unwrap();
1020 master
1022 .write_all(&rnode_kiss::rnode_command(
1023 rnode_kiss::CMD_PLATFORM,
1024 &[0x80],
1025 ))
1026 .unwrap();
1027 master
1029 .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x01]))
1030 .unwrap();
1031 master.flush().unwrap();
1032 }
1033
1034 #[test]
1035 fn rnode_detect_over_pty() {
1036 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1038 let mut master = unsafe { transport_from_raw_fd(master_fd) };
1039 let mut slave = unsafe { transport_from_raw_fd(slave_fd) };
1040
1041 mock_respond_detect(&mut master);
1043
1044 let mut decoder = rnode_kiss::RNodeDecoder::new();
1045 let mut events = Vec::new();
1046 let deadline = std::time::Instant::now() + Duration::from_secs(2);
1047
1048 while std::time::Instant::now() < deadline {
1049 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
1050 let timeout_ms = remaining.as_millis().min(i32::MAX as u128) as i32;
1051 if !poll_read(slave.as_raw_fd(), timeout_ms) {
1052 break;
1053 }
1054
1055 let chunk = read_available(&mut slave);
1056 if chunk.is_empty() {
1057 continue;
1058 }
1059
1060 events.extend(decoder.feed(&chunk));
1061
1062 let saw_detect = events
1063 .iter()
1064 .any(|e| matches!(e, rnode_kiss::RNodeEvent::Detected(true)));
1065 let saw_firmware = events
1066 .iter()
1067 .any(|e| matches!(e, rnode_kiss::RNodeEvent::FirmwareVersion { .. }));
1068 if saw_detect && saw_firmware {
1069 break;
1070 }
1071 }
1072
1073 assert!(
1074 events
1075 .iter()
1076 .any(|e| matches!(e, rnode_kiss::RNodeEvent::Detected(true))),
1077 "should detect device"
1078 );
1079 assert!(
1080 events
1081 .iter()
1082 .any(|e| matches!(e, rnode_kiss::RNodeEvent::FirmwareVersion { .. })),
1083 "should get firmware version"
1084 );
1085 }
1086
1087 #[test]
1088 fn rnode_configure_commands() {
1089 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1090 let mut master = unsafe { transport_from_raw_fd(master_fd) };
1091 let writer_file = unsafe { transport_from_raw_fd(libc::dup(slave_fd)) };
1092 let writer = Arc::new(Mutex::new(writer_file));
1093
1094 let sub = RNodeSubConfig {
1095 name: "test".into(),
1096 frequency: 868_000_000,
1097 bandwidth: 125_000,
1098 txpower: 7,
1099 spreading_factor: 8,
1100 coding_rate: 5,
1101 flow_control: false,
1102 st_alock: None,
1103 lt_alock: None,
1104 };
1105
1106 configure_subinterface(&writer, 0, &sub, false).unwrap();
1107
1108 let data = read_available(&mut master);
1110
1111 assert!(
1113 data.windows(2)
1114 .any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_FREQUENCY),
1115 "should contain FREQUENCY command"
1116 );
1117 assert!(
1119 data.windows(2)
1120 .any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_BANDWIDTH),
1121 "should contain BANDWIDTH command"
1122 );
1123 assert!(
1125 data.windows(3).any(|w| w[0] == kiss::FEND
1126 && w[1] == rnode_kiss::CMD_RADIO_STATE
1127 && w[2] == rnode_kiss::RADIO_STATE_ON),
1128 "should contain RADIO_STATE ON command"
1129 );
1130
1131 unsafe { libc::close(slave_fd) };
1132 }
1133
1134 #[test]
1135 fn rnode_data_roundtrip() {
1136 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1137 let mut master = unsafe { transport_from_raw_fd(master_fd) };
1138 let slave = unsafe { transport_from_raw_fd(slave_fd) };
1139
1140 let payload = vec![0x01, 0x02, 0x03, 0x04, 0x05];
1142 let frame = rnode_kiss::rnode_data_frame(0, &payload);
1143 master.write_all(&frame).unwrap();
1144 master.flush().unwrap();
1145
1146 assert!(poll_read(slave.as_raw_fd(), 2000));
1148 let mut decoder = rnode_kiss::RNodeDecoder::new();
1149 let mut buf = [0u8; 4096];
1150 let mut slave_file = slave;
1151 let n = slave_file.read(&mut buf).unwrap();
1152 let events = decoder.feed(&buf[..n]);
1153
1154 assert_eq!(events.len(), 1);
1155 match &events[0] {
1156 rnode_kiss::RNodeEvent::DataFrame { index, data } => {
1157 assert_eq!(*index, 0);
1158 assert_eq!(data, &payload);
1159 }
1160 other => panic!("expected DataFrame, got {:?}", other),
1161 }
1162 }
1163
1164 #[test]
1165 fn rnode_flow_control() {
1166 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1167 let writer_file = unsafe { transport_from_raw_fd(slave_fd) };
1168 let shared_writer = Arc::new(Mutex::new(writer_file));
1169
1170 let flow_state = Arc::new(Mutex::new(SubFlowState {
1171 ready: true,
1172 queue: std::collections::VecDeque::new(),
1173 }));
1174
1175 let mut sub_writer = RNodeSubWriter {
1176 writer: shared_writer.clone(),
1177 index: 0,
1178 flow_control: true,
1179 flow_state: flow_state.clone(),
1180 };
1181
1182 sub_writer.send_frame(b"hello").unwrap();
1184 assert!(!flow_state.lock().unwrap().ready);
1185
1186 sub_writer.send_frame(b"world").unwrap();
1188 assert_eq!(flow_state.lock().unwrap().queue.len(), 1);
1189
1190 process_flow_queue(&flow_state, &shared_writer, 0);
1192 assert_eq!(flow_state.lock().unwrap().queue.len(), 0);
1193 assert!(!flow_state.lock().unwrap().ready); process_flow_queue(&flow_state, &shared_writer, 0);
1197 assert!(flow_state.lock().unwrap().ready);
1198
1199 unsafe { libc::close(master_fd) };
1200 }
1201
1202 #[test]
1203 fn rnode_reset_clears_pending_rx_metadata() {
1204 let mut last_rssi = Some(-101);
1205 let mut last_snr = Some(7.25);
1206
1207 clear_pending_rx_metadata(&mut last_rssi, &mut last_snr);
1208
1209 assert_eq!(last_rssi, None);
1210 assert_eq!(last_snr, None);
1211 }
1212
1213 #[test]
1214 fn rnode_sub_writer_format() {
1215 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1216 let mut master = unsafe { transport_from_raw_fd(master_fd) };
1217 let writer_file = unsafe { transport_from_raw_fd(slave_fd) };
1218 let shared_writer = Arc::new(Mutex::new(writer_file));
1219
1220 let flow_state = Arc::new(Mutex::new(SubFlowState {
1221 ready: true,
1222 queue: std::collections::VecDeque::new(),
1223 }));
1224
1225 let mut sub_writer = RNodeSubWriter {
1226 writer: shared_writer,
1227 index: 1, flow_control: false,
1229 flow_state,
1230 };
1231
1232 let payload = vec![0xAA, 0xBB, 0xCC];
1233 sub_writer.send_frame(&payload).unwrap();
1234
1235 assert!(poll_read(master.as_raw_fd(), 2000));
1237 let mut buf = [0u8; 256];
1238 let n = master.read(&mut buf).unwrap();
1239
1240 assert_eq!(buf[0], kiss::FEND);
1242 assert_eq!(buf[1], 0x10); assert_eq!(buf[n - 1], kiss::FEND);
1244 }
1245
1246 #[test]
1247 fn rnode_multi_sub_routing() {
1248 let mut decoder = rnode_kiss::RNodeDecoder::new();
1250
1251 let payload0 = vec![0x01, 0x02];
1252 let frame0 = rnode_kiss::rnode_data_frame(0, &payload0);
1253 let events0 = decoder.feed(&frame0);
1254 assert_eq!(events0.len(), 1);
1255 assert_eq!(
1256 events0[0],
1257 rnode_kiss::RNodeEvent::DataFrame {
1258 index: 0,
1259 data: payload0
1260 }
1261 );
1262
1263 let payload1 = vec![0x03, 0x04];
1264 let frame1 = rnode_kiss::rnode_data_frame(1, &payload1);
1265 let events1 = decoder.feed(&frame1);
1266 assert_eq!(events1.len(), 1);
1267 assert_eq!(
1268 events1[0],
1269 rnode_kiss::RNodeEvent::DataFrame {
1270 index: 1,
1271 data: payload1
1272 }
1273 );
1274 }
1275
1276 #[test]
1277 fn rnode_error_handling() {
1278 let mut decoder = rnode_kiss::RNodeDecoder::new();
1286 let frame = rnode_kiss::rnode_command(rnode_kiss::CMD_ERROR, &[0x02]);
1287 let events = decoder.feed(&frame);
1288 assert_eq!(events.len(), 1);
1289 assert_eq!(
1291 events[0],
1292 rnode_kiss::RNodeEvent::DataFrame {
1293 index: 5,
1294 data: vec![0x02]
1295 }
1296 );
1297 }
1298
1299 #[test]
1300 fn rnode_config_validation() {
1301 let good = RNodeSubConfig {
1302 name: "test".into(),
1303 frequency: 868_000_000,
1304 bandwidth: 125_000,
1305 txpower: 7,
1306 spreading_factor: 8,
1307 coding_rate: 5,
1308 flow_control: false,
1309 st_alock: None,
1310 lt_alock: None,
1311 };
1312 assert!(validate_sub_config(&good).is_none());
1313
1314 let mut bad = good.clone();
1316 bad.frequency = 100;
1317 assert!(validate_sub_config(&bad).is_some());
1318
1319 bad = good.clone();
1321 bad.spreading_factor = 13;
1322 assert!(validate_sub_config(&bad).is_some());
1323
1324 bad = good.clone();
1326 bad.coding_rate = 9;
1327 assert!(validate_sub_config(&bad).is_some());
1328
1329 bad = good.clone();
1331 bad.bandwidth = 5;
1332 assert!(validate_sub_config(&bad).is_some());
1333
1334 bad = good.clone();
1336 bad.txpower = 50;
1337 assert!(validate_sub_config(&bad).is_some());
1338 }
1339
1340 #[test]
1341 fn rnode_lora_bitrate_estimate_uses_radio_params() {
1342 let mut sub = RNodeSubConfig {
1343 name: "test".into(),
1344 frequency: 868_000_000,
1345 bandwidth: 125_000,
1346 txpower: 7,
1347 spreading_factor: 8,
1348 coding_rate: 5,
1349 flow_control: false,
1350 st_alock: None,
1351 lt_alock: None,
1352 };
1353
1354 assert_eq!(estimate_lora_bitrate_bps(&sub), 3125);
1355
1356 sub.spreading_factor = 12;
1357 assert_eq!(estimate_lora_bitrate_bps(&sub), 293);
1358
1359 sub.bandwidth = 250_000;
1360 assert_eq!(estimate_lora_bitrate_bps(&sub), 586);
1361 }
1362
1363 #[test]
1364 fn rnode_lora_airtime_profile_uses_radio_params() {
1365 let sub = RNodeSubConfig {
1366 name: "test".into(),
1367 frequency: 868_000_000,
1368 bandwidth: 125_000,
1369 txpower: 7,
1370 spreading_factor: 8,
1371 coding_rate: 5,
1372 flow_control: false,
1373 st_alock: None,
1374 lt_alock: None,
1375 };
1376
1377 let profile = lora_airtime_profile(&sub);
1378 assert!((profile.transmit_time_secs(100) - 0.307712).abs() < 0.000001);
1379 assert_eq!(
1380 profile,
1381 AirtimeProfile::Lora {
1382 bandwidth: 125_000,
1383 spreading_factor: 8,
1384 coding_rate: 5,
1385 preamble_symbols: LORA_PREAMBLE_SYMBOLS,
1386 explicit_header: LORA_EXPLICIT_HEADER,
1387 crc: LORA_CRC,
1388 }
1389 );
1390 }
1391
1392 #[test]
1393 fn rnode_reconnects_after_serial_disconnect() {
1394 let tempdir = tempdir().unwrap();
1395 let port_path = tempdir.path().join("rnode-port");
1396
1397 let (master1_fd, slave1_fd) = open_pty_pair().unwrap();
1398 let slave1_path = slave_tty_path(slave1_fd);
1399 std::os::unix::fs::symlink(&slave1_path, &port_path).unwrap();
1400
1401 let mut master1 = unsafe { transport_from_raw_fd(master1_fd) };
1402 let slave1 = unsafe { transport_from_raw_fd(slave1_fd) };
1403
1404 let (tx, rx) = event::channel();
1405 let sub = RNodeSubConfig {
1406 name: "test-rnode".into(),
1407 frequency: 868_000_000,
1408 bandwidth: 125_000,
1409 txpower: 7,
1410 spreading_factor: 8,
1411 coding_rate: 5,
1412 flow_control: false,
1413 st_alock: None,
1414 lt_alock: None,
1415 };
1416 let mut config = RNodeConfig {
1417 name: "test-rnode".into(),
1418 port: port_path.display().to_string(),
1419 speed: 115200,
1420 subinterfaces: vec![sub],
1421 id_interval: None,
1422 id_callsign: None,
1423 base_interface_id: InterfaceId(41),
1424 pre_opened_fd: None,
1425 runtime: Arc::new(Mutex::new(RNodeRuntime {
1426 sub: RNodeSubConfig {
1427 name: String::new(),
1428 frequency: 868_000_000,
1429 bandwidth: 125_000,
1430 txpower: 7,
1431 spreading_factor: 8,
1432 coding_rate: 5,
1433 flow_control: false,
1434 st_alock: None,
1435 lt_alock: None,
1436 },
1437 writer: None,
1438 })),
1439 };
1440 config.runtime = Arc::new(Mutex::new(RNodeRuntime::from_config(&config)));
1441
1442 let _writers = start(config, tx).unwrap();
1443
1444 thread::sleep(Duration::from_secs(3));
1445 mock_respond_detect(&mut master1);
1446 let up = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1447 matches!(event, Event::InterfaceUp(InterfaceId(41), _, _))
1448 });
1449 assert!(matches!(
1450 up,
1451 Event::InterfaceUp(InterfaceId(41), None, None)
1452 ));
1453
1454 drop(master1);
1455 drop(slave1);
1456
1457 let down = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1458 matches!(event, Event::InterfaceDown(InterfaceId(41)))
1459 });
1460 assert!(matches!(down, Event::InterfaceDown(InterfaceId(41))));
1461
1462 let (master2_fd, slave2_fd) = open_pty_pair().unwrap();
1463 let slave2_path = slave_tty_path(slave2_fd);
1464 std::fs::remove_file(&port_path).unwrap();
1465 std::os::unix::fs::symlink(&slave2_path, &port_path).unwrap();
1466
1467 let mut master2 = unsafe { transport_from_raw_fd(master2_fd) };
1468 let _slave2 = unsafe { transport_from_raw_fd(slave2_fd) };
1469
1470 thread::sleep(Duration::from_secs(3));
1471 mock_respond_detect(&mut master2);
1472 let up = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1473 matches!(event, Event::InterfaceUp(InterfaceId(41), _, _))
1474 });
1475 assert!(matches!(
1476 up,
1477 Event::InterfaceUp(InterfaceId(41), Some(_), None)
1478 ));
1479 }
1480}