Skip to main content

rns_net/interface/
kiss_iface.rs

//! KISS interface with flow control and TNC configuration.
//!
//! Matches Python `KISSInterface.py`: opens a serial port,
//! sends TNC configuration commands, and handles KISS framing with
//! optional flow control.
6
7use std::collections::VecDeque;
8use std::io::{self, Read, Write};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12
13use rns_core::transport::types::InterfaceId;
14
15use crate::event::{Event, EventSender};
16use crate::interface::{lock_or_recover, Writer};
17use crate::kiss;
18use crate::serial::{Parity, SerialConfig, SerialPort};
19
/// Configuration for a KISS interface.
///
/// Holds the serial-port settings used to open the device, the TNC
/// parameters sent by `configure_tnc`, and the optional flow-control and
/// beacon settings consumed by the reader thread.
#[derive(Debug, Clone)]
pub struct KissIfaceConfig {
    /// Interface name; used as the `[name]` prefix in this module's log messages.
    pub name: String,
    /// Serial device path, passed on as `SerialConfig::path`.
    pub port: String,
    /// Baud rate, passed on as `SerialConfig::baud` (default 9600).
    pub speed: u32,
    /// Number of data bits per character (default 8).
    pub data_bits: u8,
    /// Serial parity setting (default `Parity::None`).
    pub parity: Parity,
    /// Number of stop bits (default 1).
    pub stop_bits: u8,
    /// TX delay in ms, default 350; sent to the TNC as `ms / 10`, capped at 255.
    pub preamble: u16,
    /// TX tail in ms, default 20; sent to the TNC as `ms / 10`, capped at 255.
    pub txtail: u16,
    /// Persistence value 0-255, default 64; sent to the TNC unchanged.
    pub persistence: u8,
    /// Slot time in ms, default 20; sent to the TNC as `ms / 10`, capped at 255.
    pub slottime: u16,
    /// Default false. When true, outgoing frames are queued until the TNC
    /// reports ready (or the reader loop's 5-second timeout unlocks the queue).
    pub flow_control: bool,
    /// Beacon interval in seconds; beacons are only sent when both this and
    /// `beacon_data` are set.
    pub beacon_interval: Option<u32>,
    /// Beacon payload; zero-padded to at least 15 bytes when transmitted.
    pub beacon_data: Option<Vec<u8>>,
    /// Identifier attached to every event this interface emits.
    pub interface_id: InterfaceId,
}
38
39impl Default for KissIfaceConfig {
40    fn default() -> Self {
41        KissIfaceConfig {
42            name: String::new(),
43            port: String::new(),
44            speed: 9600,
45            data_bits: 8,
46            parity: Parity::None,
47            stop_bits: 1,
48            preamble: 350,
49            txtail: 20,
50            persistence: 64,
51            slottime: 20,
52            flow_control: false,
53            beacon_interval: None,
54            beacon_data: None,
55            interface_id: InterfaceId(0),
56        }
57    }
58}
59
/// Shared flow-control state between writer and reader threads.
///
/// Always accessed through an `Arc<Mutex<_>>` shared by `KissWriter` and the
/// reader loop.
struct FlowState {
    /// True when the next frame may be written immediately. Cleared when a
    /// flow-controlled frame is written; set again by `process_queue` once the
    /// queue is empty, and on reconnect.
    ready: bool,
    /// Frames accepted while `ready` was false, oldest first.
    queue: VecDeque<Vec<u8>>,
    /// Moment `ready` was last cleared; the reader loop compares it against a
    /// 5-second timeout to force the queue forward.
    lock_time: Instant,
}
66
/// Writer that sends KISS-framed data over a serial port.
/// Handles flow control: when enabled, queues packets until CMD_READY.
struct KissWriter {
    /// Write handle to the serial port.
    file: std::fs::File,
    /// Copy of `KissIfaceConfig::flow_control`; when false every frame is
    /// written immediately and `flow_state` is not consulted.
    flow_control: bool,
    /// Ready flag and pending queue shared with the reader thread.
    flow_state: Arc<Mutex<FlowState>>,
}
74
75impl Writer for KissWriter {
76    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
77        if self.flow_control {
78            let mut state = lock_or_recover(&self.flow_state, "kiss flow state");
79            if state.ready {
80                state.ready = false;
81                state.lock_time = Instant::now();
82                drop(state);
83                self.file.write_all(&kiss::frame(data))
84            } else {
85                state.queue.push_back(data.to_vec());
86                Ok(())
87            }
88        } else {
89            self.file.write_all(&kiss::frame(data))
90        }
91    }
92}
93
94/// Start the KISS interface. Opens the port, configures TNC, spawns reader thread.
95pub fn start(config: KissIfaceConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
96    let serial_config = SerialConfig {
97        path: config.port.clone(),
98        baud: config.speed,
99        data_bits: config.data_bits,
100        parity: config.parity,
101        stop_bits: config.stop_bits,
102    };
103
104    let port = SerialPort::open(&serial_config)?;
105    let reader_file = port.reader()?;
106    let mut writer_file = port.writer()?;
107    let flow_writer_file = port.writer()?;
108
109    let id = config.interface_id;
110
111    // Initial 2-second delay for TNC initialization (matches Python)
112    thread::sleep(Duration::from_secs(2));
113
114    // Signal interface up
115    let _ = tx.send(Event::InterfaceUp(id, None, None));
116
117    // Send TNC configuration commands
118    configure_tnc(&mut writer_file, &config)?;
119
120    let flow_state = Arc::new(Mutex::new(FlowState {
121        ready: true,
122        queue: VecDeque::new(),
123        lock_time: Instant::now(),
124    }));
125
126    let reader_flow_state = flow_state.clone();
127
128    // Spawn reader thread
129    let reader_config = config.clone();
130    thread::Builder::new()
131        .name(format!("kiss-reader-{}", id.0))
132        .spawn(move || {
133            reader_loop(
134                reader_file,
135                flow_writer_file,
136                id,
137                reader_config,
138                tx,
139                reader_flow_state,
140            );
141        })?;
142
143    Ok(Box::new(KissWriter {
144        file: writer_file,
145        flow_control: config.flow_control,
146        flow_state,
147    }))
148}
149
150/// Send TNC configuration commands via KISS.
151/// Matches Python `KISSInterface.configure_device()`.
152fn configure_tnc(writer: &mut std::fs::File, config: &KissIfaceConfig) -> io::Result<()> {
153    log::info!("[{}] configuring KISS interface parameters", config.name);
154
155    // Preamble: value is ms/10, clamped to 0-255
156    let preamble_val = (config.preamble / 10).min(255) as u8;
157    writer.write_all(&kiss::command_frame(kiss::CMD_TXDELAY, &[preamble_val]))?;
158
159    // TX tail: value is ms/10, clamped to 0-255
160    let txtail_val = (config.txtail / 10).min(255) as u8;
161    writer.write_all(&kiss::command_frame(kiss::CMD_TXTAIL, &[txtail_val]))?;
162
163    // Persistence: raw value, clamped to 0-255
164    writer.write_all(&kiss::command_frame(kiss::CMD_P, &[config.persistence]))?;
165
166    // Slot time: value is ms/10, clamped to 0-255
167    let slottime_val = (config.slottime / 10).min(255) as u8;
168    writer.write_all(&kiss::command_frame(kiss::CMD_SLOTTIME, &[slottime_val]))?;
169
170    // Flow control: send CMD_READY with 0x01 (matches Python setFlowControl)
171    writer.write_all(&kiss::command_frame(kiss::CMD_READY, &[0x01]))?;
172
173    log::info!("[{}] KISS interface configured", config.name);
174    Ok(())
175}
176
177/// Reader loop: reads from serial, KISS-decodes, dispatches events.
178/// Also handles flow control unlocking and beacon transmission.
179fn reader_loop(
180    mut reader: std::fs::File,
181    mut flow_writer: std::fs::File,
182    id: InterfaceId,
183    config: KissIfaceConfig,
184    tx: EventSender,
185    flow_state: Arc<Mutex<FlowState>>,
186) {
187    let mut decoder = kiss::Decoder::new();
188    let mut buf = [0u8; 4096];
189    let mut first_tx: Option<Instant> = None;
190
191    loop {
192        match reader.read(&mut buf) {
193            Ok(0) => {
194                log::warn!("[{}] KISS port closed", config.name);
195                let _ = tx.send(Event::InterfaceDown(id));
196                match reconnect(&config, &tx, &flow_state) {
197                    Some((new_reader, new_flow_writer)) => {
198                        reader = new_reader;
199                        flow_writer = new_flow_writer;
200                        decoder = kiss::Decoder::new();
201                        continue;
202                    }
203                    None => return,
204                }
205            }
206            Ok(n) => {
207                for event in decoder.feed(&buf[..n]) {
208                    match event {
209                        kiss::KissEvent::DataFrame(data) => {
210                            if tx
211                                .send(Event::Frame {
212                                    interface_id: id,
213                                    data: data,
214                                    rssi: None,
215                                    snr: None,
216                                })
217                                .is_err()
218                            {
219                                return;
220                            }
221                        }
222                        kiss::KissEvent::Ready => {
223                            process_queue(&flow_state, &mut flow_writer, &mut first_tx, &config);
224                        }
225                    }
226                }
227            }
228            Err(e) => {
229                log::warn!("[{}] KISS read error: {}", config.name, e);
230                let _ = tx.send(Event::InterfaceDown(id));
231                match reconnect(&config, &tx, &flow_state) {
232                    Some((new_reader, new_flow_writer)) => {
233                        reader = new_reader;
234                        flow_writer = new_flow_writer;
235                        decoder = kiss::Decoder::new();
236                        continue;
237                    }
238                    None => return,
239                }
240            }
241        }
242
243        // Flow control timeout check
244        if config.flow_control {
245            let state = lock_or_recover(&flow_state, "kiss flow state");
246            if !state.ready && state.lock_time.elapsed() > Duration::from_secs(5) {
247                drop(state);
248                log::warn!("[{}] unlocking flow control due to timeout", config.name);
249                process_queue(&flow_state, &mut flow_writer, &mut first_tx, &config);
250            }
251        }
252
253        // Beacon check
254        if let (Some(interval), Some(ref beacon_data)) =
255            (config.beacon_interval, &config.beacon_data)
256        {
257            if let Some(first) = first_tx {
258                if first.elapsed() > Duration::from_secs(interval as u64) {
259                    log::debug!("[{}] transmitting beacon data", config.name);
260                    // Pad to minimum 15 bytes
261                    let mut frame = beacon_data.clone();
262                    while frame.len() < 15 {
263                        frame.push(0x00);
264                    }
265                    let _ = flow_writer.write_all(&kiss::frame(&frame));
266                    first_tx = None;
267                }
268            }
269        }
270    }
271}
272
273/// Process the flow control queue: send next queued packet, mark ready.
274fn process_queue(
275    flow_state: &Arc<Mutex<FlowState>>,
276    writer: &mut std::fs::File,
277    first_tx: &mut Option<Instant>,
278    _config: &KissIfaceConfig,
279) {
280    let mut state = lock_or_recover(flow_state, "kiss flow state");
281    if let Some(data) = state.queue.pop_front() {
282        state.ready = false;
283        state.lock_time = Instant::now();
284        drop(state);
285        let _ = writer.write_all(&kiss::frame(&data));
286        if first_tx.is_none() {
287            *first_tx = Some(Instant::now());
288        }
289    } else {
290        state.ready = true;
291    }
292}
293
294/// Attempt to reconnect the serial port.
295fn reconnect(
296    config: &KissIfaceConfig,
297    tx: &EventSender,
298    flow_state: &Arc<Mutex<FlowState>>,
299) -> Option<(std::fs::File, std::fs::File)> {
300    loop {
301        thread::sleep(Duration::from_secs(5));
302        log::info!(
303            "[{}] attempting to reconnect KISS port {}...",
304            config.name,
305            config.port
306        );
307
308        let serial_config = SerialConfig {
309            path: config.port.clone(),
310            baud: config.speed,
311            data_bits: config.data_bits,
312            parity: config.parity,
313            stop_bits: config.stop_bits,
314        };
315
316        match SerialPort::open(&serial_config) {
317            Ok(port) => {
318                match (port.reader(), port.writer(), port.writer()) {
319                    (Ok(reader), Ok(mut cfg_writer), Ok(flow_writer)) => {
320                        // 2-second init delay
321                        thread::sleep(Duration::from_secs(2));
322                        if let Err(e) = configure_tnc(&mut cfg_writer, config) {
323                            log::warn!("[{}] TNC config failed: {}", config.name, e);
324                            continue;
325                        }
326                        // Reset flow state
327                        let mut state = lock_or_recover(flow_state, "kiss flow state");
328                        state.ready = true;
329                        state.queue.clear();
330                        drop(state);
331
332                        let new_writer: Box<dyn Writer> = Box::new(KissWriter {
333                            file: cfg_writer,
334                            flow_control: config.flow_control,
335                            flow_state: flow_state.clone(),
336                        });
337                        let _ = tx.send(Event::InterfaceUp(
338                            config.interface_id,
339                            Some(new_writer),
340                            None,
341                        ));
342                        log::info!("[{}] KISS port reconnected", config.name);
343                        return Some((reader, flow_writer));
344                    }
345                    _ => {
346                        log::warn!("[{}] failed to get handles from serial port", config.name);
347                    }
348                }
349            }
350            Err(e) => {
351                log::warn!("[{}] KISS reconnect failed: {}", config.name, e);
352            }
353        }
354    }
355}
356
357// --- Factory implementation ---
358
359use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
360use rns_core::transport::types::InterfaceInfo;
361use std::collections::HashMap;
362
/// Factory for `KISSInterface`.
///
/// A stateless unit struct: it parses `KISSInterface` parameters into a
/// `KissIfaceConfig` and starts the interface from that configuration.
pub struct KissFactory;
365
366impl InterfaceFactory for KissFactory {
367    fn type_name(&self) -> &str {
368        "KISSInterface"
369    }
370
371    fn default_ifac_size(&self) -> usize {
372        8
373    }
374
375    fn parse_config(
376        &self,
377        name: &str,
378        id: InterfaceId,
379        params: &HashMap<String, String>,
380    ) -> Result<Box<dyn InterfaceConfigData>, String> {
381        let port = params
382            .get("port")
383            .cloned()
384            .ok_or_else(|| "KISSInterface requires 'port'".to_string())?;
385
386        let speed = params
387            .get("speed")
388            .and_then(|v| v.parse().ok())
389            .unwrap_or(9600u32);
390
391        let data_bits = params
392            .get("databits")
393            .and_then(|v| v.parse().ok())
394            .unwrap_or(8u8);
395
396        let parity = params
397            .get("parity")
398            .map(|v| match v.to_lowercase().as_str() {
399                "e" | "even" => crate::serial::Parity::Even,
400                "o" | "odd" => crate::serial::Parity::Odd,
401                _ => crate::serial::Parity::None,
402            })
403            .unwrap_or(crate::serial::Parity::None);
404
405        let stop_bits = params
406            .get("stopbits")
407            .and_then(|v| v.parse().ok())
408            .unwrap_or(1u8);
409
410        let preamble = params
411            .get("preamble")
412            .and_then(|v| v.parse().ok())
413            .unwrap_or(350u16);
414
415        let txtail = params
416            .get("txtail")
417            .and_then(|v| v.parse().ok())
418            .unwrap_or(20u16);
419
420        let persistence = params
421            .get("persistence")
422            .and_then(|v| v.parse().ok())
423            .unwrap_or(64u8);
424
425        let slottime = params
426            .get("slottime")
427            .and_then(|v| v.parse().ok())
428            .unwrap_or(20u16);
429
430        let flow_control = params
431            .get("flow_control")
432            .and_then(|v| crate::config::parse_bool_pub(v))
433            .unwrap_or(false);
434
435        let beacon_interval = params
436            .get("id_interval")
437            .or_else(|| params.get("beacon_interval"))
438            .and_then(|v| v.parse().ok());
439
440        let beacon_data = params
441            .get("id_callsign")
442            .or_else(|| params.get("beacon_data"))
443            .map(|v| v.as_bytes().to_vec());
444
445        Ok(Box::new(KissIfaceConfig {
446            name: name.to_string(),
447            port,
448            speed,
449            data_bits,
450            parity,
451            stop_bits,
452            preamble,
453            txtail,
454            persistence,
455            slottime,
456            flow_control,
457            beacon_interval,
458            beacon_data,
459            interface_id: id,
460        }))
461    }
462
463    fn start(
464        &self,
465        config: Box<dyn InterfaceConfigData>,
466        ctx: StartContext,
467    ) -> std::io::Result<StartResult> {
468        let kiss_config = *config
469            .into_any()
470            .downcast::<KissIfaceConfig>()
471            .map_err(|_| {
472                std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
473            })?;
474
475        let id = kiss_config.interface_id;
476        let name = kiss_config.name.clone();
477
478        let info = InterfaceInfo {
479            id,
480            name,
481            mode: ctx.mode,
482            out_capable: true,
483            in_capable: true,
484            bitrate: Some(1200),
485            airtime_profile: None,
486            announce_rate_target: None,
487            announce_rate_grace: 0,
488            announce_rate_penalty: 0.0,
489            announce_cap: rns_core::constants::ANNOUNCE_CAP,
490            is_local_client: false,
491            wants_tunnel: false,
492            tunnel_id: None,
493            mtu: rns_core::constants::MTU as u32,
494            ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
495            ia_freq: 0.0,
496            ip_freq: 0.0,
497            op_freq: 0.0,
498            op_samples: 0,
499            started: crate::time::now(),
500        };
501
502        let writer = start(kiss_config, ctx.tx)?;
503
504        Ok(StartResult::Simple {
505            id,
506            info,
507            writer,
508            interface_type_name: "KISSInterface".to_string(),
509        })
510    }
511}
512
#[cfg(test)]
mod tests {
    use super::*;
    use crate::serial::open_pty_pair;
    use std::os::unix::io::{AsRawFd, FromRawFd};
    use std::sync::mpsc;

