Skip to main content

rns_net/interface/
pipe.rs

1//! Pipe interface: subprocess stdin/stdout with HDLC framing.
2//!
3//! Matches Python `PipeInterface.py`. Spawns a subprocess, communicates
4//! via piped stdin/stdout using HDLC framing for packet boundaries.
5//! Auto-respawns subprocess on failure.
6
7use std::collections::HashMap;
8use std::io::{self, Read, Write};
9use std::process::{Command, Stdio};
10use std::thread;
11use std::time::Duration;
12
13use rns_core::transport::types::{InterfaceId, InterfaceInfo};
14
15use crate::event::{Event, EventSender};
16use crate::hdlc;
17use crate::interface::Writer;
18use super::{InterfaceFactory, InterfaceConfigData, StartContext, StartResult};
19
20/// Configuration for a pipe interface.
21#[derive(Debug, Clone)]
22pub struct PipeConfig {
23    pub name: String,
24    pub command: String,
25    pub respawn_delay: Duration,
26    pub interface_id: InterfaceId,
27}
28
29impl Default for PipeConfig {
30    fn default() -> Self {
31        PipeConfig {
32            name: String::new(),
33            command: String::new(),
34            respawn_delay: Duration::from_secs(5),
35            interface_id: InterfaceId(0),
36        }
37    }
38}
39
40/// Writer that sends HDLC-framed data to a subprocess stdin.
41struct PipeWriter {
42    stdin: std::process::ChildStdin,
43}
44
45impl Writer for PipeWriter {
46    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47        self.stdin.write_all(&hdlc::frame(data))
48    }
49}
50
51/// Start the pipe interface. Spawns subprocess, returns writer.
52pub fn start(config: PipeConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
53    let id = config.interface_id;
54
55    let mut child = spawn_child(&config.command)?;
56
57    let stdout = child
58        .stdout
59        .take()
60        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdout from child"))?;
61    let stdin = child
62        .stdin
63        .take()
64        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdin from child"))?;
65
66    log::info!("[{}] pipe interface started: {}", config.name, config.command);
67
68    // Signal interface up
69    let _ = tx.send(Event::InterfaceUp(id, None, None));
70
71    // Spawn reader thread
72    thread::Builder::new()
73        .name(format!("pipe-reader-{}", id.0))
74        .spawn(move || {
75            reader_loop(stdout, child, id, config, tx);
76        })?;
77
78    Ok(Box::new(PipeWriter { stdin }))
79}
80
81fn spawn_child(command: &str) -> io::Result<std::process::Child> {
82    Command::new("sh")
83        .args(["-c", command])
84        .stdin(Stdio::piped())
85        .stdout(Stdio::piped())
86        .stderr(Stdio::null())
87        .spawn()
88}
89
90/// Reader loop: reads from subprocess stdout, HDLC-decodes, dispatches events.
91/// On subprocess exit, attempts respawn.
92fn reader_loop(
93    mut stdout: std::process::ChildStdout,
94    mut child: std::process::Child,
95    id: InterfaceId,
96    config: PipeConfig,
97    tx: EventSender,
98) {
99    let mut decoder = hdlc::Decoder::new();
100    let mut buf = [0u8; 4096];
101
102    loop {
103        match stdout.read(&mut buf) {
104            Ok(0) => {
105                // EOF — subprocess exited
106                let _ = child.wait();
107                log::warn!("[{}] subprocess terminated", config.name);
108                let _ = tx.send(Event::InterfaceDown(id));
109                match respawn(&config, &tx) {
110                    Some((new_stdout, new_child)) => {
111                        stdout = new_stdout;
112                        child = new_child;
113                        decoder = hdlc::Decoder::new();
114                        continue;
115                    }
116                    None => return,
117                }
118            }
119            Ok(n) => {
120                for frame in decoder.feed(&buf[..n]) {
121                    if tx
122                        .send(Event::Frame {
123                            interface_id: id,
124                            data: frame,
125                        })
126                        .is_err()
127                    {
128                        // Driver shut down
129                        let _ = child.kill();
130                        return;
131                    }
132                }
133            }
134            Err(e) => {
135                log::warn!("[{}] pipe read error: {}", config.name, e);
136                let _ = child.kill();
137                let _ = child.wait();
138                let _ = tx.send(Event::InterfaceDown(id));
139                match respawn(&config, &tx) {
140                    Some((new_stdout, new_child)) => {
141                        stdout = new_stdout;
142                        child = new_child;
143                        decoder = hdlc::Decoder::new();
144                        continue;
145                    }
146                    None => return,
147                }
148            }
149        }
150    }
151}
152
153/// Attempt to respawn the subprocess after a delay.
154fn respawn(
155    config: &PipeConfig,
156    tx: &EventSender,
157) -> Option<(std::process::ChildStdout, std::process::Child)> {
158    loop {
159        thread::sleep(config.respawn_delay);
160        log::info!(
161            "[{}] attempting to respawn subprocess: {}",
162            config.name,
163            config.command
164        );
165
166        match spawn_child(&config.command) {
167            Ok(mut child) => {
168                let stdout = match child.stdout.take() {
169                    Some(s) => s,
170                    None => {
171                        let _ = child.kill();
172                        let _ = child.wait();
173                        continue;
174                    }
175                };
176                let stdin = match child.stdin.take() {
177                    Some(s) => s,
178                    None => {
179                        let _ = child.kill();
180                        let _ = child.wait();
181                        continue;
182                    }
183                };
184
185                let new_writer: Box<dyn Writer> = Box::new(PipeWriter { stdin });
186                if tx
187                    .send(Event::InterfaceUp(config.interface_id, Some(new_writer), None))
188                    .is_err()
189                {
190                    return None; // Driver shut down
191                }
192                log::info!("[{}] subprocess respawned", config.name);
193                return Some((stdout, child));
194            }
195            Err(e) => {
196                log::warn!("[{}] respawn failed: {}", config.name, e);
197            }
198        }
199    }
200}
201
202/// Factory for [`PipeInterface`] instances.
203pub struct PipeFactory;
204
205impl InterfaceFactory for PipeFactory {
206    fn type_name(&self) -> &str {
207        "PipeInterface"
208    }
209
210    fn parse_config(
211        &self,
212        name: &str,
213        id: InterfaceId,
214        params: &HashMap<String, String>,
215    ) -> Result<Box<dyn InterfaceConfigData>, String> {
216        let command = params
217            .get("command")
218            .ok_or_else(|| "PipeInterface requires 'command'".to_string())?
219            .clone();
220
221        let respawn_delay = match params.get("respawn_delay") {
222            Some(v) => {
223                let ms: u64 = v
224                    .parse()
225                    .map_err(|_| format!("invalid respawn_delay: {}", v))?;
226                Duration::from_millis(ms)
227            }
228            None => Duration::from_secs(5),
229        };
230
231        Ok(Box::new(PipeConfig {
232            name: name.to_string(),
233            command,
234            respawn_delay,
235            interface_id: id,
236        }))
237    }
238
239    fn start(
240        &self,
241        config: Box<dyn InterfaceConfigData>,
242        ctx: StartContext,
243    ) -> io::Result<StartResult> {
244        let pipe_config = *config
245            .into_any()
246            .downcast::<PipeConfig>()
247            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
248
249        let id = pipe_config.interface_id;
250        let info = InterfaceInfo {
251            id,
252            name: pipe_config.name.clone(),
253            mode: ctx.mode,
254            out_capable: true,
255            in_capable: true,
256            bitrate: Some(1_000_000),
257            announce_rate_target: None,
258            announce_rate_grace: 0,
259            announce_rate_penalty: 0.0,
260            announce_cap: rns_core::constants::ANNOUNCE_CAP,
261            is_local_client: false,
262            wants_tunnel: false,
263            tunnel_id: None,
264            mtu: rns_core::constants::MTU as u32,
265            ingress_control: false,
266            ia_freq: 0.0,
267            started: crate::time::now(),
268        };
269
270        let writer = start(pipe_config, ctx.tx)?;
271
272        Ok(StartResult::Simple {
273            id,
274            info,
275            writer,
276            interface_type_name: "PipeInterface".to_string(),
277        })
278    }
279}
280
281#[cfg(test)]
282mod tests {
283    use super::*;
284    use std::sync::mpsc;
285
286    #[test]
287    fn pipe_start_and_receive() {
288        // Use `cat` as a loopback subprocess
289        let (tx, rx) = mpsc::channel();
290        let config = PipeConfig {
291            name: "test-pipe".into(),
292            command: "cat".into(),
293            respawn_delay: Duration::from_secs(1),
294            interface_id: InterfaceId(100),
295        };
296
297        let mut writer = start(config, tx).unwrap();
298
299        // Drain InterfaceUp
300        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
301        assert!(matches!(event, Event::InterfaceUp(InterfaceId(100), None, None)));
302
303        // Send a packet (>= 19 bytes for HDLC minimum)
304        let payload: Vec<u8> = (0..32).collect();
305        writer.send_frame(&payload).unwrap();
306
307        // Should receive Frame event (cat echos back the HDLC frame)
308        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
309        match event {
310            Event::Frame { interface_id, data } => {
311                assert_eq!(interface_id, InterfaceId(100));
312                assert_eq!(data, payload);
313            }
314            other => panic!("expected Frame, got {:?}", other),
315        }
316    }
317
318    #[test]
319    fn pipe_writer_sends() {
320        // Verify the writer wraps data in HDLC
321        let (tx, rx) = mpsc::channel();
322        let config = PipeConfig {
323            name: "test-pipe-writer".into(),
324            command: "cat".into(),
325            respawn_delay: Duration::from_secs(1),
326            interface_id: InterfaceId(101),
327        };
328
329        let mut writer = start(config, tx).unwrap();
330        let _ = rx.recv_timeout(Duration::from_secs(2)).unwrap(); // drain InterfaceUp
331
332        // Write data and verify we get it back as HDLC frame
333        let payload: Vec<u8> = (10..42).collect();
334        writer.send_frame(&payload).unwrap();
335
336        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
337        match event {
338            Event::Frame { data, .. } => {
339                assert_eq!(data, payload);
340            }
341            other => panic!("expected Frame, got {:?}", other),
342        }
343    }
344
345    #[test]
346    fn pipe_subprocess_exit() {
347        // Use a command that exits immediately
348        let (tx, rx) = mpsc::channel();
349        let config = PipeConfig {
350            name: "test-pipe-exit".into(),
351            command: "true".into(), // exits immediately with 0
352            respawn_delay: Duration::from_secs(60), // long delay so we catch InterfaceDown
353            interface_id: InterfaceId(102),
354        };
355
356        let _writer = start(config, tx).unwrap();
357
358        // Should get InterfaceUp then InterfaceDown
359        let mut got_down = false;
360        for _ in 0..5 {
361            match rx.recv_timeout(Duration::from_secs(2)) {
362                Ok(Event::InterfaceDown(InterfaceId(102))) => {
363                    got_down = true;
364                    break;
365                }
366                Ok(_) => continue,
367                Err(_) => break,
368            }
369        }
370        assert!(got_down, "should receive InterfaceDown after subprocess exits");
371    }
372
373    #[test]
374    fn pipe_config_defaults() {
375        let config = PipeConfig::default();
376        assert_eq!(config.respawn_delay, Duration::from_secs(5));
377        assert_eq!(config.interface_id, InterfaceId(0));
378        assert!(config.command.is_empty());
379    }
380
381    #[test]
382    fn pipe_invalid_command() {
383        let (tx, _rx) = mpsc::channel();
384        let config = PipeConfig {
385            name: "test-pipe-bad".into(),
386            command: "/nonexistent_rns_test_binary_that_does_not_exist_xyz".into(),
387            respawn_delay: Duration::from_secs(60),
388            interface_id: InterfaceId(103),
389        };
390
391        // sh -c <nonexistent> will start sh successfully but the child exits immediately
392        // For a truly invalid binary we need to check that the process fails
393        // Actually, sh -c "<nonexistent>" will still spawn sh successfully
394        // Let's test with a different approach: verify it doesn't panic
395        let result = start(config, tx);
396        // sh will spawn successfully even if the inner command fails
397        // The real test is that it doesn't panic
398        assert!(result.is_ok());
399    }
400}