1mod transport;
10
11use std::io::{self, Read, Write};
12use std::sync::{Arc, Mutex};
13use std::thread;
14use std::time::Duration;
15use transport::Transport;
16
17use rns_core::transport::types::{AirtimeProfile, InterfaceId};
18
19use crate::event::{Event, EventSender};
20use crate::interface::{lock_or_recover, Writer};
21use crate::rnode_kiss;
22use crate::serial::{Parity, SerialConfig};
23
24pub const FREQ_MIN: u32 = 137_000_000;
26pub const FREQ_MAX: u32 = 3_000_000_000;
27pub const BW_MIN: u32 = 7_800;
28pub const BW_MAX: u32 = 1_625_000;
29pub const SF_MIN: u8 = 5;
30pub const SF_MAX: u8 = 12;
31pub const CR_MIN: u8 = 5;
32pub const CR_MAX: u8 = 8;
33pub const TXPOWER_MIN: i8 = 0;
34pub const TXPOWER_MAX: i8 = 37;
35pub const HW_MTU: u32 = 508;
36pub const LORA_PREAMBLE_SYMBOLS: u16 = 8;
37pub const LORA_EXPLICIT_HEADER: bool = true;
38pub const LORA_CRC: bool = true;
39
40#[derive(Debug, Clone)]
42pub struct RNodeSubConfig {
43 pub name: String,
44 pub frequency: u32,
45 pub bandwidth: u32,
46 pub txpower: i8,
47 pub spreading_factor: u8,
48 pub coding_rate: u8,
49 pub flow_control: bool,
50 pub st_alock: Option<f32>,
51 pub lt_alock: Option<f32>,
52}
53
54#[derive(Debug, Clone)]
56pub struct RNodeConfig {
57 pub name: String,
58 pub port: String,
59 pub speed: u32,
60 pub subinterfaces: Vec<RNodeSubConfig>,
61 pub id_interval: Option<u32>,
62 pub id_callsign: Option<Vec<u8>>,
63 pub base_interface_id: InterfaceId,
64 pub pre_opened_fd: Option<i32>,
67 pub runtime: Arc<Mutex<RNodeRuntime>>,
68}
69
70#[derive(Debug, Clone)]
71pub struct RNodeRuntime {
72 pub sub: RNodeSubConfig,
73 pub writer: Option<Arc<Mutex<Transport>>>,
74}
75
76impl RNodeRuntime {
77 pub fn from_config(config: &RNodeConfig) -> Self {
78 Self {
79 sub: config
80 .subinterfaces
81 .first()
82 .cloned()
83 .unwrap_or_else(|| RNodeSubConfig {
84 name: config.name.clone(),
85 frequency: 868_000_000,
86 bandwidth: 125_000,
87 txpower: 7,
88 spreading_factor: 8,
89 coding_rate: 5,
90 flow_control: false,
91 st_alock: None,
92 lt_alock: None,
93 }),
94 writer: None,
95 }
96 }
97}
98
99#[derive(Debug, Clone)]
100pub struct RNodeRuntimeConfigHandle {
101 pub interface_name: String,
102 pub runtime: Arc<Mutex<RNodeRuntime>>,
103 pub startup: RNodeRuntime,
104}
105
106impl Default for RNodeConfig {
107 fn default() -> Self {
108 let mut config = RNodeConfig {
109 name: String::new(),
110 port: String::new(),
111 speed: 115200,
112 subinterfaces: Vec::new(),
113 id_interval: None,
114 id_callsign: None,
115 base_interface_id: InterfaceId(0),
116 pre_opened_fd: None,
117 runtime: Arc::new(Mutex::new(RNodeRuntime {
118 sub: RNodeSubConfig {
119 name: String::new(),
120 frequency: 868_000_000,
121 bandwidth: 125_000,
122 txpower: 7,
123 spreading_factor: 8,
124 coding_rate: 5,
125 flow_control: false,
126 st_alock: None,
127 lt_alock: None,
128 },
129 writer: None,
130 })),
131 };
132 let startup = RNodeRuntime::from_config(&config);
133 config.runtime = Arc::new(Mutex::new(startup));
134 config
135 }
136}
137
138pub fn validate_sub_config(sub: &RNodeSubConfig) -> Option<String> {
140 if sub.frequency < FREQ_MIN || sub.frequency > FREQ_MAX {
141 return Some(format!(
142 "Invalid frequency {} for {}",
143 sub.frequency, sub.name
144 ));
145 }
146 if sub.bandwidth < BW_MIN || sub.bandwidth > BW_MAX {
147 return Some(format!(
148 "Invalid bandwidth {} for {}",
149 sub.bandwidth, sub.name
150 ));
151 }
152 if sub.spreading_factor < SF_MIN || sub.spreading_factor > SF_MAX {
153 return Some(format!(
154 "Invalid SF {} for {}",
155 sub.spreading_factor, sub.name
156 ));
157 }
158 if sub.coding_rate < CR_MIN || sub.coding_rate > CR_MAX {
159 return Some(format!("Invalid CR {} for {}", sub.coding_rate, sub.name));
160 }
161 if sub.txpower < TXPOWER_MIN || sub.txpower > TXPOWER_MAX {
162 return Some(format!("Invalid TX power {} for {}", sub.txpower, sub.name));
163 }
164 if let Some(st) = sub.st_alock {
165 if st < 0.0 || st > 100.0 {
166 return Some(format!("Invalid ST airtime limit {} for {}", st, sub.name));
167 }
168 }
169 if let Some(lt) = sub.lt_alock {
170 if lt < 0.0 || lt > 100.0 {
171 return Some(format!("Invalid LT airtime limit {} for {}", lt, sub.name));
172 }
173 }
174 None
175}
176
177pub fn estimate_lora_bitrate_bps(sub: &RNodeSubConfig) -> u64 {
181 let symbols_per_second = sub.bandwidth as f64 / (1u64 << sub.spreading_factor) as f64;
182 let bits_per_symbol = sub.spreading_factor as f64 * (4.0 / sub.coding_rate as f64);
183 (symbols_per_second * bits_per_symbol).round().max(1.0) as u64
184}
185
186pub fn lora_airtime_profile(sub: &RNodeSubConfig) -> AirtimeProfile {
187 AirtimeProfile::Lora {
188 bandwidth: sub.bandwidth,
189 spreading_factor: sub.spreading_factor,
190 coding_rate: sub.coding_rate,
191 preamble_symbols: LORA_PREAMBLE_SYMBOLS,
192 explicit_header: LORA_EXPLICIT_HEADER,
193 crc: LORA_CRC,
194 }
195}
196
197struct RNodeSubWriter {
200 writer: Arc<Mutex<Transport>>,
201 index: u8,
202 flow_control: bool,
203 flow_state: Arc<Mutex<SubFlowState>>,
204}
205
206struct SubFlowState {
207 ready: bool,
208 queue: std::collections::VecDeque<Vec<u8>>,
209}
210
211fn make_sub_writer(
212 writer: Arc<Mutex<Transport>>,
213 index: u8,
214 flow_control: bool,
215 flow_state: Arc<Mutex<SubFlowState>>,
216) -> Box<dyn Writer> {
217 Box::new(RNodeSubWriter {
218 writer,
219 index,
220 flow_control,
221 flow_state,
222 })
223}
224
225impl Writer for RNodeSubWriter {
226 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
227 let frame = rnode_kiss::rnode_data_frame(self.index, data);
228 if self.flow_control {
229 let mut state = lock_or_recover(&self.flow_state, "rnode flow state");
230 if state.ready {
231 state.ready = false;
232 drop(state);
233 lock_or_recover(&self.writer, "rnode shared writer").write_all(&frame)
234 } else {
235 state.queue.push_back(data.to_vec());
236 Ok(())
237 }
238 } else {
239 lock_or_recover(&self.writer, "rnode shared writer").write_all(&frame)
240 }
241 }
242}
243
244pub fn start(
251 config: RNodeConfig,
252 tx: EventSender,
253) -> io::Result<Vec<(InterfaceId, Box<dyn Writer>)>> {
254 for sub in &config.subinterfaces {
256 if let Some(err) = validate_sub_config(sub) {
257 return Err(io::Error::new(io::ErrorKind::InvalidInput, err));
258 }
259 }
260
261 if config.subinterfaces.is_empty() {
262 return Err(io::Error::new(
263 io::ErrorKind::InvalidInput,
264 "No subinterfaces configured",
265 ));
266 }
267
268 let (reader_file, shared_writer) = if let Some(fd) = config.pre_opened_fd {
269 let (r, w) = Transport::open_from_fd(fd)?;
271 (r, Arc::new(Mutex::new(w)))
272 } else {
273 let serial_config = SerialConfig {
274 path: config.port.clone(),
275 baud: config.speed,
276 data_bits: 8,
277 parity: Parity::None,
278 stop_bits: 1,
279 };
280 let (r, w) = Transport::open(&serial_config)?;
281 (r, Arc::new(Mutex::new(w)))
282 };
283
284 let num_subs = config.subinterfaces.len();
286 let mut writers: Vec<(InterfaceId, Box<dyn Writer>)> = Vec::with_capacity(num_subs);
287 let mut flow_states: Vec<Arc<Mutex<SubFlowState>>> = Vec::with_capacity(num_subs);
288
289 for (i, sub) in config.subinterfaces.iter().enumerate() {
290 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
291 let flow_state = Arc::new(Mutex::new(SubFlowState {
292 ready: true,
293 queue: std::collections::VecDeque::new(),
294 }));
295 flow_states.push(flow_state.clone());
296 let sub_writer =
297 make_sub_writer(shared_writer.clone(), i as u8, sub.flow_control, flow_state);
298 writers.push((sub_id, sub_writer));
299 }
300
301 let reader_shared_writer = shared_writer.clone();
303 {
304 let mut runtime = lock_or_recover(&config.runtime, "rnode runtime");
305 runtime.writer = Some(shared_writer.clone());
306 runtime.sub = config
307 .subinterfaces
308 .first()
309 .cloned()
310 .unwrap_or(runtime.sub.clone());
311 }
312 let reader_config = config.clone();
313 let reader_flow_states = flow_states;
314 thread::Builder::new()
315 .name(format!("rnode-reader-{}", config.base_interface_id.0))
316 .spawn(move || {
317 reader_loop(
318 reader_file,
319 reader_shared_writer,
320 reader_config,
321 tx,
322 reader_flow_states,
323 );
324 })?;
325
326 let keepalive_writer = shared_writer.clone();
329 let keepalive_name = config.name.clone();
330 thread::Builder::new()
331 .name(format!("rnode-keepalive-{}", config.base_interface_id.0))
332 .spawn(move || {
333 let detect = rnode_kiss::detect_request();
334 loop {
335 thread::sleep(Duration::from_secs(15));
336 if let Err(e) =
337 lock_or_recover(&keepalive_writer, "rnode shared writer").write_all(&detect)
338 {
339 log::debug!("[{}] keepalive write failed: {}", keepalive_name, e);
340 }
341 }
342 })?;
343
344 Ok(writers)
345}
346
347fn reader_loop(
349 mut reader: Transport,
350 writer: Arc<Mutex<Transport>>,
351 config: RNodeConfig,
352 tx: EventSender,
353 flow_states: Vec<Arc<Mutex<SubFlowState>>>,
354) {
355 const RECONNECT_INITIAL_DELAY: Duration = Duration::from_millis(200);
356 const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(2);
357 thread::sleep(Duration::from_secs(2));
359 let mut connected_once = false;
360 if let Err(e) = detect_and_configure(&mut reader, &writer, &config) {
361 log::error!("[{}] initial RNode setup failed: {}", config.name, e);
362 return;
363 }
364 signal_interface_up(&tx, &config, &writer, &flow_states, connected_once);
365 connected_once = true;
366 loop {
367 let mut decoder = rnode_kiss::RNodeDecoder::new();
368 let mut buf = [0u8; 4096];
369 let disconnected = loop {
370 match reader.read(&mut buf) {
371 Ok(0) => {
372 log::warn!("[{}] serial port closed", config.name);
373 signal_interface_down(&tx, &config);
374 break true;
375 }
376 Ok(n) => {
377 for event in decoder.feed(&buf[..n]) {
378 match event {
379 rnode_kiss::RNodeEvent::DataFrame { index, data } => {
380 let sub_id = InterfaceId(config.base_interface_id.0 + index as u64);
381 if tx
382 .send(Event::Frame {
383 interface_id: sub_id,
384 data,
385 })
386 .is_err()
387 {
388 return;
389 }
390 }
391 rnode_kiss::RNodeEvent::Ready => {
392 for (i, fs) in flow_states.iter().enumerate() {
394 if config.subinterfaces[i].flow_control {
395 process_flow_queue(fs, &writer, i as u8);
396 }
397 }
398 }
399 rnode_kiss::RNodeEvent::Error(code) => {
400 log::error!("[{}] RNode error: 0x{:02X}", config.name, code);
401 }
402 _ => {
403 }
405 }
406 }
407 }
408 Err(e) => {
409 log::error!("[{}] serial read error: {}", config.name, e);
410 signal_interface_down(&tx, &config);
411 break true;
412 }
413 }
414 };
415
416 if !disconnected || config.pre_opened_fd.is_some() {
417 return;
418 }
419
420 let mut backoff = RECONNECT_INITIAL_DELAY;
421 loop {
422 match reopen_connection(&config, &writer) {
423 Ok(new_reader) => {
424 reset_flow_states(&flow_states);
425 reader = new_reader;
426 if let Err(e) = detect_and_configure(&mut reader, &writer, &config) {
427 log::warn!("[{}] reconnect configure failed: {}", config.name, e);
428 thread::sleep(backoff);
429 backoff = std::cmp::min(backoff.saturating_mul(2), RECONNECT_MAX_DELAY);
430 continue;
431 }
432 signal_interface_up(&tx, &config, &writer, &flow_states, connected_once);
433 break;
434 }
435 Err(e) => {
436 log::warn!("[{}] reconnect open failed: {}", config.name, e);
437 thread::sleep(backoff);
438 backoff = std::cmp::min(backoff.saturating_mul(2), RECONNECT_MAX_DELAY);
439 }
440 }
441 }
442 }
443}
444
445fn detect_and_configure(
446 reader: &mut Transport,
447 writer: &Arc<Mutex<Transport>>,
448 config: &RNodeConfig,
449) -> io::Result<()> {
450 let detect_cmd = rnode_kiss::detect_request();
451 let mut cmd = detect_cmd;
452 cmd.extend_from_slice(&rnode_kiss::rnode_command(
453 rnode_kiss::CMD_FW_VERSION,
454 &[0x00],
455 ));
456 cmd.extend_from_slice(&rnode_kiss::rnode_command(
457 rnode_kiss::CMD_FW_DETAIL,
458 &[0x00],
459 ));
460 cmd.extend_from_slice(&rnode_kiss::rnode_command(
461 rnode_kiss::CMD_PLATFORM,
462 &[0x00],
463 ));
464 cmd.extend_from_slice(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x00]));
465
466 lock_or_recover(writer, "rnode shared writer").write_all(&cmd)?;
467
468 let mut decoder = rnode_kiss::RNodeDecoder::new();
469 let mut buf = [0u8; 4096];
470 let mut detected = false;
471 let detect_start = std::time::Instant::now();
472 let detect_timeout = Duration::from_secs(5);
473
474 while !detected && detect_start.elapsed() < detect_timeout {
475 match reader.read(&mut buf) {
476 Ok(0) => {
477 return Err(io::Error::new(
478 io::ErrorKind::UnexpectedEof,
479 "serial port closed during detect",
480 ));
481 }
482 Ok(n) => {
483 for event in decoder.feed(&buf[..n]) {
484 match event {
485 rnode_kiss::RNodeEvent::Detected(true) => {
486 detected = true;
487 log::info!("[{}] RNode device detected", config.name);
488 }
489 rnode_kiss::RNodeEvent::FirmwareVersion { major, minor } => {
490 log::info!("[{}] firmware version {}.{}", config.name, major, minor);
491 }
492 rnode_kiss::RNodeEvent::FirmwareDetail(ref detail) => {
493 log::info!("[{}] firmware detail: {}", config.name, detail);
494 }
495 rnode_kiss::RNodeEvent::Platform(p) => {
496 log::info!("[{}] platform: 0x{:02X}", config.name, p);
497 }
498 rnode_kiss::RNodeEvent::Mcu(m) => {
499 log::info!("[{}] MCU: 0x{:02X}", config.name, m);
500 }
501 _ => {}
502 }
503 }
504 }
505 Err(e) => {
506 return Err(io::Error::new(
507 e.kind(),
508 format!("serial read error during detect: {}", e),
509 ));
510 }
511 }
512 }
513
514 if !detected {
515 return Err(io::Error::new(
516 io::ErrorKind::TimedOut,
517 "RNode detection timed out",
518 ));
519 }
520
521 for (i, sub) in config.subinterfaces.iter().enumerate() {
522 configure_subinterface(writer, i as u8, sub, config.subinterfaces.len() > 1)?;
523 }
524
525 thread::sleep(Duration::from_millis(300));
526 log::info!(
527 "[{}] RNode configured with {} subinterface(s)",
528 config.name,
529 config.subinterfaces.len()
530 );
531 Ok(())
532}
533
534fn signal_interface_down(tx: &EventSender, config: &RNodeConfig) {
535 for i in 0..config.subinterfaces.len() {
536 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
537 let _ = tx.send(Event::InterfaceDown(sub_id));
538 }
539}
540
541fn signal_interface_up(
542 tx: &EventSender,
543 config: &RNodeConfig,
544 writer: &Arc<Mutex<Transport>>,
545 flow_states: &[Arc<Mutex<SubFlowState>>],
546 reconnected: bool,
547) {
548 for (i, flow_state) in flow_states.iter().enumerate() {
549 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
550 let new_writer = reconnected.then(|| {
551 make_sub_writer(
552 writer.clone(),
553 i as u8,
554 config.subinterfaces[i].flow_control,
555 flow_state.clone(),
556 )
557 });
558 let _ = tx.send(Event::InterfaceUp(sub_id, new_writer, None));
559 }
560}
561
562fn reset_flow_states(flow_states: &[Arc<Mutex<SubFlowState>>]) {
563 for flow_state in flow_states {
564 let mut state = lock_or_recover(flow_state, "rnode flow state");
565 state.ready = true;
566 state.queue.clear();
567 }
568}
569
570fn reopen_connection(
571 config: &RNodeConfig,
572 writer: &Arc<Mutex<Transport>>,
573) -> io::Result<Transport> {
574 let serial_config = SerialConfig {
575 path: config.port.clone(),
576 baud: config.speed,
577 data_bits: 8,
578 parity: Parity::None,
579 stop_bits: 1,
580 };
581
582 let (reader, new_writer) = Transport::open(&serial_config)?;
583 *lock_or_recover(writer, "rnode shared writer") = new_writer;
584 Ok(reader)
585}
586
587pub(crate) fn configure_subinterface(
589 writer: &Arc<Mutex<Transport>>,
590 index: u8,
591 sub: &RNodeSubConfig,
592 multi: bool,
593) -> io::Result<()> {
594 let mut w = lock_or_recover(writer, "rnode shared writer");
595
596 let freq_bytes = [
598 (sub.frequency >> 24) as u8,
599 (sub.frequency >> 16) as u8,
600 (sub.frequency >> 8) as u8,
601 (sub.frequency & 0xFF) as u8,
602 ];
603 let bw_bytes = [
604 (sub.bandwidth >> 24) as u8,
605 (sub.bandwidth >> 16) as u8,
606 (sub.bandwidth >> 8) as u8,
607 (sub.bandwidth & 0xFF) as u8,
608 ];
609 let txp = if sub.txpower < 0 {
610 (sub.txpower as i16 + 256) as u8
611 } else {
612 sub.txpower as u8
613 };
614
615 if multi {
616 w.write_all(&rnode_kiss::rnode_select_command(
617 index,
618 rnode_kiss::CMD_FREQUENCY,
619 &freq_bytes,
620 ))?;
621 w.write_all(&rnode_kiss::rnode_select_command(
622 index,
623 rnode_kiss::CMD_BANDWIDTH,
624 &bw_bytes,
625 ))?;
626 w.write_all(&rnode_kiss::rnode_select_command(
627 index,
628 rnode_kiss::CMD_TXPOWER,
629 &[txp],
630 ))?;
631 w.write_all(&rnode_kiss::rnode_select_command(
632 index,
633 rnode_kiss::CMD_SF,
634 &[sub.spreading_factor],
635 ))?;
636 w.write_all(&rnode_kiss::rnode_select_command(
637 index,
638 rnode_kiss::CMD_CR,
639 &[sub.coding_rate],
640 ))?;
641 } else {
642 w.write_all(&rnode_kiss::rnode_command(
643 rnode_kiss::CMD_FREQUENCY,
644 &freq_bytes,
645 ))?;
646 w.write_all(&rnode_kiss::rnode_command(
647 rnode_kiss::CMD_BANDWIDTH,
648 &bw_bytes,
649 ))?;
650 w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_TXPOWER, &[txp]))?;
651 w.write_all(&rnode_kiss::rnode_command(
652 rnode_kiss::CMD_SF,
653 &[sub.spreading_factor],
654 ))?;
655 w.write_all(&rnode_kiss::rnode_command(
656 rnode_kiss::CMD_CR,
657 &[sub.coding_rate],
658 ))?;
659 }
660
661 if let Some(st) = sub.st_alock {
663 let st_val = (st * 100.0) as u16;
664 let st_bytes = [(st_val >> 8) as u8, (st_val & 0xFF) as u8];
665 if multi {
666 w.write_all(&rnode_kiss::rnode_select_command(
667 index,
668 rnode_kiss::CMD_ST_ALOCK,
669 &st_bytes,
670 ))?;
671 } else {
672 w.write_all(&rnode_kiss::rnode_command(
673 rnode_kiss::CMD_ST_ALOCK,
674 &st_bytes,
675 ))?;
676 }
677 }
678 if let Some(lt) = sub.lt_alock {
679 let lt_val = (lt * 100.0) as u16;
680 let lt_bytes = [(lt_val >> 8) as u8, (lt_val & 0xFF) as u8];
681 if multi {
682 w.write_all(&rnode_kiss::rnode_select_command(
683 index,
684 rnode_kiss::CMD_LT_ALOCK,
685 <_bytes,
686 ))?;
687 } else {
688 w.write_all(&rnode_kiss::rnode_command(
689 rnode_kiss::CMD_LT_ALOCK,
690 <_bytes,
691 ))?;
692 }
693 }
694
695 if multi {
697 w.write_all(&rnode_kiss::rnode_select_command(
698 index,
699 rnode_kiss::CMD_RADIO_STATE,
700 &[rnode_kiss::RADIO_STATE_ON],
701 ))?;
702 } else {
703 w.write_all(&rnode_kiss::rnode_command(
704 rnode_kiss::CMD_RADIO_STATE,
705 &[rnode_kiss::RADIO_STATE_ON],
706 ))?;
707 }
708
709 Ok(())
710}
711
712fn process_flow_queue(
714 flow_state: &Arc<Mutex<SubFlowState>>,
715 writer: &Arc<Mutex<Transport>>,
716 index: u8,
717) {
718 let mut state = lock_or_recover(flow_state, "rnode flow state");
719 if let Some(data) = state.queue.pop_front() {
720 state.ready = false;
721 drop(state);
722 let frame = rnode_kiss::rnode_data_frame(index, &data);
723 let _ = lock_or_recover(writer, "rnode shared writer").write_all(&frame);
724 } else {
725 state.ready = true;
726 }
727}
728
729use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, SubInterface};
732use rns_core::transport::types::InterfaceInfo;
733use std::collections::HashMap;
734
735pub struct RNodeFactory;
737
738impl InterfaceFactory for RNodeFactory {
739 fn type_name(&self) -> &str {
740 "RNodeInterface"
741 }
742
743 fn default_ifac_size(&self) -> usize {
744 8
745 }
746
747 fn parse_config(
748 &self,
749 name: &str,
750 id: InterfaceId,
751 params: &HashMap<String, String>,
752 ) -> Result<Box<dyn InterfaceConfigData>, String> {
753 let pre_opened_fd = params.get("fd").and_then(|v| v.parse::<i32>().ok());
754
755 let port = params
756 .get("port")
757 .cloned()
758 .or_else(|| pre_opened_fd.map(|_| "usb-bridge".to_string()))
759 .ok_or_else(|| "RNodeInterface requires 'port' or 'fd'".to_string())?;
760
761 let speed = params
762 .get("speed")
763 .and_then(|v| v.parse().ok())
764 .unwrap_or(115200u32);
765
766 let frequency = params
767 .get("frequency")
768 .and_then(|v| v.parse().ok())
769 .unwrap_or(868_000_000u32);
770
771 let bandwidth = params
772 .get("bandwidth")
773 .and_then(|v| v.parse().ok())
774 .unwrap_or(125_000u32);
775
776 let txpower = params
777 .get("txpower")
778 .and_then(|v| v.parse().ok())
779 .unwrap_or(7i8);
780
781 let spreading_factor = params
782 .get("spreadingfactor")
783 .or_else(|| params.get("spreading_factor"))
784 .and_then(|v| v.parse().ok())
785 .unwrap_or(8u8);
786
787 let coding_rate = params
788 .get("codingrate")
789 .or_else(|| params.get("coding_rate"))
790 .and_then(|v| v.parse().ok())
791 .unwrap_or(5u8);
792
793 let flow_control = params
794 .get("flow_control")
795 .and_then(|v| crate::config::parse_bool_pub(v))
796 .unwrap_or(false);
797
798 let st_alock = params.get("st_alock").and_then(|v| v.parse().ok());
799
800 let lt_alock = params.get("lt_alock").and_then(|v| v.parse().ok());
801
802 let id_interval = params.get("id_interval").and_then(|v| v.parse().ok());
803
804 let id_callsign = params.get("id_callsign").map(|v| v.as_bytes().to_vec());
805
806 let sub = RNodeSubConfig {
807 name: name.to_string(),
808 frequency,
809 bandwidth,
810 txpower,
811 spreading_factor,
812 coding_rate,
813 flow_control,
814 st_alock,
815 lt_alock,
816 };
817
818 Ok(Box::new(RNodeConfig {
819 name: name.to_string(),
820 port,
821 speed,
822 subinterfaces: vec![sub],
823 id_interval,
824 id_callsign,
825 base_interface_id: id,
826 pre_opened_fd,
827 runtime: Arc::new(Mutex::new(RNodeRuntime {
828 sub: RNodeSubConfig {
829 name: name.to_string(),
830 frequency,
831 bandwidth,
832 txpower,
833 spreading_factor,
834 coding_rate,
835 flow_control,
836 st_alock,
837 lt_alock,
838 },
839 writer: None,
840 })),
841 }))
842 }
843
844 fn start(
845 &self,
846 config: Box<dyn InterfaceConfigData>,
847 ctx: StartContext,
848 ) -> std::io::Result<StartResult> {
849 let rnode_config = *config.into_any().downcast::<RNodeConfig>().map_err(|_| {
850 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
851 })?;
852
853 let name = rnode_config.name.clone();
854 let sub_bitrates: Vec<u64> = rnode_config
855 .subinterfaces
856 .iter()
857 .map(estimate_lora_bitrate_bps)
858 .collect();
859 let airtime_profiles: Vec<AirtimeProfile> = rnode_config
860 .subinterfaces
861 .iter()
862 .map(lora_airtime_profile)
863 .collect();
864
865 let pairs = start(rnode_config, ctx.tx)?;
866
867 let mut subs = Vec::with_capacity(pairs.len());
868 for (index, (sub_id, writer)) in pairs.into_iter().enumerate() {
869 let sub_name = if index == 0 {
870 name.clone()
871 } else {
872 format!("{}/{}", name, index)
873 };
874
875 let info = InterfaceInfo {
876 id: sub_id,
877 name: sub_name,
878 mode: ctx.mode,
879 out_capable: true,
880 in_capable: true,
881 bitrate: sub_bitrates.get(index).copied(),
882 airtime_profile: airtime_profiles.get(index).copied(),
883 announce_rate_target: None,
884 announce_rate_grace: 0,
885 announce_rate_penalty: 0.0,
886 announce_cap: rns_core::constants::ANNOUNCE_CAP,
887 is_local_client: false,
888 wants_tunnel: false,
889 tunnel_id: None,
890 mtu: rns_core::constants::MTU as u32,
891 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
892 ia_freq: 0.0,
893 started: crate::time::now(),
894 };
895
896 subs.push(SubInterface {
897 id: sub_id,
898 info,
899 writer,
900 interface_type_name: "RNodeInterface".to_string(),
901 });
902 }
903
904 Ok(StartResult::Multi(subs))
905 }
906}
907
908pub(crate) fn rnode_runtime_handle_from_config(config: &RNodeConfig) -> RNodeRuntimeConfigHandle {
909 RNodeRuntimeConfigHandle {
910 interface_name: config.name.clone(),
911 runtime: Arc::clone(&config.runtime),
912 startup: RNodeRuntime::from_config(config),
913 }
914}
915
916#[cfg(test)]
917mod tests {
918 use super::*;
919 use crate::event;
920 use crate::kiss;
921 use crate::serial::open_pty_pair;
922 use std::os::unix::io::{AsRawFd, FromRawFd};
923 use std::path::PathBuf;
924 use std::sync::mpsc::RecvTimeoutError;
925 use tempfile::tempdir;
926 fn poll_read(fd: i32, timeout_ms: i32) -> bool {
928 let mut pfd = libc::pollfd {
929 fd,
930 events: libc::POLLIN,
931 revents: 0,
932 };
933 let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
934 ret > 0
935 }
936
937 fn read_available(file: &mut Transport) -> Vec<u8> {
939 let mut all = Vec::new();
940 let mut buf = [0u8; 4096];
941 while poll_read(file.as_raw_fd(), 100) {
942 match file.read(&mut buf) {
943 Ok(n) if n > 0 => all.extend_from_slice(&buf[..n]),
944 _ => break,
945 }
946 }
947 all
948 }
949
950 fn slave_tty_path(fd: i32) -> PathBuf {
951 let mut buf = [0u8; 256];
952 let rc = unsafe { libc::ttyname_r(fd, buf.as_mut_ptr().cast(), buf.len()) };
953 assert_eq!(rc, 0, "ttyname_r failed for fd {}", fd);
954 let nul = buf.iter().position(|b| *b == 0).unwrap_or(buf.len());
955 PathBuf::from(std::str::from_utf8(&buf[..nul]).unwrap())
956 }
957
958 unsafe fn transport_from_raw_fd(fd: i32) -> Transport {
959 Transport::Serial(std::fs::File::from_raw_fd(fd))
960 }
961
962 fn wait_for_interface_event<F>(
963 rx: &std::sync::mpsc::Receiver<Event>,
964 timeout: Duration,
965 predicate: F,
966 ) -> Event
967 where
968 F: Fn(&Event) -> bool,
969 {
970 let deadline = std::time::Instant::now() + timeout;
971 loop {
972 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
973 match rx.recv_timeout(remaining) {
974 Ok(event) if predicate(&event) => return event,
975 Ok(_) => continue,
976 Err(RecvTimeoutError::Timeout) => panic!("timed out waiting for interface event"),
977 Err(RecvTimeoutError::Disconnected) => panic!("event channel disconnected"),
978 }
979 }
980 }
981
982 fn mock_respond_detect(master: &mut Transport) {
984 master
986 .write_all(&rnode_kiss::rnode_command(
987 rnode_kiss::CMD_DETECT,
988 &[rnode_kiss::DETECT_RESP],
989 ))
990 .unwrap();
991 master
993 .write_all(&rnode_kiss::rnode_command(
994 rnode_kiss::CMD_FW_VERSION,
995 &[0x01, 0x4A],
996 ))
997 .unwrap();
998 master
1000 .write_all(&rnode_kiss::rnode_command(
1001 rnode_kiss::CMD_PLATFORM,
1002 &[0x80],
1003 ))
1004 .unwrap();
1005 master
1007 .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x01]))
1008 .unwrap();
1009 master.flush().unwrap();
1010 }
1011
1012 #[test]
1013 fn rnode_detect_over_pty() {
1014 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1016 let mut master = unsafe { transport_from_raw_fd(master_fd) };
1017 let mut slave = unsafe { transport_from_raw_fd(slave_fd) };
1018
1019 mock_respond_detect(&mut master);
1021
1022 let mut decoder = rnode_kiss::RNodeDecoder::new();
1023 let mut events = Vec::new();
1024 let deadline = std::time::Instant::now() + Duration::from_secs(2);
1025
1026 while std::time::Instant::now() < deadline {
1027 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
1028 let timeout_ms = remaining.as_millis().min(i32::MAX as u128) as i32;
1029 if !poll_read(slave.as_raw_fd(), timeout_ms) {
1030 break;
1031 }
1032
1033 let chunk = read_available(&mut slave);
1034 if chunk.is_empty() {
1035 continue;
1036 }
1037
1038 events.extend(decoder.feed(&chunk));
1039
1040 let saw_detect = events
1041 .iter()
1042 .any(|e| matches!(e, rnode_kiss::RNodeEvent::Detected(true)));
1043 let saw_firmware = events
1044 .iter()
1045 .any(|e| matches!(e, rnode_kiss::RNodeEvent::FirmwareVersion { .. }));
1046 if saw_detect && saw_firmware {
1047 break;
1048 }
1049 }
1050
1051 assert!(
1052 events
1053 .iter()
1054 .any(|e| matches!(e, rnode_kiss::RNodeEvent::Detected(true))),
1055 "should detect device"
1056 );
1057 assert!(
1058 events
1059 .iter()
1060 .any(|e| matches!(e, rnode_kiss::RNodeEvent::FirmwareVersion { .. })),
1061 "should get firmware version"
1062 );
1063 }
1064
1065 #[test]
1066 fn rnode_configure_commands() {
1067 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1068 let mut master = unsafe { transport_from_raw_fd(master_fd) };
1069 let writer_file = unsafe { transport_from_raw_fd(libc::dup(slave_fd)) };
1070 let writer = Arc::new(Mutex::new(writer_file));
1071
1072 let sub = RNodeSubConfig {
1073 name: "test".into(),
1074 frequency: 868_000_000,
1075 bandwidth: 125_000,
1076 txpower: 7,
1077 spreading_factor: 8,
1078 coding_rate: 5,
1079 flow_control: false,
1080 st_alock: None,
1081 lt_alock: None,
1082 };
1083
1084 configure_subinterface(&writer, 0, &sub, false).unwrap();
1085
1086 let data = read_available(&mut master);
1088
1089 assert!(
1091 data.windows(2)
1092 .any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_FREQUENCY),
1093 "should contain FREQUENCY command"
1094 );
1095 assert!(
1097 data.windows(2)
1098 .any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_BANDWIDTH),
1099 "should contain BANDWIDTH command"
1100 );
1101 assert!(
1103 data.windows(3).any(|w| w[0] == kiss::FEND
1104 && w[1] == rnode_kiss::CMD_RADIO_STATE
1105 && w[2] == rnode_kiss::RADIO_STATE_ON),
1106 "should contain RADIO_STATE ON command"
1107 );
1108
1109 unsafe { libc::close(slave_fd) };
1110 }
1111
1112 #[test]
1113 fn rnode_data_roundtrip() {
1114 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1115 let mut master = unsafe { transport_from_raw_fd(master_fd) };
1116 let slave = unsafe { transport_from_raw_fd(slave_fd) };
1117
1118 let payload = vec![0x01, 0x02, 0x03, 0x04, 0x05];
1120 let frame = rnode_kiss::rnode_data_frame(0, &payload);
1121 master.write_all(&frame).unwrap();
1122 master.flush().unwrap();
1123
1124 assert!(poll_read(slave.as_raw_fd(), 2000));
1126 let mut decoder = rnode_kiss::RNodeDecoder::new();
1127 let mut buf = [0u8; 4096];
1128 let mut slave_file = slave;
1129 let n = slave_file.read(&mut buf).unwrap();
1130 let events = decoder.feed(&buf[..n]);
1131
1132 assert_eq!(events.len(), 1);
1133 match &events[0] {
1134 rnode_kiss::RNodeEvent::DataFrame { index, data } => {
1135 assert_eq!(*index, 0);
1136 assert_eq!(data, &payload);
1137 }
1138 other => panic!("expected DataFrame, got {:?}", other),
1139 }
1140 }
1141
1142 #[test]
1143 fn rnode_flow_control() {
1144 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1145 let writer_file = unsafe { transport_from_raw_fd(slave_fd) };
1146 let shared_writer = Arc::new(Mutex::new(writer_file));
1147
1148 let flow_state = Arc::new(Mutex::new(SubFlowState {
1149 ready: true,
1150 queue: std::collections::VecDeque::new(),
1151 }));
1152
1153 let mut sub_writer = RNodeSubWriter {
1154 writer: shared_writer.clone(),
1155 index: 0,
1156 flow_control: true,
1157 flow_state: flow_state.clone(),
1158 };
1159
1160 sub_writer.send_frame(b"hello").unwrap();
1162 assert!(!flow_state.lock().unwrap().ready);
1163
1164 sub_writer.send_frame(b"world").unwrap();
1166 assert_eq!(flow_state.lock().unwrap().queue.len(), 1);
1167
1168 process_flow_queue(&flow_state, &shared_writer, 0);
1170 assert_eq!(flow_state.lock().unwrap().queue.len(), 0);
1171 assert!(!flow_state.lock().unwrap().ready); process_flow_queue(&flow_state, &shared_writer, 0);
1175 assert!(flow_state.lock().unwrap().ready);
1176
1177 unsafe { libc::close(master_fd) };
1178 }
1179
1180 #[test]
1181 fn rnode_sub_writer_format() {
1182 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1183 let mut master = unsafe { transport_from_raw_fd(master_fd) };
1184 let writer_file = unsafe { transport_from_raw_fd(slave_fd) };
1185 let shared_writer = Arc::new(Mutex::new(writer_file));
1186
1187 let flow_state = Arc::new(Mutex::new(SubFlowState {
1188 ready: true,
1189 queue: std::collections::VecDeque::new(),
1190 }));
1191
1192 let mut sub_writer = RNodeSubWriter {
1193 writer: shared_writer,
1194 index: 1, flow_control: false,
1196 flow_state,
1197 };
1198
1199 let payload = vec![0xAA, 0xBB, 0xCC];
1200 sub_writer.send_frame(&payload).unwrap();
1201
1202 assert!(poll_read(master.as_raw_fd(), 2000));
1204 let mut buf = [0u8; 256];
1205 let n = master.read(&mut buf).unwrap();
1206
1207 assert_eq!(buf[0], kiss::FEND);
1209 assert_eq!(buf[1], 0x10); assert_eq!(buf[n - 1], kiss::FEND);
1211 }
1212
1213 #[test]
1214 fn rnode_multi_sub_routing() {
1215 let mut decoder = rnode_kiss::RNodeDecoder::new();
1217
1218 let payload0 = vec![0x01, 0x02];
1219 let frame0 = rnode_kiss::rnode_data_frame(0, &payload0);
1220 let events0 = decoder.feed(&frame0);
1221 assert_eq!(events0.len(), 1);
1222 assert_eq!(
1223 events0[0],
1224 rnode_kiss::RNodeEvent::DataFrame {
1225 index: 0,
1226 data: payload0
1227 }
1228 );
1229
1230 let payload1 = vec![0x03, 0x04];
1231 let frame1 = rnode_kiss::rnode_data_frame(1, &payload1);
1232 let events1 = decoder.feed(&frame1);
1233 assert_eq!(events1.len(), 1);
1234 assert_eq!(
1235 events1[0],
1236 rnode_kiss::RNodeEvent::DataFrame {
1237 index: 1,
1238 data: payload1
1239 }
1240 );
1241 }
1242
1243 #[test]
1244 fn rnode_error_handling() {
1245 let mut decoder = rnode_kiss::RNodeDecoder::new();
1253 let frame = rnode_kiss::rnode_command(rnode_kiss::CMD_ERROR, &[0x02]);
1254 let events = decoder.feed(&frame);
1255 assert_eq!(events.len(), 1);
1256 assert_eq!(
1258 events[0],
1259 rnode_kiss::RNodeEvent::DataFrame {
1260 index: 5,
1261 data: vec![0x02]
1262 }
1263 );
1264 }
1265
1266 #[test]
1267 fn rnode_config_validation() {
1268 let good = RNodeSubConfig {
1269 name: "test".into(),
1270 frequency: 868_000_000,
1271 bandwidth: 125_000,
1272 txpower: 7,
1273 spreading_factor: 8,
1274 coding_rate: 5,
1275 flow_control: false,
1276 st_alock: None,
1277 lt_alock: None,
1278 };
1279 assert!(validate_sub_config(&good).is_none());
1280
1281 let mut bad = good.clone();
1283 bad.frequency = 100;
1284 assert!(validate_sub_config(&bad).is_some());
1285
1286 bad = good.clone();
1288 bad.spreading_factor = 13;
1289 assert!(validate_sub_config(&bad).is_some());
1290
1291 bad = good.clone();
1293 bad.coding_rate = 9;
1294 assert!(validate_sub_config(&bad).is_some());
1295
1296 bad = good.clone();
1298 bad.bandwidth = 5;
1299 assert!(validate_sub_config(&bad).is_some());
1300
1301 bad = good.clone();
1303 bad.txpower = 50;
1304 assert!(validate_sub_config(&bad).is_some());
1305 }
1306
1307 #[test]
1308 fn rnode_lora_bitrate_estimate_uses_radio_params() {
1309 let mut sub = RNodeSubConfig {
1310 name: "test".into(),
1311 frequency: 868_000_000,
1312 bandwidth: 125_000,
1313 txpower: 7,
1314 spreading_factor: 8,
1315 coding_rate: 5,
1316 flow_control: false,
1317 st_alock: None,
1318 lt_alock: None,
1319 };
1320
1321 assert_eq!(estimate_lora_bitrate_bps(&sub), 3125);
1322
1323 sub.spreading_factor = 12;
1324 assert_eq!(estimate_lora_bitrate_bps(&sub), 293);
1325
1326 sub.bandwidth = 250_000;
1327 assert_eq!(estimate_lora_bitrate_bps(&sub), 586);
1328 }
1329
1330 #[test]
1331 fn rnode_lora_airtime_profile_uses_radio_params() {
1332 let sub = RNodeSubConfig {
1333 name: "test".into(),
1334 frequency: 868_000_000,
1335 bandwidth: 125_000,
1336 txpower: 7,
1337 spreading_factor: 8,
1338 coding_rate: 5,
1339 flow_control: false,
1340 st_alock: None,
1341 lt_alock: None,
1342 };
1343
1344 let profile = lora_airtime_profile(&sub);
1345 assert!((profile.transmit_time_secs(100) - 0.307712).abs() < 0.000001);
1346 assert_eq!(
1347 profile,
1348 AirtimeProfile::Lora {
1349 bandwidth: 125_000,
1350 spreading_factor: 8,
1351 coding_rate: 5,
1352 preamble_symbols: LORA_PREAMBLE_SYMBOLS,
1353 explicit_header: LORA_EXPLICIT_HEADER,
1354 crc: LORA_CRC,
1355 }
1356 );
1357 }
1358
1359 #[test]
1360 fn rnode_reconnects_after_serial_disconnect() {
1361 let tempdir = tempdir().unwrap();
1362 let port_path = tempdir.path().join("rnode-port");
1363
1364 let (master1_fd, slave1_fd) = open_pty_pair().unwrap();
1365 let slave1_path = slave_tty_path(slave1_fd);
1366 std::os::unix::fs::symlink(&slave1_path, &port_path).unwrap();
1367
1368 let mut master1 = unsafe { transport_from_raw_fd(master1_fd) };
1369 let slave1 = unsafe { transport_from_raw_fd(slave1_fd) };
1370
1371 let (tx, rx) = event::channel();
1372 let sub = RNodeSubConfig {
1373 name: "test-rnode".into(),
1374 frequency: 868_000_000,
1375 bandwidth: 125_000,
1376 txpower: 7,
1377 spreading_factor: 8,
1378 coding_rate: 5,
1379 flow_control: false,
1380 st_alock: None,
1381 lt_alock: None,
1382 };
1383 let mut config = RNodeConfig {
1384 name: "test-rnode".into(),
1385 port: port_path.display().to_string(),
1386 speed: 115200,
1387 subinterfaces: vec![sub],
1388 id_interval: None,
1389 id_callsign: None,
1390 base_interface_id: InterfaceId(41),
1391 pre_opened_fd: None,
1392 runtime: Arc::new(Mutex::new(RNodeRuntime {
1393 sub: RNodeSubConfig {
1394 name: String::new(),
1395 frequency: 868_000_000,
1396 bandwidth: 125_000,
1397 txpower: 7,
1398 spreading_factor: 8,
1399 coding_rate: 5,
1400 flow_control: false,
1401 st_alock: None,
1402 lt_alock: None,
1403 },
1404 writer: None,
1405 })),
1406 };
1407 config.runtime = Arc::new(Mutex::new(RNodeRuntime::from_config(&config)));
1408
1409 let _writers = start(config, tx).unwrap();
1410
1411 thread::sleep(Duration::from_secs(3));
1412 mock_respond_detect(&mut master1);
1413 let up = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1414 matches!(event, Event::InterfaceUp(InterfaceId(41), _, _))
1415 });
1416 assert!(matches!(
1417 up,
1418 Event::InterfaceUp(InterfaceId(41), None, None)
1419 ));
1420
1421 drop(master1);
1422 drop(slave1);
1423
1424 let down = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1425 matches!(event, Event::InterfaceDown(InterfaceId(41)))
1426 });
1427 assert!(matches!(down, Event::InterfaceDown(InterfaceId(41))));
1428
1429 let (master2_fd, slave2_fd) = open_pty_pair().unwrap();
1430 let slave2_path = slave_tty_path(slave2_fd);
1431 std::fs::remove_file(&port_path).unwrap();
1432 std::os::unix::fs::symlink(&slave2_path, &port_path).unwrap();
1433
1434 let mut master2 = unsafe { transport_from_raw_fd(master2_fd) };
1435 let _slave2 = unsafe { transport_from_raw_fd(slave2_fd) };
1436
1437 thread::sleep(Duration::from_secs(3));
1438 mock_respond_detect(&mut master2);
1439 let up = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1440 matches!(event, Event::InterfaceUp(InterfaceId(41), _, _))
1441 });
1442 assert!(matches!(
1443 up,
1444 Event::InterfaceUp(InterfaceId(41), Some(_), None)
1445 ));
1446 }
1447}