Skip to main content

rns_net/interface/
pipe.rs

//! Pipe interface: subprocess stdin/stdout with HDLC framing.
//!
//! Matches Python `PipeInterface.py`. Spawns a subprocess and communicates
//! with it over piped stdin/stdout, using HDLC framing to mark packet
//! boundaries. The subprocess is automatically respawned on failure.
6
7use std::collections::HashMap;
8use std::io::{self, Read, Write};
9use std::process::{Command, Stdio};
10use std::thread;
11use std::time::Duration;
12
13use rns_core::transport::types::{InterfaceId, InterfaceInfo};
14
15use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::Writer;
19
20/// Configuration for a pipe interface.
21#[derive(Debug, Clone)]
22pub struct PipeConfig {
23    pub name: String,
24    pub command: String,
25    pub respawn_delay: Duration,
26    pub interface_id: InterfaceId,
27}
28
29impl Default for PipeConfig {
30    fn default() -> Self {
31        PipeConfig {
32            name: String::new(),
33            command: String::new(),
34            respawn_delay: Duration::from_secs(5),
35            interface_id: InterfaceId(0),
36        }
37    }
38}
39
40/// Writer that sends HDLC-framed data to a subprocess stdin.
41struct PipeWriter {
42    stdin: std::process::ChildStdin,
43}
44
45impl Writer for PipeWriter {
46    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47        self.stdin.write_all(&hdlc::frame(data))
48    }
49}
50
51/// Start the pipe interface. Spawns subprocess, returns writer.
52pub fn start(config: PipeConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
53    let id = config.interface_id;
54
55    let mut child = spawn_child(&config.command)?;
56
57    let stdout = child
58        .stdout
59        .take()
60        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdout from child"))?;
61    let stdin = child
62        .stdin
63        .take()
64        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdin from child"))?;
65
66    log::info!(
67        "[{}] pipe interface started: {}",
68        config.name,
69        config.command
70    );
71
72    // Signal interface up
73    let _ = tx.send(Event::InterfaceUp(id, None, None));
74
75    // Spawn reader thread
76    thread::Builder::new()
77        .name(format!("pipe-reader-{}", id.0))
78        .spawn(move || {
79            reader_loop(stdout, child, id, config, tx);
80        })?;
81
82    Ok(Box::new(PipeWriter { stdin }))
83}
84
/// Run `command` through `sh -c` with stdin and stdout piped and stderr discarded.
fn spawn_child(command: &str) -> io::Result<std::process::Child> {
    let mut shell = Command::new("sh");
    shell.arg("-c").arg(command);
    shell
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::null());
    shell.spawn()
}
93
94/// Reader loop: reads from subprocess stdout, HDLC-decodes, dispatches events.
95/// On subprocess exit, attempts respawn.
96fn reader_loop(
97    mut stdout: std::process::ChildStdout,
98    mut child: std::process::Child,
99    id: InterfaceId,
100    config: PipeConfig,
101    tx: EventSender,
102) {
103    let mut decoder = hdlc::Decoder::new();
104    let mut buf = [0u8; 4096];
105
106    loop {
107        match stdout.read(&mut buf) {
108            Ok(0) => {
109                // EOF — subprocess exited
110                let _ = child.wait();
111                log::warn!("[{}] subprocess terminated", config.name);
112                let _ = tx.send(Event::InterfaceDown(id));
113                match respawn(&config, &tx) {
114                    Some((new_stdout, new_child)) => {
115                        stdout = new_stdout;
116                        child = new_child;
117                        decoder = hdlc::Decoder::new();
118                        continue;
119                    }
120                    None => return,
121                }
122            }
123            Ok(n) => {
124                for frame in decoder.feed(&buf[..n]) {
125                    if tx
126                        .send(Event::Frame {
127                            interface_id: id,
128                            data: frame,
129                        })
130                        .is_err()
131                    {
132                        // Driver shut down
133                        let _ = child.kill();
134                        return;
135                    }
136                }
137            }
138            Err(e) => {
139                log::warn!("[{}] pipe read error: {}", config.name, e);
140                let _ = child.kill();
141                let _ = child.wait();
142                let _ = tx.send(Event::InterfaceDown(id));
143                match respawn(&config, &tx) {
144                    Some((new_stdout, new_child)) => {
145                        stdout = new_stdout;
146                        child = new_child;
147                        decoder = hdlc::Decoder::new();
148                        continue;
149                    }
150                    None => return,
151                }
152            }
153        }
154    }
155}
156
157/// Attempt to respawn the subprocess after a delay.
158fn respawn(
159    config: &PipeConfig,
160    tx: &EventSender,
161) -> Option<(std::process::ChildStdout, std::process::Child)> {
162    loop {
163        thread::sleep(config.respawn_delay);
164        log::info!(
165            "[{}] attempting to respawn subprocess: {}",
166            config.name,
167            config.command
168        );
169
170        match spawn_child(&config.command) {
171            Ok(mut child) => {
172                let stdout = match child.stdout.take() {
173                    Some(s) => s,
174                    None => {
175                        let _ = child.kill();
176                        let _ = child.wait();
177                        continue;
178                    }
179                };
180                let stdin = match child.stdin.take() {
181                    Some(s) => s,
182                    None => {
183                        let _ = child.kill();
184                        let _ = child.wait();
185                        continue;
186                    }
187                };
188
189                let new_writer: Box<dyn Writer> = Box::new(PipeWriter { stdin });
190                if tx
191                    .send(Event::InterfaceUp(
192                        config.interface_id,
193                        Some(new_writer),
194                        None,
195                    ))
196                    .is_err()
197                {
198                    return None; // Driver shut down
199                }
200                log::info!("[{}] subprocess respawned", config.name);
201                return Some((stdout, child));
202            }
203            Err(e) => {
204                log::warn!("[{}] respawn failed: {}", config.name, e);
205            }
206        }
207    }
208}
209
210/// Factory for [`PipeInterface`] instances.
211pub struct PipeFactory;
212
213impl InterfaceFactory for PipeFactory {
214    fn type_name(&self) -> &str {
215        "PipeInterface"
216    }
217
218    fn parse_config(
219        &self,
220        name: &str,
221        id: InterfaceId,
222        params: &HashMap<String, String>,
223    ) -> Result<Box<dyn InterfaceConfigData>, String> {
224        let command = params
225            .get("command")
226            .ok_or_else(|| "PipeInterface requires 'command'".to_string())?
227            .clone();
228
229        let respawn_delay = match params.get("respawn_delay") {
230            Some(v) => {
231                let ms: u64 = v
232                    .parse()
233                    .map_err(|_| format!("invalid respawn_delay: {}", v))?;
234                Duration::from_millis(ms)
235            }
236            None => Duration::from_secs(5),
237        };
238
239        Ok(Box::new(PipeConfig {
240            name: name.to_string(),
241            command,
242            respawn_delay,
243            interface_id: id,
244        }))
245    }
246
247    fn start(
248        &self,
249        config: Box<dyn InterfaceConfigData>,
250        ctx: StartContext,
251    ) -> io::Result<StartResult> {
252        let pipe_config = *config
253            .into_any()
254            .downcast::<PipeConfig>()
255            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
256
257        let id = pipe_config.interface_id;
258        let info = InterfaceInfo {
259            id,
260            name: pipe_config.name.clone(),
261            mode: ctx.mode,
262            out_capable: true,
263            in_capable: true,
264            bitrate: Some(1_000_000),
265            announce_rate_target: None,
266            announce_rate_grace: 0,
267            announce_rate_penalty: 0.0,
268            announce_cap: rns_core::constants::ANNOUNCE_CAP,
269            is_local_client: false,
270            wants_tunnel: false,
271            tunnel_id: None,
272            mtu: rns_core::constants::MTU as u32,
273            ingress_control: false,
274            ia_freq: 0.0,
275            started: crate::time::now(),
276        };
277
278        let writer = start(pipe_config, ctx.tx)?;
279
280        Ok(StartResult::Simple {
281            id,
282            info,
283            writer,
284            interface_type_name: "PipeInterface".to_string(),
285        })
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;

    /// How long a test waits for the next event before giving up.
    const RECV_TIMEOUT: Duration = Duration::from_secs(2);

    /// Build a [`PipeConfig`] for a test subprocess.
    fn make_config(
        name: &str,
        command: &str,
        respawn_secs: u64,
        interface_id: InterfaceId,
    ) -> PipeConfig {
        PipeConfig {
            name: name.into(),
            command: command.into(),
            respawn_delay: Duration::from_secs(respawn_secs),
            interface_id,
        }
    }

    #[test]
    fn pipe_start_and_receive() {
        // `cat` acts as a loopback subprocess.
        let (tx, rx) = mpsc::channel();
        let mut writer = start(make_config("test-pipe", "cat", 1, InterfaceId(100)), tx).unwrap();

        // The first event announces the interface, without a writer.
        let up = rx.recv_timeout(RECV_TIMEOUT).unwrap();
        assert!(matches!(
            up,
            Event::InterfaceUp(InterfaceId(100), None, None)
        ));

        // Send a packet (>= 19 bytes for HDLC minimum).
        let payload: Vec<u8> = (0..32).collect();
        writer.send_frame(&payload).unwrap();

        // `cat` echoes the HDLC frame back, so it must arrive as a Frame event.
        match rx.recv_timeout(RECV_TIMEOUT).unwrap() {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn pipe_writer_sends() {
        // The writer wraps data in HDLC; the echoed frame decodes back to the payload.
        let (tx, rx) = mpsc::channel();
        let mut writer = start(
            make_config("test-pipe-writer", "cat", 1, InterfaceId(101)),
            tx,
        )
        .unwrap();
        let _ = rx.recv_timeout(RECV_TIMEOUT).unwrap(); // drain InterfaceUp

        let payload: Vec<u8> = (10..42).collect();
        writer.send_frame(&payload).unwrap();

        match rx.recv_timeout(RECV_TIMEOUT).unwrap() {
            Event::Frame { data, .. } => assert_eq!(data, payload),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn pipe_subprocess_exit() {
        // `true` exits immediately with status 0; the long respawn delay keeps
        // the reader from respawning before InterfaceDown is observed.
        let (tx, rx) = mpsc::channel();
        let _writer = start(
            make_config("test-pipe-exit", "true", 60, InterfaceId(102)),
            tx,
        )
        .unwrap();

        // Look at up to five events, stopping at the first timeout or at InterfaceDown.
        let got_down = (0..5)
            .map(|_| rx.recv_timeout(RECV_TIMEOUT))
            .take_while(|received| received.is_ok())
            .any(|received| matches!(received, Ok(Event::InterfaceDown(InterfaceId(102)))));

        assert!(
            got_down,
            "should receive InterfaceDown after subprocess exits"
        );
    }

    #[test]
    fn pipe_config_defaults() {
        let defaults = PipeConfig::default();
        assert_eq!(defaults.respawn_delay, Duration::from_secs(5));
        assert_eq!(defaults.interface_id, InterfaceId(0));
        assert!(defaults.command.is_empty());
    }

    #[test]
    fn pipe_invalid_command() {
        let (tx, _rx) = mpsc::channel();
        let config = make_config(
            "test-pipe-bad",
            "/nonexistent_rns_test_binary_that_does_not_exist_xyz",
            60,
            InterfaceId(103),
        );

        // The command runs through `sh -c`, so `sh` itself spawns successfully even
        // though the inner command does not exist. Starting must neither fail nor panic.
        let result = start(config, tx);
        assert!(result.is_ok());
    }
}
414}