1use std::collections::VecDeque;
8use std::io::{self, Read, Write};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12
13use rns_core::transport::types::InterfaceId;
14
15use crate::event::{Event, EventSender};
16use crate::interface::{lock_or_recover, Writer};
17use crate::kiss;
18use crate::serial::{Parity, SerialConfig, SerialPort};
19
20#[derive(Debug, Clone)]
22pub struct KissIfaceConfig {
23 pub name: String,
24 pub port: String,
25 pub speed: u32,
26 pub data_bits: u8,
27 pub parity: Parity,
28 pub stop_bits: u8,
29 pub preamble: u16, pub txtail: u16, pub persistence: u8, pub slottime: u16, pub flow_control: bool, pub beacon_interval: Option<u32>, pub beacon_data: Option<Vec<u8>>, pub interface_id: InterfaceId,
37}
38
39impl Default for KissIfaceConfig {
40 fn default() -> Self {
41 KissIfaceConfig {
42 name: String::new(),
43 port: String::new(),
44 speed: 9600,
45 data_bits: 8,
46 parity: Parity::None,
47 stop_bits: 1,
48 preamble: 350,
49 txtail: 20,
50 persistence: 64,
51 slottime: 20,
52 flow_control: false,
53 beacon_interval: None,
54 beacon_data: None,
55 interface_id: InterfaceId(0),
56 }
57 }
58}
59
60struct FlowState {
62 ready: bool,
63 queue: VecDeque<Vec<u8>>,
64 lock_time: Instant,
65}
66
67struct KissWriter {
70 file: std::fs::File,
71 flow_control: bool,
72 flow_state: Arc<Mutex<FlowState>>,
73}
74
75impl Writer for KissWriter {
76 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
77 if self.flow_control {
78 let mut state = lock_or_recover(&self.flow_state, "kiss flow state");
79 if state.ready {
80 state.ready = false;
81 state.lock_time = Instant::now();
82 drop(state);
83 self.file.write_all(&kiss::frame(data))
84 } else {
85 state.queue.push_back(data.to_vec());
86 Ok(())
87 }
88 } else {
89 self.file.write_all(&kiss::frame(data))
90 }
91 }
92}
93
94pub fn start(config: KissIfaceConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
96 let serial_config = SerialConfig {
97 path: config.port.clone(),
98 baud: config.speed,
99 data_bits: config.data_bits,
100 parity: config.parity,
101 stop_bits: config.stop_bits,
102 };
103
104 let port = SerialPort::open(&serial_config)?;
105 let reader_file = port.reader()?;
106 let mut writer_file = port.writer()?;
107 let flow_writer_file = port.writer()?;
108
109 let id = config.interface_id;
110
111 thread::sleep(Duration::from_secs(2));
113
114 let _ = tx.send(Event::InterfaceUp(id, None, None));
116
117 configure_tnc(&mut writer_file, &config)?;
119
120 let flow_state = Arc::new(Mutex::new(FlowState {
121 ready: true,
122 queue: VecDeque::new(),
123 lock_time: Instant::now(),
124 }));
125
126 let reader_flow_state = flow_state.clone();
127
128 let reader_config = config.clone();
130 thread::Builder::new()
131 .name(format!("kiss-reader-{}", id.0))
132 .spawn(move || {
133 reader_loop(
134 reader_file,
135 flow_writer_file,
136 id,
137 reader_config,
138 tx,
139 reader_flow_state,
140 );
141 })?;
142
143 Ok(Box::new(KissWriter {
144 file: writer_file,
145 flow_control: config.flow_control,
146 flow_state,
147 }))
148}
149
150fn configure_tnc(writer: &mut std::fs::File, config: &KissIfaceConfig) -> io::Result<()> {
153 log::info!("[{}] configuring KISS interface parameters", config.name);
154
155 let preamble_val = (config.preamble / 10).min(255) as u8;
157 writer.write_all(&kiss::command_frame(kiss::CMD_TXDELAY, &[preamble_val]))?;
158
159 let txtail_val = (config.txtail / 10).min(255) as u8;
161 writer.write_all(&kiss::command_frame(kiss::CMD_TXTAIL, &[txtail_val]))?;
162
163 writer.write_all(&kiss::command_frame(kiss::CMD_P, &[config.persistence]))?;
165
166 let slottime_val = (config.slottime / 10).min(255) as u8;
168 writer.write_all(&kiss::command_frame(kiss::CMD_SLOTTIME, &[slottime_val]))?;
169
170 writer.write_all(&kiss::command_frame(kiss::CMD_READY, &[0x01]))?;
172
173 log::info!("[{}] KISS interface configured", config.name);
174 Ok(())
175}
176
177fn reader_loop(
180 mut reader: std::fs::File,
181 mut flow_writer: std::fs::File,
182 id: InterfaceId,
183 config: KissIfaceConfig,
184 tx: EventSender,
185 flow_state: Arc<Mutex<FlowState>>,
186) {
187 let mut decoder = kiss::Decoder::new();
188 let mut buf = [0u8; 4096];
189 let mut first_tx: Option<Instant> = None;
190
191 loop {
192 match reader.read(&mut buf) {
193 Ok(0) => {
194 log::warn!("[{}] KISS port closed", config.name);
195 let _ = tx.send(Event::InterfaceDown(id));
196 match reconnect(&config, &tx, &flow_state) {
197 Some((new_reader, new_flow_writer)) => {
198 reader = new_reader;
199 flow_writer = new_flow_writer;
200 decoder = kiss::Decoder::new();
201 continue;
202 }
203 None => return,
204 }
205 }
206 Ok(n) => {
207 for event in decoder.feed(&buf[..n]) {
208 match event {
209 kiss::KissEvent::DataFrame(data) => {
210 if tx
211 .send(Event::Frame {
212 interface_id: id,
213 data: data,
214 rssi: None,
215 snr: None,
216 })
217 .is_err()
218 {
219 return;
220 }
221 }
222 kiss::KissEvent::Ready => {
223 process_queue(&flow_state, &mut flow_writer, &mut first_tx, &config);
224 }
225 }
226 }
227 }
228 Err(e) => {
229 log::warn!("[{}] KISS read error: {}", config.name, e);
230 let _ = tx.send(Event::InterfaceDown(id));
231 match reconnect(&config, &tx, &flow_state) {
232 Some((new_reader, new_flow_writer)) => {
233 reader = new_reader;
234 flow_writer = new_flow_writer;
235 decoder = kiss::Decoder::new();
236 continue;
237 }
238 None => return,
239 }
240 }
241 }
242
243 if config.flow_control {
245 let state = lock_or_recover(&flow_state, "kiss flow state");
246 if !state.ready && state.lock_time.elapsed() > Duration::from_secs(5) {
247 drop(state);
248 log::warn!("[{}] unlocking flow control due to timeout", config.name);
249 process_queue(&flow_state, &mut flow_writer, &mut first_tx, &config);
250 }
251 }
252
253 if let (Some(interval), Some(ref beacon_data)) =
255 (config.beacon_interval, &config.beacon_data)
256 {
257 if let Some(first) = first_tx {
258 if first.elapsed() > Duration::from_secs(interval as u64) {
259 log::debug!("[{}] transmitting beacon data", config.name);
260 let mut frame = beacon_data.clone();
262 while frame.len() < 15 {
263 frame.push(0x00);
264 }
265 let _ = flow_writer.write_all(&kiss::frame(&frame));
266 first_tx = None;
267 }
268 }
269 }
270 }
271}
272
273fn process_queue(
275 flow_state: &Arc<Mutex<FlowState>>,
276 writer: &mut std::fs::File,
277 first_tx: &mut Option<Instant>,
278 _config: &KissIfaceConfig,
279) {
280 let mut state = lock_or_recover(flow_state, "kiss flow state");
281 if let Some(data) = state.queue.pop_front() {
282 state.ready = false;
283 state.lock_time = Instant::now();
284 drop(state);
285 let _ = writer.write_all(&kiss::frame(&data));
286 if first_tx.is_none() {
287 *first_tx = Some(Instant::now());
288 }
289 } else {
290 state.ready = true;
291 }
292}
293
294fn reconnect(
296 config: &KissIfaceConfig,
297 tx: &EventSender,
298 flow_state: &Arc<Mutex<FlowState>>,
299) -> Option<(std::fs::File, std::fs::File)> {
300 loop {
301 thread::sleep(Duration::from_secs(5));
302 log::info!(
303 "[{}] attempting to reconnect KISS port {}...",
304 config.name,
305 config.port
306 );
307
308 let serial_config = SerialConfig {
309 path: config.port.clone(),
310 baud: config.speed,
311 data_bits: config.data_bits,
312 parity: config.parity,
313 stop_bits: config.stop_bits,
314 };
315
316 match SerialPort::open(&serial_config) {
317 Ok(port) => {
318 match (port.reader(), port.writer(), port.writer()) {
319 (Ok(reader), Ok(mut cfg_writer), Ok(flow_writer)) => {
320 thread::sleep(Duration::from_secs(2));
322 if let Err(e) = configure_tnc(&mut cfg_writer, config) {
323 log::warn!("[{}] TNC config failed: {}", config.name, e);
324 continue;
325 }
326 let mut state = lock_or_recover(flow_state, "kiss flow state");
328 state.ready = true;
329 state.queue.clear();
330 drop(state);
331
332 let new_writer: Box<dyn Writer> = Box::new(KissWriter {
333 file: cfg_writer,
334 flow_control: config.flow_control,
335 flow_state: flow_state.clone(),
336 });
337 let _ = tx.send(Event::InterfaceUp(
338 config.interface_id,
339 Some(new_writer),
340 None,
341 ));
342 log::info!("[{}] KISS port reconnected", config.name);
343 return Some((reader, flow_writer));
344 }
345 _ => {
346 log::warn!("[{}] failed to get handles from serial port", config.name);
347 }
348 }
349 }
350 Err(e) => {
351 log::warn!("[{}] KISS reconnect failed: {}", config.name, e);
352 }
353 }
354 }
355}
356
357use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
360use rns_core::transport::types::InterfaceInfo;
361use std::collections::HashMap;
362
363pub struct KissFactory;
365
366impl InterfaceFactory for KissFactory {
367 fn type_name(&self) -> &str {
368 "KISSInterface"
369 }
370
371 fn default_ifac_size(&self) -> usize {
372 8
373 }
374
375 fn parse_config(
376 &self,
377 name: &str,
378 id: InterfaceId,
379 params: &HashMap<String, String>,
380 ) -> Result<Box<dyn InterfaceConfigData>, String> {
381 let port = params
382 .get("port")
383 .cloned()
384 .ok_or_else(|| "KISSInterface requires 'port'".to_string())?;
385
386 let speed = params
387 .get("speed")
388 .and_then(|v| v.parse().ok())
389 .unwrap_or(9600u32);
390
391 let data_bits = params
392 .get("databits")
393 .and_then(|v| v.parse().ok())
394 .unwrap_or(8u8);
395
396 let parity = params
397 .get("parity")
398 .map(|v| match v.to_lowercase().as_str() {
399 "e" | "even" => crate::serial::Parity::Even,
400 "o" | "odd" => crate::serial::Parity::Odd,
401 _ => crate::serial::Parity::None,
402 })
403 .unwrap_or(crate::serial::Parity::None);
404
405 let stop_bits = params
406 .get("stopbits")
407 .and_then(|v| v.parse().ok())
408 .unwrap_or(1u8);
409
410 let preamble = params
411 .get("preamble")
412 .and_then(|v| v.parse().ok())
413 .unwrap_or(350u16);
414
415 let txtail = params
416 .get("txtail")
417 .and_then(|v| v.parse().ok())
418 .unwrap_or(20u16);
419
420 let persistence = params
421 .get("persistence")
422 .and_then(|v| v.parse().ok())
423 .unwrap_or(64u8);
424
425 let slottime = params
426 .get("slottime")
427 .and_then(|v| v.parse().ok())
428 .unwrap_or(20u16);
429
430 let flow_control = params
431 .get("flow_control")
432 .and_then(|v| crate::config::parse_bool_pub(v))
433 .unwrap_or(false);
434
435 let beacon_interval = params
436 .get("id_interval")
437 .or_else(|| params.get("beacon_interval"))
438 .and_then(|v| v.parse().ok());
439
440 let beacon_data = params
441 .get("id_callsign")
442 .or_else(|| params.get("beacon_data"))
443 .map(|v| v.as_bytes().to_vec());
444
445 Ok(Box::new(KissIfaceConfig {
446 name: name.to_string(),
447 port,
448 speed,
449 data_bits,
450 parity,
451 stop_bits,
452 preamble,
453 txtail,
454 persistence,
455 slottime,
456 flow_control,
457 beacon_interval,
458 beacon_data,
459 interface_id: id,
460 }))
461 }
462
463 fn start(
464 &self,
465 config: Box<dyn InterfaceConfigData>,
466 ctx: StartContext,
467 ) -> std::io::Result<StartResult> {
468 let kiss_config = *config
469 .into_any()
470 .downcast::<KissIfaceConfig>()
471 .map_err(|_| {
472 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
473 })?;
474
475 let id = kiss_config.interface_id;
476 let name = kiss_config.name.clone();
477
478 let info = InterfaceInfo {
479 id,
480 name,
481 mode: ctx.mode,
482 out_capable: true,
483 in_capable: true,
484 bitrate: Some(1200),
485 airtime_profile: None,
486 announce_rate_target: None,
487 announce_rate_grace: 0,
488 announce_rate_penalty: 0.0,
489 announce_cap: rns_core::constants::ANNOUNCE_CAP,
490 is_local_client: false,
491 wants_tunnel: false,
492 tunnel_id: None,
493 mtu: rns_core::constants::MTU as u32,
494 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
495 ia_freq: 0.0,
496 ip_freq: 0.0,
497 op_freq: 0.0,
498 op_samples: 0,
499 started: crate::time::now(),
500 };
501
502 let writer = start(kiss_config, ctx.tx)?;
503
504 Ok(StartResult::Simple {
505 id,
506 info,
507 writer,
508 interface_type_name: "KISSInterface".to_string(),
509 })
510 }
511}
512
513#[cfg(test)]
514mod tests {
515 use super::*;
516 use crate::serial::open_pty_pair;
517 use std::os::unix::io::{AsRawFd, FromRawFd};
518 use std::sync::mpsc;
519
520 fn poll_read(fd: i32, timeout_ms: i32) -> bool {
522 let mut pfd = libc::pollfd {
523 fd,
524 events: libc::POLLIN,
525 revents: 0,
526 };
527 let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
528 ret > 0
529 }
530
531 #[test]
532 fn kiss_data_roundtrip() {
533 let (master_fd, slave_fd) = open_pty_pair().unwrap();
534 let mut master_file = unsafe { std::fs::File::from_raw_fd(master_fd) };
535 let mut slave_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
536
537 let payload = vec![0x01, 0x02, 0x03, 0x04, 0x05];
539 let framed = kiss::frame(&payload);
540 master_file.write_all(&framed).unwrap();
541 master_file.flush().unwrap();
542
543 assert!(poll_read(slave_file.as_raw_fd(), 2000));
545
546 let mut decoder = kiss::Decoder::new();
547 let mut buf = [0u8; 4096];
548 let n = slave_file.read(&mut buf).unwrap();
549 let events = decoder.feed(&buf[..n]);
550 assert_eq!(events.len(), 1);
551 assert_eq!(events[0], kiss::KissEvent::DataFrame(payload));
552 }
553
554 #[test]
555 fn kiss_writer_frames() {
556 let (master_fd, slave_fd) = open_pty_pair().unwrap();
557
558 let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
559 let flow_state = Arc::new(Mutex::new(FlowState {
560 ready: true,
561 queue: VecDeque::new(),
562 lock_time: Instant::now(),
563 }));
564
565 let mut writer = KissWriter {
566 file: writer_file,
567 flow_control: false,
568 flow_state,
569 };
570
571 let payload = vec![0xC0, 0xDB, 0x01]; writer.send_frame(&payload).unwrap();
573
574 let mut master_file = unsafe { std::fs::File::from_raw_fd(master_fd) };
576 assert!(poll_read(master_file.as_raw_fd(), 2000));
577
578 let expected = kiss::frame(&payload);
579 let mut buf = [0u8; 256];
580 let n = master_file.read(&mut buf).unwrap();
581 assert_eq!(&buf[..n], &expected[..]);
582 }
583
584 #[test]
585 fn kiss_config_commands() {
586 use std::time::Instant;
587
588 let (master_fd, slave_fd) = open_pty_pair().unwrap();
589
590 let mut writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
591 let config = KissIfaceConfig {
592 preamble: 350,
593 txtail: 20,
594 persistence: 64,
595 slottime: 20,
596 ..Default::default()
597 };
598
599 configure_tnc(&mut writer_file, &config).unwrap();
600
601 let mut master_file = unsafe { std::fs::File::from_raw_fd(master_fd) };
603 let deadline = Instant::now() + Duration::from_secs(2);
604 let mut data = Vec::new();
605 let mut buf = [0u8; 1024];
606 while Instant::now() < deadline {
607 let remaining_ms = deadline
608 .saturating_duration_since(Instant::now())
609 .as_millis()
610 .min(i32::MAX as u128) as i32;
611 if remaining_ms <= 0 || !poll_read(master_file.as_raw_fd(), remaining_ms) {
612 break;
613 }
614
615 let n = master_file.read(&mut buf).unwrap();
616 if n == 0 {
617 break;
618 }
619 data.extend_from_slice(&buf[..n]);
620
621 let have_all = data.windows(4).any(|w| {
622 w[0] == kiss::FEND && w[1] == kiss::CMD_TXDELAY && w[2] == 35 && w[3] == kiss::FEND
623 }) && data.windows(4).any(|w| {
624 w[0] == kiss::FEND && w[1] == kiss::CMD_TXTAIL && w[2] == 2 && w[3] == kiss::FEND
625 }) && data.windows(4).any(|w| {
626 w[0] == kiss::FEND && w[1] == kiss::CMD_P && w[2] == 64 && w[3] == kiss::FEND
627 }) && data.windows(4).any(|w| {
628 w[0] == kiss::FEND && w[1] == kiss::CMD_SLOTTIME && w[2] == 2 && w[3] == kiss::FEND
629 });
630 if have_all {
631 break;
632 }
633 }
634
635 assert!(
638 data.windows(4).any(|w| w[0] == kiss::FEND
639 && w[1] == kiss::CMD_TXDELAY
640 && w[2] == 35
641 && w[3] == kiss::FEND),
642 "should contain TXDELAY command"
643 );
644
645 assert!(
647 data.windows(4).any(|w| w[0] == kiss::FEND
648 && w[1] == kiss::CMD_TXTAIL
649 && w[2] == 2
650 && w[3] == kiss::FEND),
651 "should contain TXTAIL command"
652 );
653
654 assert!(
656 data.windows(4).any(|w| w[0] == kiss::FEND
657 && w[1] == kiss::CMD_P
658 && w[2] == 64
659 && w[3] == kiss::FEND),
660 "should contain P command"
661 );
662
663 assert!(
665 data.windows(4).any(|w| w[0] == kiss::FEND
666 && w[1] == kiss::CMD_SLOTTIME
667 && w[2] == 2
668 && w[3] == kiss::FEND),
669 "should contain SLOTTIME command"
670 );
671 }
672
673 #[test]
674 fn kiss_flow_control_lock() {
675 let flow_state = Arc::new(Mutex::new(FlowState {
676 ready: true,
677 queue: VecDeque::new(),
678 lock_time: Instant::now(),
679 }));
680
681 let (master_fd, slave_fd) = open_pty_pair().unwrap();
682 let writer_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
683
684 let mut writer = KissWriter {
685 file: writer_file,
686 flow_control: true,
687 flow_state: flow_state.clone(),
688 };
689
690 writer.send_frame(b"hello").unwrap();
692 assert!(!flow_state.lock().unwrap().ready);
693
694 writer.send_frame(b"world").unwrap();
696 assert_eq!(flow_state.lock().unwrap().queue.len(), 1);
697
698 let mut flow_writer = unsafe { std::fs::File::from_raw_fd(libc::dup(master_fd)) };
700 let mut first_tx = None;
701 let config = KissIfaceConfig::default();
702 process_queue(&flow_state, &mut flow_writer, &mut first_tx, &config);
703
704 assert_eq!(flow_state.lock().unwrap().queue.len(), 0);
706 assert!(!flow_state.lock().unwrap().ready);
707
708 process_queue(&flow_state, &mut flow_writer, &mut first_tx, &config);
710 assert!(flow_state.lock().unwrap().ready);
711
712 unsafe { libc::close(master_fd) };
714 }
715
716 #[test]
717 fn kiss_flow_control_timeout() {
718 let flow_state = Arc::new(Mutex::new(FlowState {
719 ready: false,
720 queue: VecDeque::new(),
721 lock_time: Instant::now() - Duration::from_secs(6), }));
723
724 let state = flow_state.lock().unwrap();
726 assert!(!state.ready);
727 assert!(state.lock_time.elapsed() > Duration::from_secs(5));
728 }
729
730 #[test]
731 fn kiss_fragmented() {
732 let (master_fd, slave_fd) = open_pty_pair().unwrap();
733 let mut master_file = unsafe { std::fs::File::from_raw_fd(master_fd) };
734 let slave_file = unsafe { std::fs::File::from_raw_fd(slave_fd) };
735
736 let payload = vec![0x01, 0x02, 0x03, 0x04, 0x05];
737 let framed = kiss::frame(&payload);
738 let mid = framed.len() / 2;
739
740 let (tx, rx) = mpsc::channel::<kiss::KissEvent>();
742 let reader_thread = thread::spawn(move || {
743 let mut reader = slave_file;
744 let mut decoder = kiss::Decoder::new();
745 let mut buf = [0u8; 4096];
746
747 loop {
748 match reader.read(&mut buf) {
749 Ok(n) if n > 0 => {
750 for event in decoder.feed(&buf[..n]) {
751 let _ = tx.send(event.clone());
752 if matches!(event, kiss::KissEvent::DataFrame(_)) {
753 return;
754 }
755 }
756 }
757 _ => return,
758 }
759 }
760 });
761
762 master_file.write_all(&framed[..mid]).unwrap();
764 master_file.flush().unwrap();
765
766 thread::sleep(Duration::from_millis(50));
767
768 master_file.write_all(&framed[mid..]).unwrap();
770 master_file.flush().unwrap();
771
772 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
773 assert_eq!(event, kiss::KissEvent::DataFrame(payload));
774
775 let _ = reader_thread.join();
776 }
777}