Skip to main content

rns_net/interface/
pipe.rs

1//! Pipe interface: subprocess stdin/stdout with HDLC framing.
2//!
3//! Matches Python `PipeInterface.py`. Spawns a subprocess, communicates
4//! via piped stdin/stdout using HDLC framing for packet boundaries.
5//! Auto-respawns subprocess on failure.
6
7use std::io::{self, Read, Write};
8use std::process::{Command, Stdio};
9use std::thread;
10use std::time::Duration;
11
12use rns_core::transport::types::InterfaceId;
13
14use crate::event::{Event, EventSender};
15use crate::hdlc;
16use crate::interface::Writer;
17
18/// Configuration for a pipe interface.
19#[derive(Debug, Clone)]
20pub struct PipeConfig {
21    pub name: String,
22    pub command: String,
23    pub respawn_delay: Duration,
24    pub interface_id: InterfaceId,
25}
26
27impl Default for PipeConfig {
28    fn default() -> Self {
29        PipeConfig {
30            name: String::new(),
31            command: String::new(),
32            respawn_delay: Duration::from_secs(5),
33            interface_id: InterfaceId(0),
34        }
35    }
36}
37
38/// Writer that sends HDLC-framed data to a subprocess stdin.
39struct PipeWriter {
40    stdin: std::process::ChildStdin,
41}
42
43impl Writer for PipeWriter {
44    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
45        self.stdin.write_all(&hdlc::frame(data))
46    }
47}
48
49/// Start the pipe interface. Spawns subprocess, returns writer.
50pub fn start(config: PipeConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
51    let id = config.interface_id;
52
53    let mut child = spawn_child(&config.command)?;
54
55    let stdout = child
56        .stdout
57        .take()
58        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdout from child"))?;
59    let stdin = child
60        .stdin
61        .take()
62        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdin from child"))?;
63
64    log::info!("[{}] pipe interface started: {}", config.name, config.command);
65
66    // Signal interface up
67    let _ = tx.send(Event::InterfaceUp(id, None, None));
68
69    // Spawn reader thread
70    thread::Builder::new()
71        .name(format!("pipe-reader-{}", id.0))
72        .spawn(move || {
73            reader_loop(stdout, child, id, config, tx);
74        })?;
75
76    Ok(Box::new(PipeWriter { stdin }))
77}
78
79fn spawn_child(command: &str) -> io::Result<std::process::Child> {
80    Command::new("sh")
81        .args(["-c", command])
82        .stdin(Stdio::piped())
83        .stdout(Stdio::piped())
84        .stderr(Stdio::null())
85        .spawn()
86}
87
88/// Reader loop: reads from subprocess stdout, HDLC-decodes, dispatches events.
89/// On subprocess exit, attempts respawn.
90fn reader_loop(
91    mut stdout: std::process::ChildStdout,
92    mut child: std::process::Child,
93    id: InterfaceId,
94    config: PipeConfig,
95    tx: EventSender,
96) {
97    let mut decoder = hdlc::Decoder::new();
98    let mut buf = [0u8; 4096];
99
100    loop {
101        match stdout.read(&mut buf) {
102            Ok(0) => {
103                // EOF — subprocess exited
104                let _ = child.wait();
105                log::warn!("[{}] subprocess terminated", config.name);
106                let _ = tx.send(Event::InterfaceDown(id));
107                match respawn(&config, &tx) {
108                    Some((new_stdout, new_child)) => {
109                        stdout = new_stdout;
110                        child = new_child;
111                        decoder = hdlc::Decoder::new();
112                        continue;
113                    }
114                    None => return,
115                }
116            }
117            Ok(n) => {
118                for frame in decoder.feed(&buf[..n]) {
119                    if tx
120                        .send(Event::Frame {
121                            interface_id: id,
122                            data: frame,
123                        })
124                        .is_err()
125                    {
126                        // Driver shut down
127                        let _ = child.kill();
128                        return;
129                    }
130                }
131            }
132            Err(e) => {
133                log::warn!("[{}] pipe read error: {}", config.name, e);
134                let _ = child.kill();
135                let _ = child.wait();
136                let _ = tx.send(Event::InterfaceDown(id));
137                match respawn(&config, &tx) {
138                    Some((new_stdout, new_child)) => {
139                        stdout = new_stdout;
140                        child = new_child;
141                        decoder = hdlc::Decoder::new();
142                        continue;
143                    }
144                    None => return,
145                }
146            }
147        }
148    }
149}
150
151/// Attempt to respawn the subprocess after a delay.
152fn respawn(
153    config: &PipeConfig,
154    tx: &EventSender,
155) -> Option<(std::process::ChildStdout, std::process::Child)> {
156    loop {
157        thread::sleep(config.respawn_delay);
158        log::info!(
159            "[{}] attempting to respawn subprocess: {}",
160            config.name,
161            config.command
162        );
163
164        match spawn_child(&config.command) {
165            Ok(mut child) => {
166                let stdout = match child.stdout.take() {
167                    Some(s) => s,
168                    None => {
169                        let _ = child.kill();
170                        let _ = child.wait();
171                        continue;
172                    }
173                };
174                let stdin = match child.stdin.take() {
175                    Some(s) => s,
176                    None => {
177                        let _ = child.kill();
178                        let _ = child.wait();
179                        continue;
180                    }
181                };
182
183                let new_writer: Box<dyn Writer> = Box::new(PipeWriter { stdin });
184                if tx
185                    .send(Event::InterfaceUp(config.interface_id, Some(new_writer), None))
186                    .is_err()
187                {
188                    return None; // Driver shut down
189                }
190                log::info!("[{}] subprocess respawned", config.name);
191                return Some((stdout, child));
192            }
193            Err(e) => {
194                log::warn!("[{}] respawn failed: {}", config.name, e);
195            }
196        }
197    }
198}
199
200#[cfg(test)]
201mod tests {
202    use super::*;
203    use std::sync::mpsc;
204
205    #[test]
206    fn pipe_start_and_receive() {
207        // Use `cat` as a loopback subprocess
208        let (tx, rx) = mpsc::channel();
209        let config = PipeConfig {
210            name: "test-pipe".into(),
211            command: "cat".into(),
212            respawn_delay: Duration::from_secs(1),
213            interface_id: InterfaceId(100),
214        };
215
216        let mut writer = start(config, tx).unwrap();
217
218        // Drain InterfaceUp
219        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
220        assert!(matches!(event, Event::InterfaceUp(InterfaceId(100), None, None)));
221
222        // Send a packet (>= 19 bytes for HDLC minimum)
223        let payload: Vec<u8> = (0..32).collect();
224        writer.send_frame(&payload).unwrap();
225
226        // Should receive Frame event (cat echos back the HDLC frame)
227        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
228        match event {
229            Event::Frame { interface_id, data } => {
230                assert_eq!(interface_id, InterfaceId(100));
231                assert_eq!(data, payload);
232            }
233            other => panic!("expected Frame, got {:?}", other),
234        }
235    }
236
237    #[test]
238    fn pipe_writer_sends() {
239        // Verify the writer wraps data in HDLC
240        let (tx, rx) = mpsc::channel();
241        let config = PipeConfig {
242            name: "test-pipe-writer".into(),
243            command: "cat".into(),
244            respawn_delay: Duration::from_secs(1),
245            interface_id: InterfaceId(101),
246        };
247
248        let mut writer = start(config, tx).unwrap();
249        let _ = rx.recv_timeout(Duration::from_secs(2)).unwrap(); // drain InterfaceUp
250
251        // Write data and verify we get it back as HDLC frame
252        let payload: Vec<u8> = (10..42).collect();
253        writer.send_frame(&payload).unwrap();
254
255        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
256        match event {
257            Event::Frame { data, .. } => {
258                assert_eq!(data, payload);
259            }
260            other => panic!("expected Frame, got {:?}", other),
261        }
262    }
263
264    #[test]
265    fn pipe_subprocess_exit() {
266        // Use a command that exits immediately
267        let (tx, rx) = mpsc::channel();
268        let config = PipeConfig {
269            name: "test-pipe-exit".into(),
270            command: "true".into(), // exits immediately with 0
271            respawn_delay: Duration::from_secs(60), // long delay so we catch InterfaceDown
272            interface_id: InterfaceId(102),
273        };
274
275        let _writer = start(config, tx).unwrap();
276
277        // Should get InterfaceUp then InterfaceDown
278        let mut got_down = false;
279        for _ in 0..5 {
280            match rx.recv_timeout(Duration::from_secs(2)) {
281                Ok(Event::InterfaceDown(InterfaceId(102))) => {
282                    got_down = true;
283                    break;
284                }
285                Ok(_) => continue,
286                Err(_) => break,
287            }
288        }
289        assert!(got_down, "should receive InterfaceDown after subprocess exits");
290    }
291
292    #[test]
293    fn pipe_config_defaults() {
294        let config = PipeConfig::default();
295        assert_eq!(config.respawn_delay, Duration::from_secs(5));
296        assert_eq!(config.interface_id, InterfaceId(0));
297        assert!(config.command.is_empty());
298    }
299
300    #[test]
301    fn pipe_invalid_command() {
302        let (tx, _rx) = mpsc::channel();
303        let config = PipeConfig {
304            name: "test-pipe-bad".into(),
305            command: "/nonexistent_rns_test_binary_that_does_not_exist_xyz".into(),
306            respawn_delay: Duration::from_secs(60),
307            interface_id: InterfaceId(103),
308        };
309
310        // sh -c <nonexistent> will start sh successfully but the child exits immediately
311        // For a truly invalid binary we need to check that the process fails
312        // Actually, sh -c "<nonexistent>" will still spawn sh successfully
313        // Let's test with a different approach: verify it doesn't panic
314        let result = start(config, tx);
315        // sh will spawn successfully even if the inner command fails
316        // The real test is that it doesn't panic
317        assert!(result.is_ok());
318    }
319}