    /// Helper: poll an fd for reading with timeout (ms).
    ///
    /// Returns true only when the fd actually has data to read; a wake-up for
    /// another condition (such as hang-up) counts as "not readable".
    fn poll_read(fd: i32, timeout_ms: i32) -> bool {
        let mut pfd = libc::pollfd {
            fd,
            events: libc::POLLIN,
            revents: 0,
        };
        // SAFETY: `pfd` is a valid, initialised pollfd and the count of 1 matches it.
        let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
        ret > 0 && (pfd.revents & libc::POLLIN) != 0
    }

    /// Helper: open a PTY pair as `(master, slave)` files that close on drop,
    /// so a failing assertion cannot leak a descriptor.
    fn pty_files() -> (std::fs::File, std::fs::File) {
        let (master_fd, slave_fd) = open_pty_pair().unwrap();
        // SAFETY: assumes `open_pty_pair` returns two freshly opened descriptors
        // owned by the caller; each is wrapped exactly once.
        unsafe {
            (
                std::fs::File::from_raw_fd(master_fd),
                std::fs::File::from_raw_fd(slave_fd),
            )
        }
    }

    /// Helper: true when `data` contains the command frame `FEND, cmd, value, FEND`.
    fn has_command(data: &[u8], cmd: u8, value: u8) -> bool {
        data.windows(4)
            .any(|w| *w == [kiss::FEND, cmd, value, kiss::FEND])
    }

