1use std::io::{self, Read, Write};
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::InterfaceId;
15
16use crate::event::{Event, EventSender};
17use crate::interface::Writer;
18use crate::rnode_kiss;
19use crate::serial::{Parity, SerialConfig, SerialPort};
20
21pub const FREQ_MIN: u32 = 137_000_000;
23pub const FREQ_MAX: u32 = 3_000_000_000;
24pub const BW_MIN: u32 = 7_800;
25pub const BW_MAX: u32 = 1_625_000;
26pub const SF_MIN: u8 = 5;
27pub const SF_MAX: u8 = 12;
28pub const CR_MIN: u8 = 5;
29pub const CR_MAX: u8 = 8;
30pub const TXPOWER_MIN: i8 = 0;
31pub const TXPOWER_MAX: i8 = 37;
32pub const HW_MTU: u32 = 508;
33
34#[derive(Debug, Clone)]
36pub struct RNodeSubConfig {
37 pub name: String,
38 pub frequency: u32,
39 pub bandwidth: u32,
40 pub txpower: i8,
41 pub spreading_factor: u8,
42 pub coding_rate: u8,
43 pub flow_control: bool,
44 pub st_alock: Option<f32>,
45 pub lt_alock: Option<f32>,
46}
47
48#[derive(Debug, Clone)]
50pub struct RNodeConfig {
51 pub name: String,
52 pub port: String,
53 pub speed: u32,
54 pub subinterfaces: Vec<RNodeSubConfig>,
55 pub id_interval: Option<u32>,
56 pub id_callsign: Option<Vec<u8>>,
57 pub base_interface_id: InterfaceId,
58}
59
60impl Default for RNodeConfig {
61 fn default() -> Self {
62 RNodeConfig {
63 name: String::new(),
64 port: String::new(),
65 speed: 115200,
66 subinterfaces: Vec::new(),
67 id_interval: None,
68 id_callsign: None,
69 base_interface_id: InterfaceId(0),
70 }
71 }
72}
73
74pub fn validate_sub_config(sub: &RNodeSubConfig) -> Option<String> {
76 if sub.frequency < FREQ_MIN || sub.frequency > FREQ_MAX {
77 return Some(format!("Invalid frequency {} for {}", sub.frequency, sub.name));
78 }
79 if sub.bandwidth < BW_MIN || sub.bandwidth > BW_MAX {
80 return Some(format!("Invalid bandwidth {} for {}", sub.bandwidth, sub.name));
81 }
82 if sub.spreading_factor < SF_MIN || sub.spreading_factor > SF_MAX {
83 return Some(format!("Invalid SF {} for {}", sub.spreading_factor, sub.name));
84 }
85 if sub.coding_rate < CR_MIN || sub.coding_rate > CR_MAX {
86 return Some(format!("Invalid CR {} for {}", sub.coding_rate, sub.name));
87 }
88 if sub.txpower < TXPOWER_MIN || sub.txpower > TXPOWER_MAX {
89 return Some(format!("Invalid TX power {} for {}", sub.txpower, sub.name));
90 }
91 if let Some(st) = sub.st_alock {
92 if st < 0.0 || st > 100.0 {
93 return Some(format!("Invalid ST airtime limit {} for {}", st, sub.name));
94 }
95 }
96 if let Some(lt) = sub.lt_alock {
97 if lt < 0.0 || lt > 100.0 {
98 return Some(format!("Invalid LT airtime limit {} for {}", lt, sub.name));
99 }
100 }
101 None
102}
103
104struct RNodeSubWriter {
107 writer: Arc<Mutex<std::fs::File>>,
108 index: u8,
109 flow_control: bool,
110 flow_state: Arc<Mutex<SubFlowState>>,
111}
112
113struct SubFlowState {
114 ready: bool,
115 queue: std::collections::VecDeque<Vec<u8>>,
116}
117
118impl Writer for RNodeSubWriter {
119 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
120 let frame = rnode_kiss::rnode_data_frame(self.index, data);
121 if self.flow_control {
122 let mut state = self.flow_state.lock().unwrap();
123 if state.ready {
124 state.ready = false;
125 drop(state);
126 self.writer.lock().unwrap().write_all(&frame)
127 } else {
128 state.queue.push_back(data.to_vec());
129 Ok(())
130 }
131 } else {
132 self.writer.lock().unwrap().write_all(&frame)
133 }
134 }
135}
136
137pub fn start(
144 config: RNodeConfig,
145 tx: EventSender,
146) -> io::Result<Vec<(InterfaceId, Box<dyn Writer>)>> {
147 for sub in &config.subinterfaces {
149 if let Some(err) = validate_sub_config(sub) {
150 return Err(io::Error::new(io::ErrorKind::InvalidInput, err));
151 }
152 }
153
154 if config.subinterfaces.is_empty() {
155 return Err(io::Error::new(
156 io::ErrorKind::InvalidInput,
157 "No subinterfaces configured",
158 ));
159 }
160
161 let serial_config = SerialConfig {
162 path: config.port.clone(),
163 baud: config.speed,
164 data_bits: 8,
165 parity: Parity::None,
166 stop_bits: 1,
167 };
168
169 let port = SerialPort::open(&serial_config)?;
170 let reader_file = port.reader()?;
171 let config_writer = port.writer()?;
172 let shared_writer = Arc::new(Mutex::new(config_writer));
173
174 let num_subs = config.subinterfaces.len();
176 let mut writers: Vec<(InterfaceId, Box<dyn Writer>)> = Vec::with_capacity(num_subs);
177 let mut flow_states: Vec<Arc<Mutex<SubFlowState>>> = Vec::with_capacity(num_subs);
178
179 for (i, sub) in config.subinterfaces.iter().enumerate() {
180 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
181 let flow_state = Arc::new(Mutex::new(SubFlowState {
182 ready: true,
183 queue: std::collections::VecDeque::new(),
184 }));
185 flow_states.push(flow_state.clone());
186
187 let sub_writer: Box<dyn Writer> = Box::new(RNodeSubWriter {
188 writer: shared_writer.clone(),
189 index: i as u8,
190 flow_control: sub.flow_control,
191 flow_state,
192 });
193 writers.push((sub_id, sub_writer));
194 }
195
196 let reader_shared_writer = shared_writer.clone();
198 let reader_config = config.clone();
199 let reader_flow_states = flow_states;
200 thread::Builder::new()
201 .name(format!("rnode-reader-{}", config.base_interface_id.0))
202 .spawn(move || {
203 reader_loop(
204 reader_file,
205 reader_shared_writer,
206 reader_config,
207 tx,
208 reader_flow_states,
209 );
210 })?;
211
212 let keepalive_writer = shared_writer.clone();
215 let keepalive_name = config.name.clone();
216 thread::Builder::new()
217 .name(format!("rnode-keepalive-{}", config.base_interface_id.0))
218 .spawn(move || {
219 let detect = rnode_kiss::detect_request();
220 loop {
221 thread::sleep(Duration::from_secs(15));
222 if let Err(e) = keepalive_writer.lock().unwrap().write_all(&detect) {
223 log::debug!("[{}] keepalive write failed: {}", keepalive_name, e);
224 return;
225 }
226 }
227 })?;
228
229 Ok(writers)
230}
231
232fn reader_loop(
234 mut reader: std::fs::File,
235 writer: Arc<Mutex<std::fs::File>>,
236 config: RNodeConfig,
237 tx: EventSender,
238 flow_states: Vec<Arc<Mutex<SubFlowState>>>,
239) {
240 thread::sleep(Duration::from_secs(2));
242
243 {
245 let detect_cmd = rnode_kiss::detect_request();
246 let mut cmd = detect_cmd;
248 cmd.extend_from_slice(&rnode_kiss::rnode_command(rnode_kiss::CMD_FW_VERSION, &[0x00]));
249 cmd.extend_from_slice(&rnode_kiss::rnode_command(rnode_kiss::CMD_PLATFORM, &[0x00]));
250 cmd.extend_from_slice(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x00]));
251
252 if let Err(e) = writer.lock().unwrap().write_all(&cmd) {
253 log::error!("[{}] failed to send detect: {}", config.name, e);
254 return;
255 }
256 }
257
258 let mut decoder = rnode_kiss::RNodeDecoder::new();
259 let mut buf = [0u8; 4096];
260 let mut detected = false;
261
262 let detect_start = std::time::Instant::now();
264 let detect_timeout = Duration::from_secs(5);
265
266 while !detected && detect_start.elapsed() < detect_timeout {
267 match reader.read(&mut buf) {
268 Ok(0) => {
269 log::warn!("[{}] serial port closed during detect", config.name);
270 return;
271 }
272 Ok(n) => {
273 for event in decoder.feed(&buf[..n]) {
274 match event {
275 rnode_kiss::RNodeEvent::Detected(true) => {
276 detected = true;
277 log::info!("[{}] RNode device detected", config.name);
278 }
279 rnode_kiss::RNodeEvent::FirmwareVersion { major, minor } => {
280 log::info!(
281 "[{}] firmware version {}.{}",
282 config.name,
283 major,
284 minor
285 );
286 }
287 rnode_kiss::RNodeEvent::Platform(p) => {
288 log::info!("[{}] platform: 0x{:02X}", config.name, p);
289 }
290 rnode_kiss::RNodeEvent::Mcu(m) => {
291 log::info!("[{}] MCU: 0x{:02X}", config.name, m);
292 }
293 _ => {}
294 }
295 }
296 }
297 Err(e) => {
298 log::error!("[{}] serial read error during detect: {}", config.name, e);
299 return;
300 }
301 }
302 }
303
304 if !detected {
305 log::error!("[{}] RNode detection timed out", config.name);
306 return;
307 }
308
309 for (i, sub) in config.subinterfaces.iter().enumerate() {
311 if let Err(e) = configure_subinterface(&writer, i as u8, sub, config.subinterfaces.len() > 1) {
312 log::error!("[{}] failed to configure subinterface {}: {}", config.name, i, e);
313 return;
314 }
315 }
316
317 thread::sleep(Duration::from_millis(300));
319
320 for i in 0..config.subinterfaces.len() {
322 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
323 let _ = tx.send(Event::InterfaceUp(sub_id, None, None));
324 }
325
326 log::info!(
327 "[{}] RNode configured with {} subinterface(s)",
328 config.name,
329 config.subinterfaces.len()
330 );
331
332 loop {
334 match reader.read(&mut buf) {
335 Ok(0) => {
336 log::warn!("[{}] serial port closed", config.name);
337 for i in 0..config.subinterfaces.len() {
338 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
339 let _ = tx.send(Event::InterfaceDown(sub_id));
340 }
341 return;
343 }
344 Ok(n) => {
345 for event in decoder.feed(&buf[..n]) {
346 match event {
347 rnode_kiss::RNodeEvent::DataFrame { index, data } => {
348 let sub_id = InterfaceId(config.base_interface_id.0 + index as u64);
349 if tx
350 .send(Event::Frame {
351 interface_id: sub_id,
352 data,
353 })
354 .is_err()
355 {
356 return;
357 }
358 }
359 rnode_kiss::RNodeEvent::Ready => {
360 for (i, fs) in flow_states.iter().enumerate() {
362 if config.subinterfaces[i].flow_control {
363 process_flow_queue(fs, &writer, i as u8);
364 }
365 }
366 }
367 rnode_kiss::RNodeEvent::Error(code) => {
368 log::error!("[{}] RNode error: 0x{:02X}", config.name, code);
369 }
370 _ => {
371 }
373 }
374 }
375 }
376 Err(e) => {
377 log::error!("[{}] serial read error: {}", config.name, e);
378 for i in 0..config.subinterfaces.len() {
379 let sub_id = InterfaceId(config.base_interface_id.0 + i as u64);
380 let _ = tx.send(Event::InterfaceDown(sub_id));
381 }
382 return;
383 }
384 }
385 }
386}
387
388fn configure_subinterface(
390 writer: &Arc<Mutex<std::fs::File>>,
391 index: u8,
392 sub: &RNodeSubConfig,
393 multi: bool,
394) -> io::Result<()> {
395 let mut w = writer.lock().unwrap();
396
397 let freq_bytes = [
399 (sub.frequency >> 24) as u8,
400 (sub.frequency >> 16) as u8,
401 (sub.frequency >> 8) as u8,
402 (sub.frequency & 0xFF) as u8,
403 ];
404 let bw_bytes = [
405 (sub.bandwidth >> 24) as u8,
406 (sub.bandwidth >> 16) as u8,
407 (sub.bandwidth >> 8) as u8,
408 (sub.bandwidth & 0xFF) as u8,
409 ];
410 let txp = if sub.txpower < 0 {
411 (sub.txpower as i16 + 256) as u8
412 } else {
413 sub.txpower as u8
414 };
415
416 if multi {
417 w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_FREQUENCY, &freq_bytes))?;
418 w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_BANDWIDTH, &bw_bytes))?;
419 w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_TXPOWER, &[txp]))?;
420 w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_SF, &[sub.spreading_factor]))?;
421 w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_CR, &[sub.coding_rate]))?;
422 } else {
423 w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_FREQUENCY, &freq_bytes))?;
424 w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_BANDWIDTH, &bw_bytes))?;
425 w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_TXPOWER, &[txp]))?;
426 w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_SF, &[sub.spreading_factor]))?;
427 w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_CR, &[sub.coding_rate]))?;
428 }
429
430 if let Some(st) = sub.st_alock {
432 let st_val = (st * 100.0) as u16;
433 let st_bytes = [(st_val >> 8) as u8, (st_val & 0xFF) as u8];
434 if multi {
435 w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_ST_ALOCK, &st_bytes))?;
436 } else {
437 w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_ST_ALOCK, &st_bytes))?;
438 }
439 }
440 if let Some(lt) = sub.lt_alock {
441 let lt_val = (lt * 100.0) as u16;
442 let lt_bytes = [(lt_val >> 8) as u8, (lt_val & 0xFF) as u8];
443 if multi {
444 w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_LT_ALOCK, <_bytes))?;
445 } else {
446 w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_LT_ALOCK, <_bytes))?;
447 }
448 }
449
450 if multi {
452 w.write_all(&rnode_kiss::rnode_select_command(index, rnode_kiss::CMD_RADIO_STATE, &[rnode_kiss::RADIO_STATE_ON]))?;
453 } else {
454 w.write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_RADIO_STATE, &[rnode_kiss::RADIO_STATE_ON]))?;
455 }
456
457 Ok(())
458}
459
460fn process_flow_queue(
462 flow_state: &Arc<Mutex<SubFlowState>>,
463 writer: &Arc<Mutex<std::fs::File>>,
464 index: u8,
465) {
466 let mut state = flow_state.lock().unwrap();
467 if let Some(data) = state.queue.pop_front() {
468 state.ready = false;
469 drop(state);
470 let frame = rnode_kiss::rnode_data_frame(index, &data);
471 let _ = writer.lock().unwrap().write_all(&frame);
472 } else {
473 state.ready = true;
474 }
475}
476
477use std::collections::HashMap;
480use rns_core::transport::types::InterfaceInfo;
481use super::{InterfaceFactory, InterfaceConfigData, StartContext, StartResult, SubInterface};
482
483pub struct RNodeFactory;
485
486impl InterfaceFactory for RNodeFactory {
487 fn type_name(&self) -> &str { "RNodeInterface" }
488
489 fn default_ifac_size(&self) -> usize { 8 }
490
491 fn parse_config(
492 &self,
493 name: &str,
494 id: InterfaceId,
495 params: &HashMap<String, String>,
496 ) -> Result<Box<dyn InterfaceConfigData>, String> {
497 let port = params.get("port")
498 .cloned()
499 .ok_or_else(|| "RNodeInterface requires 'port'".to_string())?;
500
501 let speed = params.get("speed")
502 .and_then(|v| v.parse().ok())
503 .unwrap_or(115200u32);
504
505 let frequency = params.get("frequency")
506 .and_then(|v| v.parse().ok())
507 .unwrap_or(868_000_000u32);
508
509 let bandwidth = params.get("bandwidth")
510 .and_then(|v| v.parse().ok())
511 .unwrap_or(125_000u32);
512
513 let txpower = params.get("txpower")
514 .and_then(|v| v.parse().ok())
515 .unwrap_or(7i8);
516
517 let spreading_factor = params.get("spreadingfactor")
518 .or_else(|| params.get("spreading_factor"))
519 .and_then(|v| v.parse().ok())
520 .unwrap_or(8u8);
521
522 let coding_rate = params.get("codingrate")
523 .or_else(|| params.get("coding_rate"))
524 .and_then(|v| v.parse().ok())
525 .unwrap_or(5u8);
526
527 let flow_control = params.get("flow_control")
528 .and_then(|v| crate::config::parse_bool_pub(v))
529 .unwrap_or(false);
530
531 let st_alock = params.get("st_alock")
532 .and_then(|v| v.parse().ok());
533
534 let lt_alock = params.get("lt_alock")
535 .and_then(|v| v.parse().ok());
536
537 let id_interval = params.get("id_interval")
538 .and_then(|v| v.parse().ok());
539
540 let id_callsign = params.get("id_callsign")
541 .map(|v| v.as_bytes().to_vec());
542
543 let sub = RNodeSubConfig {
544 name: name.to_string(),
545 frequency,
546 bandwidth,
547 txpower,
548 spreading_factor,
549 coding_rate,
550 flow_control,
551 st_alock,
552 lt_alock,
553 };
554
555 Ok(Box::new(RNodeConfig {
556 name: name.to_string(),
557 port,
558 speed,
559 subinterfaces: vec![sub],
560 id_interval,
561 id_callsign,
562 base_interface_id: id,
563 }))
564 }
565
566 fn start(
567 &self,
568 config: Box<dyn InterfaceConfigData>,
569 ctx: StartContext,
570 ) -> std::io::Result<StartResult> {
571 let rnode_config = *config.into_any().downcast::<RNodeConfig>()
572 .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type"))?;
573
574 let name = rnode_config.name.clone();
575
576 let pairs = start(rnode_config, ctx.tx)?;
577
578 let mut subs = Vec::with_capacity(pairs.len());
579 for (index, (sub_id, writer)) in pairs.into_iter().enumerate() {
580 let sub_name = if index == 0 {
581 name.clone()
582 } else {
583 format!("{}/{}", name, index)
584 };
585
586 let info = InterfaceInfo {
587 id: sub_id,
588 name: sub_name,
589 mode: ctx.mode,
590 out_capable: true,
591 in_capable: true,
592 bitrate: None,
593 announce_rate_target: None,
594 announce_rate_grace: 0,
595 announce_rate_penalty: 0.0,
596 announce_cap: rns_core::constants::ANNOUNCE_CAP,
597 is_local_client: false,
598 wants_tunnel: false,
599 tunnel_id: None,
600 mtu: rns_core::constants::MTU as u32,
601 ingress_control: false,
602 ia_freq: 0.0,
603 started: crate::time::now(),
604 };
605
606 subs.push(SubInterface {
607 id: sub_id,
608 info,
609 writer,
610 interface_type_name: "RNodeInterface".to_string(),
611 });
612 }
613
614 Ok(StartResult::Multi(subs))
615 }
616}
617
618#[cfg(test)]
619mod tests {
620 use super::*;
621 use crate::kiss;
622 use crate::serial::open_pty_pair;
623 use std::os::unix::io::{AsRawFd, FromRawFd};
624 fn poll_read(fd: i32, timeout_ms: i32) -> bool {
626 let mut pfd = libc::pollfd {
627 fd,
628 events: libc::POLLIN,
629 revents: 0,
630 };
631 let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
632 ret > 0
633 }
634
635 fn read_available(file: &mut std::fs::File) -> Vec<u8> {
637 let mut all = Vec::new();
638 let mut buf = [0u8; 4096];
639 while poll_read(file.as_raw_fd(), 100) {
640 match file.read(&mut buf) {
641 Ok(n) if n > 0 => all.extend_from_slice(&buf[..n]),
642 _ => break,
643 }
644 }
645 all
646 }
647
648 fn mock_respond_detect(master: &mut std::fs::File) {
650 master
652 .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_DETECT, &[rnode_kiss::DETECT_RESP]))
653 .unwrap();
654 master
656 .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_FW_VERSION, &[0x01, 0x4A]))
657 .unwrap();
658 master
660 .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_PLATFORM, &[0x80]))
661 .unwrap();
662 master
664 .write_all(&rnode_kiss::rnode_command(rnode_kiss::CMD_MCU, &[0x01]))
665 .unwrap();
666 master.flush().unwrap();
667 }
668
669 #[test]
670 fn rnode_detect_over_pty() {
671 let (master_fd, slave_fd) = open_pty_pair().unwrap();
673 let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
674 let mut slave = unsafe { std::fs::File::from_raw_fd(slave_fd) };
675
676 mock_respond_detect(&mut master);
678
679 assert!(poll_read(slave.as_raw_fd(), 2000));
681 let mut decoder = rnode_kiss::RNodeDecoder::new();
682 let mut buf = [0u8; 4096];
683 let n = slave.read(&mut buf).unwrap();
684 let events = decoder.feed(&buf[..n]);
685
686 assert!(
687 events.iter().any(|e| matches!(e, rnode_kiss::RNodeEvent::Detected(true))),
688 "should detect device"
689 );
690 assert!(
691 events.iter().any(|e| matches!(e, rnode_kiss::RNodeEvent::FirmwareVersion { .. })),
692 "should get firmware version"
693 );
694 }
695
696 #[test]
697 fn rnode_configure_commands() {
698 let (master_fd, slave_fd) = open_pty_pair().unwrap();
699 let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
700 let writer_file = unsafe { std::fs::File::from_raw_fd(libc::dup(slave_fd)) };
701 let writer = Arc::new(Mutex::new(writer_file));
702
703 let sub = RNodeSubConfig {
704 name: "test".into(),
705 frequency: 868_000_000,
706 bandwidth: 125_000,
707 txpower: 7,
708 spreading_factor: 8,
709 coding_rate: 5,
710 flow_control: false,
711 st_alock: None,
712 lt_alock: None,
713 };
714
715 configure_subinterface(&writer, 0, &sub, false).unwrap();
716
717 let data = read_available(&mut master);
719
720 assert!(data.windows(2).any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_FREQUENCY),
722 "should contain FREQUENCY command");
723 assert!(data.windows(2).any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_BANDWIDTH),
725 "should contain BANDWIDTH command");
726 assert!(data.windows(3).any(|w| w[0] == kiss::FEND && w[1] == rnode_kiss::CMD_RADIO_STATE && w[2] == rnode_kiss::RADIO_STATE_ON),
728 "should contain RADIO_STATE ON command");
729
730 unsafe { libc::close(slave_fd) };
731 }
732
733 #[test]
734 fn rnode_data_roundtrip() {
735 let (master_fd, slave_fd) = open_pty_pair().unwrap();
736 let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
737 let slave = unsafe { std::fs::File::from_raw_fd(slave_fd) };
738
739 let payload = vec![0x01, 0x02, 0x03, 0x04, 0x05];
741 let frame = rnode_kiss::rnode_data_frame(0, &payload);
742 master.write_all(&frame).unwrap();
743 master.flush().unwrap();
744
745 assert!(poll_read(slave.as_raw_fd(), 2000));
747 let mut decoder = rnode_kiss::RNodeDecoder::new();
748 let mut buf = [0u8; 4096];
749 let mut slave_file = slave;
750 let n = slave_file.read(&mut buf).unwrap();
751 let events = decoder.feed(&buf[..n]);
752
753 assert_eq!(events.len(), 1);
754 match &events[0] {
755 rnode_kiss::RNodeEvent::DataFrame { index, data } => {
756 assert_eq!(*index, 0);
757 assert_eq!(data, &payload);
758 }
759 other => panic!("expected DataFrame, got {:?}", other),
760 }
761 }
762
763 #[test]
764 fn rnode_flow_control() {
765 let (master_fd, slave_fd) = open_pty_pair().unwrap();
766 let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
767 let shared_writer = Arc::new(Mutex::new(writer_file));
768
769 let flow_state = Arc::new(Mutex::new(SubFlowState {
770 ready: true,
771 queue: std::collections::VecDeque::new(),
772 }));
773
774 let mut sub_writer = RNodeSubWriter {
775 writer: shared_writer.clone(),
776 index: 0,
777 flow_control: true,
778 flow_state: flow_state.clone(),
779 };
780
781 sub_writer.send_frame(b"hello").unwrap();
783 assert!(!flow_state.lock().unwrap().ready);
784
785 sub_writer.send_frame(b"world").unwrap();
787 assert_eq!(flow_state.lock().unwrap().queue.len(), 1);
788
789 process_flow_queue(&flow_state, &shared_writer, 0);
791 assert_eq!(flow_state.lock().unwrap().queue.len(), 0);
792 assert!(!flow_state.lock().unwrap().ready); process_flow_queue(&flow_state, &shared_writer, 0);
796 assert!(flow_state.lock().unwrap().ready);
797
798 unsafe { libc::close(master_fd) };
799 }
800
801 #[test]
802 fn rnode_sub_writer_format() {
803 let (master_fd, slave_fd) = open_pty_pair().unwrap();
804 let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };
805 let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
806 let shared_writer = Arc::new(Mutex::new(writer_file));
807
808 let flow_state = Arc::new(Mutex::new(SubFlowState {
809 ready: true,
810 queue: std::collections::VecDeque::new(),
811 }));
812
813 let mut sub_writer = RNodeSubWriter {
814 writer: shared_writer,
815 index: 1, flow_control: false,
817 flow_state,
818 };
819
820 let payload = vec![0xAA, 0xBB, 0xCC];
821 sub_writer.send_frame(&payload).unwrap();
822
823 assert!(poll_read(master.as_raw_fd(), 2000));
825 let mut buf = [0u8; 256];
826 let n = master.read(&mut buf).unwrap();
827
828 assert_eq!(buf[0], kiss::FEND);
830 assert_eq!(buf[1], 0x10); assert_eq!(buf[n - 1], kiss::FEND);
832 }
833
834 #[test]
835 fn rnode_multi_sub_routing() {
836 let mut decoder = rnode_kiss::RNodeDecoder::new();
838
839 let payload0 = vec![0x01, 0x02];
840 let frame0 = rnode_kiss::rnode_data_frame(0, &payload0);
841 let events0 = decoder.feed(&frame0);
842 assert_eq!(events0.len(), 1);
843 assert_eq!(events0[0], rnode_kiss::RNodeEvent::DataFrame { index: 0, data: payload0 });
844
845 let payload1 = vec![0x03, 0x04];
846 let frame1 = rnode_kiss::rnode_data_frame(1, &payload1);
847 let events1 = decoder.feed(&frame1);
848 assert_eq!(events1.len(), 1);
849 assert_eq!(events1[0], rnode_kiss::RNodeEvent::DataFrame { index: 1, data: payload1 });
850 }
851
852 #[test]
853 fn rnode_error_handling() {
854 let mut decoder = rnode_kiss::RNodeDecoder::new();
862 let frame = rnode_kiss::rnode_command(rnode_kiss::CMD_ERROR, &[0x02]);
863 let events = decoder.feed(&frame);
864 assert_eq!(events.len(), 1);
865 assert_eq!(
867 events[0],
868 rnode_kiss::RNodeEvent::DataFrame {
869 index: 5,
870 data: vec![0x02]
871 }
872 );
873 }
874
875 #[test]
876 fn rnode_config_validation() {
877 let good = RNodeSubConfig {
878 name: "test".into(),
879 frequency: 868_000_000,
880 bandwidth: 125_000,
881 txpower: 7,
882 spreading_factor: 8,
883 coding_rate: 5,
884 flow_control: false,
885 st_alock: None,
886 lt_alock: None,
887 };
888 assert!(validate_sub_config(&good).is_none());
889
890 let mut bad = good.clone();
892 bad.frequency = 100;
893 assert!(validate_sub_config(&bad).is_some());
894
895 bad = good.clone();
897 bad.spreading_factor = 13;
898 assert!(validate_sub_config(&bad).is_some());
899
900 bad = good.clone();
902 bad.coding_rate = 9;
903 assert!(validate_sub_config(&bad).is_some());
904
905 bad = good.clone();
907 bad.bandwidth = 5;
908 assert!(validate_sub_config(&bad).is_some());
909
910 bad = good.clone();
912 bad.txpower = 50;
913 assert!(validate_sub_config(&bad).is_some());
914 }
915}