1use std::io::{self, Read, Write};
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::InterfaceId;
15
16use crate::event::{Event, EventSender};
17use crate::interface::Writer;
18use crate::rnode_kiss;
19use crate::serial::{Parity, SerialConfig, SerialPort};
20
21pub const FREQ_MIN: u32 = 137_000_000;
23pub const FREQ_MAX: u32 = 3_000_000_000;
24pub const BW_MIN: u32 = 7_800;
25pub const BW_MAX: u32 = 1_625_000;
26pub const SF_MIN: u8 = 5;
27pub const SF_MAX: u8 = 12;
28pub const CR_MIN: u8 = 5;
29pub const CR_MAX: u8 = 8;
30pub const TXPOWER_MIN: i8 = 0;
31pub const TXPOWER_MAX: i8 = 37;
32pub const HW_MTU: u32 = 508;
33
34#[derive(Debug, Clone)]
36pub struct RNodeSubConfig {
37 pub name: String,
38 pub frequency: u32,
39 pub bandwidth: u32,
40 pub txpower: i8,
41 pub spreading_factor: u8,
42 pub coding_rate: u8,
43 pub flow_control: bool,
44 pub st_alock: Option<f32>,
45 pub lt_alock: Option<f32>,
46}
47
48#[derive(Debug, Clone)]
50pub struct RNodeConfig {
51 pub name: String,
52 pub port: String,
53 pub speed: u32,
54 pub subinterfaces: Vec<RNodeSubConfig>,
55 pub id_interval: Option<u32>,
56 pub id_callsign: Option<Vec<u8>>,
57 pub base_interface_id: InterfaceId,
58 pub pre_opened_fd: Option<i32>,
61 pub runtime: Arc<Mutex<RNodeRuntime>>,
62}
63
64#[derive(Debug, Clone)]
65pub struct RNodeRuntime {
66 pub sub: RNodeSubConfig,
67 pub writer: Option<Arc<Mutex<std::fs::File>>>,
68}
69
70impl RNodeRuntime {
71 pub fn from_config(config: &RNodeConfig) -> Self {
72 Self {
73 sub: config
74 .subinterfaces
75 .first()
76 .cloned()
77 .unwrap_or_else(|| RNodeSubConfig {
78 name: config.name.clone(),
79 frequency: 868_000_000,
80 bandwidth: 125_000,
81 txpower: 7,
82 spreading_factor: 8,
83 coding_rate: 5,
84 flow_control: false,
85 st_alock: None,
86 lt_alock: None,
87 }),
88 writer: None,
89 }
90 }
91}
92
93#[derive(Debug, Clone)]
94pub struct RNodeRuntimeConfigHandle {
95 pub interface_name: String,
96 pub runtime: Arc<Mutex<RNodeRuntime>>,
97 pub startup: RNodeRuntime,
98}
99
100impl Default for RNodeConfig {
101 fn default() -> Self {
102 let mut config = RNodeConfig {
103 name: String::new(),
104 port: String::new(),
105 speed: 115200,
106 subinterfaces: Vec::new(),
107 id_interval: None,
108 id_callsign: None,
109 base_interface_id: InterfaceId(0),
110 pre_opened_fd: None,
111 runtime: Arc::new(Mutex::new(RNodeRuntime {
112 sub: RNodeSubConfig {
113 name: String::new(),
114 frequency: 868_000_000,
115 bandwidth: 125_000,
116 txpower: 7,
117 spreading_factor: 8,
118 coding_rate: 5,
119 flow_control: false,
120 st_alock: None,
121 lt_alock: None,
122 },
123 writer: None,
124 })),
125 };
126 let startup = RNodeRuntime::from_config(&config);
127 config.runtime = Arc::new(Mutex::new(startup));
128 config
129 }
130}
131
132pub fn validate_sub_config(sub: &RNodeSubConfig) -> Option<String> {
134 if sub.frequency < FREQ_MIN || sub.frequency > FREQ_MAX {
135 return Some(format!(
136 "Invalid frequency {} for {}",
137 sub.frequency, sub.name
138 ));
139 }
140 if sub.bandwidth < BW_MIN || sub.bandwidth > BW_MAX {
141 return Some(format!(
142 "Invalid bandwidth {} for {}",
143 sub.bandwidth, sub.name
144 ));
145 }
146 if sub.spreading_factor < SF_MIN || sub.spreading_factor > SF_MAX {
147 return Some(format!(
148 "Invalid SF {} for {}",
149 sub.spreading_factor, sub.name
150 ));
151 }
152 if sub.coding_rate < CR_MIN || sub.coding_rate > CR_MAX {
153 return Some(format!("Invalid CR {} for {}", sub.coding_rate, sub.name));
154 }
155 if sub.txpower < TXPOWER_MIN || sub.txpower > TXPOWER_MAX {
156 return Some(format!("Invalid TX power {} for {}", sub.txpower, sub.name));
157 }
158 if let Some(st) = sub.st_alock {
159 if st < 0.0 || st > 100.0 {
160 return Some(format!("Invalid ST airtime limit {} for {}", st, sub.name));
161 }
162 }
163 if let Some(lt) = sub.lt_alock {
164 if lt < 0.0 || lt > 100.0 {
165 return Some(format!("Invalid LT airtime limit {} for {}", lt, sub.name));
166 }
167 }
168 None
169}
170
171struct RNodeSubWriter {
174 writer: Arc<Mutex<std::fs::File>>,
175 index: u8,
176 flow_control: bool,
177 flow_state: Arc<Mutex<SubFlowState>>,
178}
179
180struct SubFlowState {
181 ready: bool,
182 queue: std::collections::VecDeque<Vec<u8>>,
183}
184
185fn make_sub_writer(
186 writer: Arc<Mutex<std::fs::File>>,
187 index: u8,
188 flow_control: bool,
189 flow_state: Arc<Mutex<SubFlowState>>,
190) -> Box<dyn Writer> {
191 Box::new(RNodeSubWriter {
192 writer,
193 index,
194 flow_control,
195 flow_state,
196 })
197}
198
199impl Writer for RNodeSubWriter {
200 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
201 let frame = rnode_kiss::rnode_data_frame(self.index, data);
202 if self.flow_control {
203 let mut state = self.flow_state.lock().unwrap();
204 if state.ready {
205 state.ready = false;
206 drop(state);
207 self.writer.lock().unwrap().write_all(&frame)
208 } else {
209 state.queue.push_back(data.to_vec());
210 Ok(())
211 }
212 } else {
213 self.writer.lock().unwrap().write_all(&frame)
214 }
215 }
216}
217
218pub fn start(
225 config: RNodeConfig,
226 tx: EventSender,
227) -> io::Result<Vec<(InterfaceId, Box<dyn Writer>)>> {
228 for sub in &config.subinterfaces {
230 if let Some(err) = validate_sub_config(sub) {
231 return Err(io::Error::new(io::ErrorKind::InvalidInput, err));
232 }
233 }
234
235 if config.subinterfaces.is_empty() {
236 return Err(io::Error::new(
237 io::ErrorKind::InvalidInput,
238 "No subinterfaces configured",
239 ));
240 }
241
242 let (reader_file, shared_writer) = if let Some(fd) = config.pre_opened_fd {
243 let port = SerialPort::from_raw_fd(fd);
245 let r = port.reader()?;
246 let w = port.writer()?;
247 std::mem::forget(port); (r, Arc::new(Mutex::new(w)))
249 } else {
250 let serial_config = SerialConfig {
251 path: config.port.clone(),
252 baud: config.speed,
253 data_bits: 8,
254 parity: Parity::None,
255 stop_bits: 1,
256 };
257 let port = SerialPort::open(&serial_config)?;
258 let r = port.reader()?;
259 let w = port.writer()?;
260 (r, Arc::new(Mutex::new(w)))
261 };
262
263 let num_subs = config.subinterfaces.len();
265 let mut writers: Vec<(InterfaceId, Box<dyn Writer>)> = Vec::with_capacity(num_subs);
266 let mut flow_states: Vec<Arc<Mutex<SubFlowState>>> = Vec::with_capacity(num_subs);
267
268 for (i, sub) in config.subinterfaces.iter().enumerate() {
269 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
270 let flow_state = Arc::new(Mutex::new(SubFlowState {
271 ready: true,
272 queue: std::collections::VecDeque::new(),
273 }));
274 flow_states.push(flow_state.clone());
275 let sub_writer =
276 make_sub_writer(shared_writer.clone(), i as u8, sub.flow_control, flow_state);
277 writers.push((sub_id, sub_writer));
278 }
279
280 let reader_shared_writer = shared_writer.clone();
282 {
283 let mut runtime = config.runtime.lock().unwrap();
284 runtime.writer = Some(shared_writer.clone());
285 runtime.sub = config
286 .subinterfaces
287 .first()
288 .cloned()
289 .unwrap_or(runtime.sub.clone());
290 }
291 let reader_config = config.clone();
292 let reader_flow_states = flow_states;
293 thread::Builder::new()
294 .name(format!("rnode-reader-{}", config.base_interface_id.0))
295 .spawn(move || {
296 reader_loop(
297 reader_file,
298 reader_shared_writer,
299 reader_config,
300 tx,
301 reader_flow_states,
302 );
303 })?;
304
305 let keepalive_writer = shared_writer.clone();
308 let keepalive_name = config.name.clone();
309 thread::Builder::new()
310 .name(format!("rnode-keepalive-{}", config.base_interface_id.0))
311 .spawn(move || {
312 let detect = rnode_kiss::detect_request();
313 loop {
314 thread::sleep(Duration::from_secs(15));
315 if let Err(e) = keepalive_writer.lock().unwrap().write_all(&detect) {
316 log::debug!("[{}] keepalive write failed: {}", keepalive_name, e);
317 }
318 }
319 })?;
320
321 Ok(writers)
322}
323
324fn reader_loop(
326 mut reader: std::fs::File,
327 writer: Arc<Mutex<std::fs::File>>,
328 config: RNodeConfig,
329 tx: EventSender,
330 flow_states: Vec<Arc<Mutex<SubFlowState>>>,
331) {
332 const RECONNECT_INITIAL_DELAY: Duration = Duration::from_millis(200);
333 const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(2);
334 thread::sleep(Duration::from_secs(2));
336 let mut connected_once = false;
337 if let Err(e) = detect_and_configure(&mut reader, &writer, &config) {
338 log::error!("[{}] initial RNode setup failed: {}", config.name, e);
339 return;
340 }
341 signal_interface_up(&tx, &config, &writer, &flow_states, connected_once);
342 connected_once = true;
343 loop {
344 let mut decoder = rnode_kiss::RNodeDecoder::new();
345 let mut buf = [0u8; 4096];
346 let disconnected = loop {
347 match reader.read(&mut buf) {
348 Ok(0) => {
349 log::warn!("[{}] serial port closed", config.name);
350 signal_interface_down(&tx, &config);
351 break true;
352 }
353 Ok(n) => {
354 for event in decoder.feed(&buf[..n]) {
355 match event {
356 rnode_kiss::RNodeEvent::DataFrame { index, data } => {
357 let sub_id = InterfaceId(config.base_interface_id.0 + index as u64);
358 if tx
359 .send(Event::Frame {
360 interface_id: sub_id,
361 data,
362 })
363 .is_err()
364 {
365 return;
366 }
367 }
368 rnode_kiss::RNodeEvent::Ready => {
369 for (i, fs) in flow_states.iter().enumerate() {
371 if config.subinterfaces[i].flow_control {
372 process_flow_queue(fs, &writer, i as u8);
373 }
374 }
375 }
376 rnode_kiss::RNodeEvent::Error(code) => {
377 log::error!("[{}] RNode error: 0x{:02X}", config.name, code);
378 }
379 _ => {
380 }
382 }
383 }
384 }
385 Err(e) => {
386 log::error!("[{}] serial read error: {}", config.name, e);
387 signal_interface_down(&tx, &config);
388 break true;
389 }
390 }
391 };
392
393 if !disconnected || config.pre_opened_fd.is_some() {
394 return;
395 }
396
397 let mut backoff = RECONNECT_INITIAL_DELAY;
398 loop {
399 match reopen_connection(&config, &writer) {
400 Ok(new_reader) => {
401 reset_flow_states(&flow_states);
402 reader = new_reader;
403 if let Err(e) = detect_and_configure(&mut reader, &writer, &config) {
404 log::warn!("[{}] reconnect configure failed: {}", config.name, e);
405 thread::sleep(backoff);
406 backoff = std::cmp::min(backoff.saturating_mul(2), RECONNECT_MAX_DELAY);
407 continue;
408 }
409 signal_interface_up(&tx, &config, &writer, &flow_states, connected_once);
410 break;
411 }
412 Err(e) => {
413 log::warn!("[{}] reconnect open failed: {}", config.name, e);
414 thread::sleep(backoff);
415 backoff = std::cmp::min(backoff.saturating_mul(2), RECONNECT_MAX_DELAY);
416 }
417 }
418 }
419 }
420}
421
422fn detect_and_configure(
423 reader: &mut std::fs::File,
424 writer: &Arc<Mutex<std::fs::File>>,
425 config: &RNodeConfig,
426) -> io::Result<()> {
427 let detect_cmd = rnode_kiss::detect_request();
428 let mut cmd = detect_cmd;
429 cmd.extend_from_slice(&rnode_kiss::rnode_command(
430 rnode_kiss::CMD_FW_VERSION,
431 &[0x00],
432 ));
433 cmd.extend_from_slice(&rnode_kiss::rnode_command(
434 rnode_kiss::CMD_FW_DETAIL,
435 &[0x00],
436 ));
437 cmd.extend_from_slice(&rnode_kiss::rnode_command(
438 rnode_kiss::CMD_PLATFORM,
439 &[0x00],
440 ));
441 cmd.extend_from_slice(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x00]));
442
443 writer.lock().unwrap().write_all(&cmd)?;
444
445 let mut decoder = rnode_kiss::RNodeDecoder::new();
446 let mut buf = [0u8; 4096];
447 let mut detected = false;
448 let detect_start = std::time::Instant::now();
449 let detect_timeout = Duration::from_secs(5);
450
451 while !detected && detect_start.elapsed() < detect_timeout {
452 match reader.read(&mut buf) {
453 Ok(0) => {
454 return Err(io::Error::new(
455 io::ErrorKind::UnexpectedEof,
456 "serial port closed during detect",
457 ));
458 }
459 Ok(n) => {
460 for event in decoder.feed(&buf[..n]) {
461 match event {
462 rnode_kiss::RNodeEvent::Detected(true) => {
463 detected = true;
464 log::info!("[{}] RNode device detected", config.name);
465 }
466 rnode_kiss::RNodeEvent::FirmwareVersion { major, minor } => {
467 log::info!("[{}] firmware version {}.{}", config.name, major, minor);
468 }
469 rnode_kiss::RNodeEvent::FirmwareDetail(ref detail) => {
470 log::info!("[{}] firmware detail: {}", config.name, detail);
471 }
472 rnode_kiss::RNodeEvent::Platform(p) => {
473 log::info!("[{}] platform: 0x{:02X}", config.name, p);
474 }
475 rnode_kiss::RNodeEvent::Mcu(m) => {
476 log::info!("[{}] MCU: 0x{:02X}", config.name, m);
477 }
478 _ => {}
479 }
480 }
481 }
482 Err(e) => {
483 return Err(io::Error::new(
484 e.kind(),
485 format!("serial read error during detect: {}", e),
486 ));
487 }
488 }
489 }
490
491 if !detected {
492 return Err(io::Error::new(
493 io::ErrorKind::TimedOut,
494 "RNode detection timed out",
495 ));
496 }
497
498 for (i, sub) in config.subinterfaces.iter().enumerate() {
499 configure_subinterface(writer, i as u8, sub, config.subinterfaces.len() > 1)?;
500 }
501
502 thread::sleep(Duration::from_millis(300));
503 log::info!(
504 "[{}] RNode configured with {} subinterface(s)",
505 config.name,
506 config.subinterfaces.len()
507 );
508 Ok(())
509}
510
511fn signal_interface_down(tx: &EventSender, config: &RNodeConfig) {
512 for i in 0..config.subinterfaces.len() {
513 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
514 let _ = tx.send(Event::InterfaceDown(sub_id));
515 }
516}
517
518fn signal_interface_up(
519 tx: &EventSender,
520 config: &RNodeConfig,
521 writer: &Arc<Mutex<std::fs::File>>,
522 flow_states: &[Arc<Mutex<SubFlowState>>],
523 reconnected: bool,
524) {
525 for (i, flow_state) in flow_states.iter().enumerate() {
526 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
527 let new_writer = reconnected.then(|| {
528 make_sub_writer(
529 writer.clone(),
530 i as u8,
531 config.subinterfaces[i].flow_control,
532 flow_state.clone(),
533 )
534 });
535 let _ = tx.send(Event::InterfaceUp(sub_id, new_writer, None));
536 }
537}
538
539fn reset_flow_states(flow_states: &[Arc<Mutex<SubFlowState>>]) {
540 for flow_state in flow_states {
541 let mut state = flow_state.lock().unwrap();
542 state.ready = true;
543 state.queue.clear();
544 }
545}
546
547fn reopen_connection(
548 config: &RNodeConfig,
549 writer: &Arc<Mutex<std::fs::File>>,
550) -> io::Result<std::fs::File> {
551 let serial_config = SerialConfig {
552 path: config.port.clone(),
553 baud: config.speed,
554 data_bits: 8,
555 parity: Parity::None,
556 stop_bits: 1,
557 };
558 let port = SerialPort::open(&serial_config)?;
559 let reader = port.reader()?;
560 let new_writer = port.writer()?;
561 *writer.lock().unwrap() = new_writer;
562 Ok(reader)
563}
564
565pub(crate) fn configure_subinterface(
567 writer: &Arc<Mutex<std::fs::File>>,
568 index: u8,
569 sub: &RNodeSubConfig,
570 multi: bool,
571) -> io::Result<()> {
572 let mut w = writer.lock().unwrap();
573
574 let freq_bytes = [
576 (sub.frequency >> 24) as u8,
577 (sub.frequency >> 16) as u8,
578 (sub.frequency >> 8) as u8,
579 (sub.frequency & 0xFF) as u8,
580 ];
581 let bw_bytes = [
582 (sub.bandwidth >> 24) as u8,
583 (sub.bandwidth >> 16) as u8,
584 (sub.bandwidth >> 8) as u8,
585 (sub.bandwidth & 0xFF) as u8,
586 ];
587 let txp = if sub.txpower < 0 {
588 (sub.txpower as i16 + 256) as u8
589 } else {
590 sub.txpower as u8
591 };
592
593 if multi {
594 w.write_all(&rnode_kiss::rnode_select_command(
595 index,
596 rnode_kiss::CMD_FREQUENCY,
597 &freq_bytes,
598 ))?;
599 w.write_all(&rnode_kiss::rnode_select_command(
600 index,
601 rnode_kiss::CMD_BANDWIDTH,
602 &bw_bytes,
603 ))?;
604 w.write_all(&rnode_kiss::rnode_select_command(
605 index,
606 rnode_kiss::CMD_TXPOWER,
607 &[txp],
608 ))?;
609 w.write_all(&rnode_kiss::rnode_select_command(
610 index,
611 rnode_kiss::CMD_SF,
612 &[sub.spreading_factor],
613 ))?;
614 w.write_all(&rnode_kiss::rnode_select_command(
615 index,
616 rnode_kiss::CMD_CR,
617 &[sub.coding_rate],
618 ))?;
619 } else {
620 w.write_all(&rnode_kiss::rnode_command(
621 rnode_kiss::CMD_FREQUENCY,
622 &freq_bytes,
623 ))?;
624 w.write_all(&rnode_kiss::rnode_command(
625 rnode_kiss::CMD_BANDWIDTH,
626 &bw_bytes,
627 ))?;
628 w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_TXPOWER, &[txp]))?;
629 w.write_all(&rnode_kiss::rnode_command(
630 rnode_kiss::CMD_SF,
631 &[sub.spreading_factor],
632 ))?;
633 w.write_all(&rnode_kiss::rnode_command(
634 rnode_kiss::CMD_CR,
635 &[sub.coding_rate],
636 ))?;
637 }
638
639 if let Some(st) = sub.st_alock {
641 let st_val = (st * 100.0) as u16;
642 let st_bytes = [(st_val >> 8) as u8, (st_val & 0xFF) as u8];
643 if multi {
644 w.write_all(&rnode_kiss::rnode_select_command(
645 index,
646 rnode_kiss::CMD_ST_ALOCK,
647 &st_bytes,
648 ))?;
649 } else {
650 w.write_all(&rnode_kiss::rnode_command(
651 rnode_kiss::CMD_ST_ALOCK,
652 &st_bytes,
653 ))?;
654 }
655 }
656 if let Some(lt) = sub.lt_alock {
657 let lt_val = (lt * 100.0) as u16;
658 let lt_bytes = [(lt_val >> 8) as u8, (lt_val & 0xFF) as u8];
659 if multi {
660 w.write_all(&rnode_kiss::rnode_select_command(
661 index,
662 rnode_kiss::CMD_LT_ALOCK,
663 <_bytes,
664 ))?;
665 } else {
666 w.write_all(&rnode_kiss::rnode_command(
667 rnode_kiss::CMD_LT_ALOCK,
668 <_bytes,
669 ))?;
670 }
671 }
672
673 if multi {
675 w.write_all(&rnode_kiss::rnode_select_command(
676 index,
677 rnode_kiss::CMD_RADIO_STATE,
678 &[rnode_kiss::RADIO_STATE_ON],
679 ))?;
680 } else {
681 w.write_all(&rnode_kiss::rnode_command(
682 rnode_kiss::CMD_RADIO_STATE,
683 &[rnode_kiss::RADIO_STATE_ON],
684 ))?;
685 }
686
687 Ok(())
688}
689
690fn process_flow_queue(
692 flow_state: &Arc<Mutex<SubFlowState>>,
693 writer: &Arc<Mutex<std::fs::File>>,
694 index: u8,
695) {
696 let mut state = flow_state.lock().unwrap();
697 if let Some(data) = state.queue.pop_front() {
698 state.ready = false;
699 drop(state);
700 let frame = rnode_kiss::rnode_data_frame(index, &data);
701 let _ = writer.lock().unwrap().write_all(&frame);
702 } else {
703 state.ready = true;
704 }
705}
706
707use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult, SubInterface};
710use rns_core::transport::types::InterfaceInfo;
711use std::collections::HashMap;
712
713pub struct RNodeFactory;
715
716impl InterfaceFactory for RNodeFactory {
717 fn type_name(&self) -> &str {
718 "RNodeInterface"
719 }
720
721 fn default_ifac_size(&self) -> usize {
722 8
723 }
724
725 fn parse_config(
726 &self,
727 name: &str,
728 id: InterfaceId,
729 params: &HashMap<String, String>,
730 ) -> Result<Box<dyn InterfaceConfigData>, String> {
731 let pre_opened_fd = params.get("fd").and_then(|v| v.parse::<i32>().ok());
732
733 let port = params
734 .get("port")
735 .cloned()
736 .or_else(|| pre_opened_fd.map(|_| "usb-bridge".to_string()))
737 .ok_or_else(|| "RNodeInterface requires 'port' or 'fd'".to_string())?;
738
739 let speed = params
740 .get("speed")
741 .and_then(|v| v.parse().ok())
742 .unwrap_or(115200u32);
743
744 let frequency = params
745 .get("frequency")
746 .and_then(|v| v.parse().ok())
747 .unwrap_or(868_000_000u32);
748
749 let bandwidth = params
750 .get("bandwidth")
751 .and_then(|v| v.parse().ok())
752 .unwrap_or(125_000u32);
753
754 let txpower = params
755 .get("txpower")
756 .and_then(|v| v.parse().ok())
757 .unwrap_or(7i8);
758
759 let spreading_factor = params
760 .get("spreadingfactor")
761 .or_else(|| params.get("spreading_factor"))
762 .and_then(|v| v.parse().ok())
763 .unwrap_or(8u8);
764
765 let coding_rate = params
766 .get("codingrate")
767 .or_else(|| params.get("coding_rate"))
768 .and_then(|v| v.parse().ok())
769 .unwrap_or(5u8);
770
771 let flow_control = params
772 .get("flow_control")
773 .and_then(|v| crate::config::parse_bool_pub(v))
774 .unwrap_or(false);
775
776 let st_alock = params.get("st_alock").and_then(|v| v.parse().ok());
777
778 let lt_alock = params.get("lt_alock").and_then(|v| v.parse().ok());
779
780 let id_interval = params.get("id_interval").and_then(|v| v.parse().ok());
781
782 let id_callsign = params.get("id_callsign").map(|v| v.as_bytes().to_vec());
783
784 let sub = RNodeSubConfig {
785 name: name.to_string(),
786 frequency,
787 bandwidth,
788 txpower,
789 spreading_factor,
790 coding_rate,
791 flow_control,
792 st_alock,
793 lt_alock,
794 };
795
796 Ok(Box::new(RNodeConfig {
797 name: name.to_string(),
798 port,
799 speed,
800 subinterfaces: vec![sub],
801 id_interval,
802 id_callsign,
803 base_interface_id: id,
804 pre_opened_fd,
805 runtime: Arc::new(Mutex::new(RNodeRuntime {
806 sub: RNodeSubConfig {
807 name: name.to_string(),
808 frequency,
809 bandwidth,
810 txpower,
811 spreading_factor,
812 coding_rate,
813 flow_control,
814 st_alock,
815 lt_alock,
816 },
817 writer: None,
818 })),
819 }))
820 }
821
822 fn start(
823 &self,
824 config: Box<dyn InterfaceConfigData>,
825 ctx: StartContext,
826 ) -> std::io::Result<StartResult> {
827 let rnode_config = *config.into_any().downcast::<RNodeConfig>().map_err(|_| {
828 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
829 })?;
830
831 let name = rnode_config.name.clone();
832
833 let pairs = start(rnode_config, ctx.tx)?;
834
835 let mut subs = Vec::with_capacity(pairs.len());
836 for (index, (sub_id, writer)) in pairs.into_iter().enumerate() {
837 let sub_name = if index == 0 {
838 name.clone()
839 } else {
840 format!("{}/{}", name, index)
841 };
842
843 let info = InterfaceInfo {
844 id: sub_id,
845 name: sub_name,
846 mode: ctx.mode,
847 out_capable: true,
848 in_capable: true,
849 bitrate: None,
850 announce_rate_target: None,
851 announce_rate_grace: 0,
852 announce_rate_penalty: 0.0,
853 announce_cap: rns_core::constants::ANNOUNCE_CAP,
854 is_local_client: false,
855 wants_tunnel: false,
856 tunnel_id: None,
857 mtu: rns_core::constants::MTU as u32,
858 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
859 ia_freq: 0.0,
860 started: crate::time::now(),
861 };
862
863 subs.push(SubInterface {
864 id: sub_id,
865 info,
866 writer,
867 interface_type_name: "RNodeInterface".to_string(),
868 });
869 }
870
871 Ok(StartResult::Multi(subs))
872 }
873}
874
875pub(crate) fn rnode_runtime_handle_from_config(config: &RNodeConfig) -> RNodeRuntimeConfigHandle {
876 RNodeRuntimeConfigHandle {
877 interface_name: config.name.clone(),
878 runtime: Arc::clone(&config.runtime),
879 startup: RNodeRuntime::from_config(config),
880 }
881}
882
883#[cfg(test)]
884mod tests {
885 use super::*;
886 use crate::event;
887 use crate::kiss;
888 use crate::serial::open_pty_pair;
889 use std::os::unix::io::{AsRawFd, FromRawFd};
890 use std::path::PathBuf;
891 use std::sync::mpsc::RecvTimeoutError;
892 use tempfile::tempdir;
893 fn poll_read(fd: i32, timeout_ms: i32) -> bool {
895 let mut pfd = libc::pollfd {
896 fd,
897 events: libc::POLLIN,
898 revents: 0,
899 };
900 let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
901 ret > 0
902 }
903
904 fn read_available(file: &mut std::fs::File) -> Vec<u8> {
906 let mut all = Vec::new();
907 let mut buf = [0u8; 4096];
908 while poll_read(file.as_raw_fd(), 100) {
909 match file.read(&mut buf) {
910 Ok(n) if n > 0 => all.extend_from_slice(&buf[..n]),
911 _ => break,
912 }
913 }
914 all
915 }
916
917 fn slave_tty_path(fd: i32) -> PathBuf {
918 let mut buf = [0u8; 256];
919 let rc = unsafe { libc::ttyname_r(fd, buf.as_mut_ptr().cast(), buf.len()) };
920 assert_eq!(rc, 0, "ttyname_r failed for fd {}", fd);
921 let nul = buf.iter().position(|b| *b == 0).unwrap_or(buf.len());
922 PathBuf::from(std::str::from_utf8(&buf[..nul]).unwrap())
923 }
924
925 fn wait_for_interface_event<F>(
926 rx: &std::sync::mpsc::Receiver<Event>,
927 timeout: Duration,
928 predicate: F,
929 ) -> Event
930 where
931 F: Fn(&Event) -> bool,
932 {
933 let deadline = std::time::Instant::now() + timeout;
934 loop {
935 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
936 match rx.recv_timeout(remaining) {
937 Ok(event) if predicate(&event) => return event,
938 Ok(_) => continue,
939 Err(RecvTimeoutError::Timeout) => panic!("timed out waiting for interface event"),
940 Err(RecvTimeoutError::Disconnected) => panic!("event channel disconnected"),
941 }
942 }
943 }
944
945 fn mock_respond_detect(master: &mut std::fs::File) {
947 master
949 .write_all(&rnode_kiss::rnode_command(
950 rnode_kiss::CMD_DETECT,
951 &[rnode_kiss::DETECT_RESP],
952 ))
953 .unwrap();
954 master
956 .write_all(&rnode_kiss::rnode_command(
957 rnode_kiss::CMD_FW_VERSION,
958 &[0x01, 0x4A],
959 ))
960 .unwrap();
961 master
963 .write_all(&rnode_kiss::rnode_command(
964 rnode_kiss::CMD_PLATFORM,
965 &[0x80],
966 ))
967 .unwrap();
968 master
970 .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x01]))
971 .unwrap();
972 master.flush().unwrap();
973 }
974
975 #[test]
976 fn rnode_detect_over_pty() {
977 let (master_fd, slave_fd) = open_pty_pair().unwrap();
979 let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
980 let mut slave = unsafe { std::fs::File::from_raw_fd(slave_fd) };
981
982 mock_respond_detect(&mut master);
984
985 let mut decoder = rnode_kiss::RNodeDecoder::new();
986 let mut events = Vec::new();
987 let deadline = std::time::Instant::now() + Duration::from_secs(2);
988
989 while std::time::Instant::now() < deadline {
990 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
991 let timeout_ms = remaining.as_millis().min(i32::MAX as u128) as i32;
992 if !poll_read(slave.as_raw_fd(), timeout_ms) {
993 break;
994 }
995
996 let chunk = read_available(&mut slave);
997 if chunk.is_empty() {
998 continue;
999 }
1000
1001 events.extend(decoder.feed(&chunk));
1002
1003 let saw_detect = events
1004 .iter()
1005 .any(|e| matches!(e, rnode_kiss::RNodeEvent::Detected(true)));
1006 let saw_firmware = events
1007 .iter()
1008 .any(|e| matches!(e, rnode_kiss::RNodeEvent::FirmwareVersion { .. }));
1009 if saw_detect && saw_firmware {
1010 break;
1011 }
1012 }
1013
1014 assert!(
1015 events
1016 .iter()
1017 .any(|e| matches!(e, rnode_kiss::RNodeEvent::Detected(true))),
1018 "should detect device"
1019 );
1020 assert!(
1021 events
1022 .iter()
1023 .any(|e| matches!(e, rnode_kiss::RNodeEvent::FirmwareVersion { .. })),
1024 "should get firmware version"
1025 );
1026 }
1027
1028 #[test]
1029 fn rnode_configure_commands() {
1030 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1031 let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
1032 let writer_file = unsafe { std::fs::File::from_raw_fd(libc::dup(slave_fd)) };
1033 let writer = Arc::new(Mutex::new(writer_file));
1034
1035 let sub = RNodeSubConfig {
1036 name: "test".into(),
1037 frequency: 868_000_000,
1038 bandwidth: 125_000,
1039 txpower: 7,
1040 spreading_factor: 8,
1041 coding_rate: 5,
1042 flow_control: false,
1043 st_alock: None,
1044 lt_alock: None,
1045 };
1046
1047 configure_subinterface(&writer, 0, &sub, false).unwrap();
1048
1049 let data = read_available(&mut master);
1051
1052 assert!(
1054 data.windows(2)
1055 .any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_FREQUENCY),
1056 "should contain FREQUENCY command"
1057 );
1058 assert!(
1060 data.windows(2)
1061 .any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_BANDWIDTH),
1062 "should contain BANDWIDTH command"
1063 );
1064 assert!(
1066 data.windows(3).any(|w| w[0] == kiss::FEND
1067 && w[1] == rnode_kiss::CMD_RADIO_STATE
1068 && w[2] == rnode_kiss::RADIO_STATE_ON),
1069 "should contain RADIO_STATE ON command"
1070 );
1071
1072 unsafe { libc::close(slave_fd) };
1073 }
1074
1075 #[test]
1076 fn rnode_data_roundtrip() {
1077 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1078 let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
1079 let slave = unsafe { std::fs::File::from_raw_fd(slave_fd) };
1080
1081 let payload = vec![0x01, 0x02, 0x03, 0x04, 0x05];
1083 let frame = rnode_kiss::rnode_data_frame(0, &payload);
1084 master.write_all(&frame).unwrap();
1085 master.flush().unwrap();
1086
1087 assert!(poll_read(slave.as_raw_fd(), 2000));
1089 let mut decoder = rnode_kiss::RNodeDecoder::new();
1090 let mut buf = [0u8; 4096];
1091 let mut slave_file = slave;
1092 let n = slave_file.read(&mut buf).unwrap();
1093 let events = decoder.feed(&buf[..n]);
1094
1095 assert_eq!(events.len(), 1);
1096 match &events[0] {
1097 rnode_kiss::RNodeEvent::DataFrame { index, data } => {
1098 assert_eq!(*index, 0);
1099 assert_eq!(data, &payload);
1100 }
1101 other => panic!("expected DataFrame, got {:?}", other),
1102 }
1103 }
1104
1105 #[test]
1106 fn rnode_flow_control() {
1107 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1108 let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
1109 let shared_writer = Arc::new(Mutex::new(writer_file));
1110
1111 let flow_state = Arc::new(Mutex::new(SubFlowState {
1112 ready: true,
1113 queue: std::collections::VecDeque::new(),
1114 }));
1115
1116 let mut sub_writer = RNodeSubWriter {
1117 writer: shared_writer.clone(),
1118 index: 0,
1119 flow_control: true,
1120 flow_state: flow_state.clone(),
1121 };
1122
1123 sub_writer.send_frame(b"hello").unwrap();
1125 assert!(!flow_state.lock().unwrap().ready);
1126
1127 sub_writer.send_frame(b"world").unwrap();
1129 assert_eq!(flow_state.lock().unwrap().queue.len(), 1);
1130
1131 process_flow_queue(&flow_state, &shared_writer, 0);
1133 assert_eq!(flow_state.lock().unwrap().queue.len(), 0);
1134 assert!(!flow_state.lock().unwrap().ready); process_flow_queue(&flow_state, &shared_writer, 0);
1138 assert!(flow_state.lock().unwrap().ready);
1139
1140 unsafe { libc::close(master_fd) };
1141 }
1142
1143 #[test]
1144 fn rnode_sub_writer_format() {
1145 let (master_fd, slave_fd) = open_pty_pair().unwrap();
1146 let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
1147 let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
1148 let shared_writer = Arc::new(Mutex::new(writer_file));
1149
1150 let flow_state = Arc::new(Mutex::new(SubFlowState {
1151 ready: true,
1152 queue: std::collections::VecDeque::new(),
1153 }));
1154
1155 let mut sub_writer = RNodeSubWriter {
1156 writer: shared_writer,
1157 index: 1, flow_control: false,
1159 flow_state,
1160 };
1161
1162 let payload = vec![0xAA, 0xBB, 0xCC];
1163 sub_writer.send_frame(&payload).unwrap();
1164
1165 assert!(poll_read(master.as_raw_fd(), 2000));
1167 let mut buf = [0u8; 256];
1168 let n = master.read(&mut buf).unwrap();
1169
1170 assert_eq!(buf[0], kiss::FEND);
1172 assert_eq!(buf[1], 0x10); assert_eq!(buf[n - 1], kiss::FEND);
1174 }
1175
1176 #[test]
1177 fn rnode_multi_sub_routing() {
1178 let mut decoder = rnode_kiss::RNodeDecoder::new();
1180
1181 let payload0 = vec![0x01, 0x02];
1182 let frame0 = rnode_kiss::rnode_data_frame(0, &payload0);
1183 let events0 = decoder.feed(&frame0);
1184 assert_eq!(events0.len(), 1);
1185 assert_eq!(
1186 events0[0],
1187 rnode_kiss::RNodeEvent::DataFrame {
1188 index: 0,
1189 data: payload0
1190 }
1191 );
1192
1193 let payload1 = vec![0x03, 0x04];
1194 let frame1 = rnode_kiss::rnode_data_frame(1, &payload1);
1195 let events1 = decoder.feed(&frame1);
1196 assert_eq!(events1.len(), 1);
1197 assert_eq!(
1198 events1[0],
1199 rnode_kiss::RNodeEvent::DataFrame {
1200 index: 1,
1201 data: payload1
1202 }
1203 );
1204 }
1205
1206 #[test]
1207 fn rnode_error_handling() {
1208 let mut decoder = rnode_kiss::RNodeDecoder::new();
1216 let frame = rnode_kiss::rnode_command(rnode_kiss::CMD_ERROR, &[0x02]);
1217 let events = decoder.feed(&frame);
1218 assert_eq!(events.len(), 1);
1219 assert_eq!(
1221 events[0],
1222 rnode_kiss::RNodeEvent::DataFrame {
1223 index: 5,
1224 data: vec![0x02]
1225 }
1226 );
1227 }
1228
1229 #[test]
1230 fn rnode_config_validation() {
1231 let good = RNodeSubConfig {
1232 name: "test".into(),
1233 frequency: 868_000_000,
1234 bandwidth: 125_000,
1235 txpower: 7,
1236 spreading_factor: 8,
1237 coding_rate: 5,
1238 flow_control: false,
1239 st_alock: None,
1240 lt_alock: None,
1241 };
1242 assert!(validate_sub_config(&good).is_none());
1243
1244 let mut bad = good.clone();
1246 bad.frequency = 100;
1247 assert!(validate_sub_config(&bad).is_some());
1248
1249 bad = good.clone();
1251 bad.spreading_factor = 13;
1252 assert!(validate_sub_config(&bad).is_some());
1253
1254 bad = good.clone();
1256 bad.coding_rate = 9;
1257 assert!(validate_sub_config(&bad).is_some());
1258
1259 bad = good.clone();
1261 bad.bandwidth = 5;
1262 assert!(validate_sub_config(&bad).is_some());
1263
1264 bad = good.clone();
1266 bad.txpower = 50;
1267 assert!(validate_sub_config(&bad).is_some());
1268 }
1269
1270 #[test]
1271 fn rnode_reconnects_after_serial_disconnect() {
1272 let tempdir = tempdir().unwrap();
1273 let port_path = tempdir.path().join("rnode-port");
1274
1275 let (master1_fd, slave1_fd) = open_pty_pair().unwrap();
1276 let slave1_path = slave_tty_path(slave1_fd);
1277 std::os::unix::fs::symlink(&slave1_path, &port_path).unwrap();
1278
1279 let mut master1 = unsafe { std::fs::File::from_raw_fd(master1_fd) };
1280 let slave1 = unsafe { std::fs::File::from_raw_fd(slave1_fd) };
1281
1282 let (tx, rx) = event::channel();
1283 let sub = RNodeSubConfig {
1284 name: "test-rnode".into(),
1285 frequency: 868_000_000,
1286 bandwidth: 125_000,
1287 txpower: 7,
1288 spreading_factor: 8,
1289 coding_rate: 5,
1290 flow_control: false,
1291 st_alock: None,
1292 lt_alock: None,
1293 };
1294 let mut config = RNodeConfig {
1295 name: "test-rnode".into(),
1296 port: port_path.display().to_string(),
1297 speed: 115200,
1298 subinterfaces: vec![sub],
1299 id_interval: None,
1300 id_callsign: None,
1301 base_interface_id: InterfaceId(41),
1302 pre_opened_fd: None,
1303 runtime: Arc::new(Mutex::new(RNodeRuntime {
1304 sub: RNodeSubConfig {
1305 name: String::new(),
1306 frequency: 868_000_000,
1307 bandwidth: 125_000,
1308 txpower: 7,
1309 spreading_factor: 8,
1310 coding_rate: 5,
1311 flow_control: false,
1312 st_alock: None,
1313 lt_alock: None,
1314 },
1315 writer: None,
1316 })),
1317 };
1318 config.runtime = Arc::new(Mutex::new(RNodeRuntime::from_config(&config)));
1319
1320 let _writers = start(config, tx).unwrap();
1321
1322 thread::sleep(Duration::from_secs(3));
1323 mock_respond_detect(&mut master1);
1324 let up = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1325 matches!(event, Event::InterfaceUp(InterfaceId(41), _, _))
1326 });
1327 assert!(matches!(
1328 up,
1329 Event::InterfaceUp(InterfaceId(41), None, None)
1330 ));
1331
1332 drop(master1);
1333 drop(slave1);
1334
1335 let down = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1336 matches!(event, Event::InterfaceDown(InterfaceId(41)))
1337 });
1338 assert!(matches!(down, Event::InterfaceDown(InterfaceId(41))));
1339
1340 let (master2_fd, slave2_fd) = open_pty_pair().unwrap();
1341 let slave2_path = slave_tty_path(slave2_fd);
1342 std::fs::remove_file(&port_path).unwrap();
1343 std::os::unix::fs::symlink(&slave2_path, &port_path).unwrap();
1344
1345 let mut master2 = unsafe { std::fs::File::from_raw_fd(master2_fd) };
1346 let _slave2 = unsafe { std::fs::File::from_raw_fd(slave2_fd) };
1347
1348 thread::sleep(Duration::from_secs(3));
1349 mock_respond_detect(&mut master2);
1350 let up = wait_for_interface_event(&rx, Duration::from_secs(4), |event| {
1351 matches!(event, Event::InterfaceUp(InterfaceId(41), _, _))
1352 });
1353 assert!(matches!(
1354 up,
1355 Event::InterfaceUp(InterfaceId(41), Some(_), None)
1356 ));
1357 }
1358}