    /// Helper: a fresh flow state with an empty queue.
    fn new_flow_state(ready: bool, lock_time: Instant) -> Arc<Mutex<FlowState>> {
        Arc::new(Mutex::new(FlowState {
            ready,
            queue: VecDeque::new(),
            lock_time,
        }))
    }

    #[test]
    fn kiss_data_roundtrip() {
        let (mut master_file, mut slave_file) = pty_files();

        // Write a KISS data frame to the master side
        let payload = vec![0x01, 0x02, 0x03, 0x04, 0x05];
        let framed = kiss::frame(&payload);
        master_file.write_all(&framed).unwrap();
        master_file.flush().unwrap();

        // Read from slave using KISS decoder
        assert!(poll_read(slave_file.as_raw_fd(), 2000));

        let mut decoder = kiss::Decoder::new();
        let mut buf = [0u8; 4096];
        let n = slave_file.read(&mut buf).unwrap();
        let events = decoder.feed(&buf[..n]);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0], kiss::KissEvent::DataFrame(payload));
    }

    #[test]
    fn kiss_writer_frames() {
        let (mut master_file, writer_file) = pty_files();

        let mut writer = KissWriter {
            file: writer_file,
            flow_control: false,
            flow_state: new_flow_state(true, Instant::now()),
        };

        let payload = vec![0xC0, 0xDB, 0x01]; // includes bytes that need KISS escaping
        writer.send_frame(&payload).unwrap();

        // Read from master
        assert!(poll_read(master_file.as_raw_fd(), 2000));

        let expected = kiss::frame(&payload);
        let mut buf = [0u8; 256];
        let n = master_file.read(&mut buf).unwrap();
        assert_eq!(&buf[..n], &expected[..]);
    }

    #[test]
    fn kiss_config_commands() {
        let (mut master_file, mut writer_file) = pty_files();
        let config = KissIfaceConfig {
            preamble: 350,
            txtail: 20,
            persistence: 64,
            slottime: 20,
            ..Default::default()
        };

        configure_tnc(&mut writer_file, &config).unwrap();

        // (command, expected value byte, failure message)
        let expected = [
            // preamble: 350/10 = 35
            (kiss::CMD_TXDELAY, 35u8, "should contain TXDELAY command"),
            // TXTAIL: 20/10 = 2
            (kiss::CMD_TXTAIL, 2u8, "should contain TXTAIL command"),
            // Persistence: 64
            (kiss::CMD_P, 64u8, "should contain P command"),
            // Slottime: 20/10 = 2
            (kiss::CMD_SLOTTIME, 2u8, "should contain SLOTTIME command"),
        ];

        // Read from master until every command has arrived or 2 s have passed
        let deadline = Instant::now() + Duration::from_secs(2);
        let mut data = Vec::new();
        let mut buf = [0u8; 1024];
        while Instant::now() < deadline {
            let remaining_ms = deadline
                .saturating_duration_since(Instant::now())
                .as_millis()
                .min(i32::MAX as u128) as i32;
            if remaining_ms <= 0 || !poll_read(master_file.as_raw_fd(), remaining_ms) {
                break;
            }

            let n = master_file.read(&mut buf).unwrap();
            if n == 0 {
                break;
            }
            data.extend_from_slice(&buf[..n]);

            let have_all = expected
                .iter()
                .all(|&(cmd, value, _)| has_command(&data, cmd, value));
            if have_all {
                break;
            }
        }

        for &(cmd, value, message) in expected.iter() {
            assert!(has_command(&data, cmd, value), "{}", message);
        }
    }

    #[test]
    fn kiss_flow_control_lock() {
        let flow_state = new_flow_state(true, Instant::now());

        // The master end doubles as the reader thread's queue writer below.
        let (mut flow_writer, writer_file) = pty_files();

        let mut writer = KissWriter {
            file: writer_file,
            flow_control: true,
            flow_state: flow_state.clone(),
        };

        // First send should go through (ready=true) and lock
        writer.send_frame(b"hello").unwrap();
        assert!(!flow_state.lock().unwrap().ready);

        // Second send should be queued (ready=false)
        writer.send_frame(b"world").unwrap();
        assert_eq!(flow_state.lock().unwrap().queue.len(), 1);

        // Simulate CMD_READY: process_queue
        let mut first_tx = None;
        let config = KissIfaceConfig::default();
        process_queue(&flow_state, &mut flow_writer, &mut first_tx, &config);

        // Queue should be empty now (dequeued "world"), but ready=false because it sent
        assert_eq!(flow_state.lock().unwrap().queue.len(), 0);
        assert!(!flow_state.lock().unwrap().ready);

        // Process again with empty queue: should set ready=true
        process_queue(&flow_state, &mut flow_writer, &mut first_tx, &config);
        assert!(flow_state.lock().unwrap().ready);
    }

    #[test]
    fn kiss_flow_control_timeout() {
        // Lock time 6 s in the past: already timed out
        let flow_state = new_flow_state(false, Instant::now() - Duration::from_secs(6));

        // Check that the timeout condition triggers
        let state = flow_state.lock().unwrap();
        assert!(!state.ready);
        assert!(state.lock_time.elapsed() > Duration::from_secs(5));
    }

    #[test]
    fn kiss_fragmented() {
        let (mut master_file, slave_file) = pty_files();

        let payload = vec![0x01, 0x02, 0x03, 0x04, 0x05];
        let framed = kiss::frame(&payload);
        let mid = framed.len() / 2;

        // Spawn reader thread first (it will block waiting for data)
        let (tx, rx) = mpsc::channel::<kiss::KissEvent>();
        let reader_thread = thread::spawn(move || {
            let mut reader = slave_file;
            let mut decoder = kiss::Decoder::new();
            let mut buf = [0u8; 4096];

            loop {
                match reader.read(&mut buf) {
                    Ok(n) if n > 0 => {
                        for event in decoder.feed(&buf[..n]) {
                            let _ = tx.send(event.clone());
                            // Stop once the reassembled frame has been delivered.
                            if matches!(event, kiss::KissEvent::DataFrame(_)) {
                                return;
                            }
                        }
                    }
                    _ => return,
                }
            }
        });

        // Write first half
        master_file.write_all(&framed[..mid]).unwrap();
        master_file.flush().unwrap();

        thread::sleep(Duration::from_millis(50));

        // Write second half
        master_file.write_all(&framed[mid..]).unwrap();
        master_file.flush().unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert_eq!(event, kiss::KissEvent::DataFrame(payload));

        let _ = reader_thread.join();
    }
}