Skip to main content

rns_net/interface/
pipe.rs

//! Pipe interface: subprocess stdin/stdout with HDLC framing.
//!
//! Matches Python `PipeInterface.py`. Spawns a subprocess and communicates
//! with it over piped stdin/stdout, using HDLC framing to mark packet
//! boundaries. The subprocess is automatically respawned when it exits or
//! when reading from it fails.
6
7use std::collections::HashMap;
8use std::io::{self, Read, Write};
9use std::process::{Command, Stdio};
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
17use crate::event::{Event, EventSender};
18use crate::hdlc;
19use crate::interface::Writer;
20
21/// Configuration for a pipe interface.
22#[derive(Debug, Clone)]
23pub struct PipeConfig {
24    pub name: String,
25    pub command: String,
26    pub respawn_delay: Duration,
27    pub interface_id: InterfaceId,
28    pub runtime: Arc<Mutex<PipeRuntime>>,
29}
30
/// Settings of a pipe interface that are kept behind a shared mutex
/// (see `PipeConfig::runtime`).
#[derive(Debug, Clone)]
pub struct PipeRuntime {
    /// Time to wait before each attempt to respawn the subprocess.
    pub respawn_delay: Duration,
}
35
36impl PipeRuntime {
37    pub fn from_config(config: &PipeConfig) -> Self {
38        Self {
39            respawn_delay: config.respawn_delay,
40        }
41    }
42}
43
44#[derive(Debug, Clone)]
45pub struct PipeRuntimeConfigHandle {
46    pub interface_name: String,
47    pub runtime: Arc<Mutex<PipeRuntime>>,
48    pub startup: PipeRuntime,
49}
50
51impl Default for PipeConfig {
52    fn default() -> Self {
53        let mut config = PipeConfig {
54            name: String::new(),
55            command: String::new(),
56            respawn_delay: Duration::from_secs(5),
57            interface_id: InterfaceId(0),
58            runtime: Arc::new(Mutex::new(PipeRuntime {
59                respawn_delay: Duration::from_secs(5),
60            })),
61        };
62        let startup = PipeRuntime::from_config(&config);
63        config.runtime = Arc::new(Mutex::new(startup));
64        config
65    }
66}
67
68/// Writer that sends HDLC-framed data to a subprocess stdin.
69struct PipeWriter {
70    stdin: std::process::ChildStdin,
71}
72
73impl Writer for PipeWriter {
74    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
75        self.stdin.write_all(&hdlc::frame(data))
76    }
77}
78
79/// Start the pipe interface. Spawns subprocess, returns writer.
80pub fn start(config: PipeConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
81    let id = config.interface_id;
82    {
83        let startup = PipeRuntime::from_config(&config);
84        *config.runtime.lock().unwrap() = startup;
85    }
86
87    let mut child = spawn_child(&config.command)?;
88
89    let stdout = child
90        .stdout
91        .take()
92        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdout from child"))?;
93    let stdin = child
94        .stdin
95        .take()
96        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdin from child"))?;
97
98    log::info!(
99        "[{}] pipe interface started: {}",
100        config.name,
101        config.command
102    );
103
104    // Signal interface up
105    let _ = tx.send(Event::InterfaceUp(id, None, None));
106
107    // Spawn reader thread
108    thread::Builder::new()
109        .name(format!("pipe-reader-{}", id.0))
110        .spawn(move || {
111            reader_loop(stdout, child, id, config, tx);
112        })?;
113
114    Ok(Box::new(PipeWriter { stdin }))
115}
116
/// Launches `command` through `sh -c` with piped stdin and stdout; the
/// child's stderr is discarded.
fn spawn_child(command: &str) -> io::Result<std::process::Child> {
    let mut shell = Command::new("sh");
    shell.arg("-c").arg(command);
    shell
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::null());
    shell.spawn()
}
125
126/// Reader loop: reads from subprocess stdout, HDLC-decodes, dispatches events.
127/// On subprocess exit, attempts respawn.
128fn reader_loop(
129    mut stdout: std::process::ChildStdout,
130    mut child: std::process::Child,
131    id: InterfaceId,
132    config: PipeConfig,
133    tx: EventSender,
134) {
135    let mut decoder = hdlc::Decoder::new();
136    let mut buf = [0u8; 4096];
137
138    loop {
139        match stdout.read(&mut buf) {
140            Ok(0) => {
141                // EOF — subprocess exited
142                let _ = child.wait();
143                log::warn!("[{}] subprocess terminated", config.name);
144                let _ = tx.send(Event::InterfaceDown(id));
145                match respawn(&config, &tx) {
146                    Some((new_stdout, new_child)) => {
147                        stdout = new_stdout;
148                        child = new_child;
149                        decoder = hdlc::Decoder::new();
150                        continue;
151                    }
152                    None => return,
153                }
154            }
155            Ok(n) => {
156                for frame in decoder.feed(&buf[..n]) {
157                    if tx
158                        .send(Event::Frame {
159                            interface_id: id,
160                            data: frame,
161                        })
162                        .is_err()
163                    {
164                        // Driver shut down
165                        let _ = child.kill();
166                        return;
167                    }
168                }
169            }
170            Err(e) => {
171                log::warn!("[{}] pipe read error: {}", config.name, e);
172                let _ = child.kill();
173                let _ = child.wait();
174                let _ = tx.send(Event::InterfaceDown(id));
175                match respawn(&config, &tx) {
176                    Some((new_stdout, new_child)) => {
177                        stdout = new_stdout;
178                        child = new_child;
179                        decoder = hdlc::Decoder::new();
180                        continue;
181                    }
182                    None => return,
183                }
184            }
185        }
186    }
187}
188
189/// Attempt to respawn the subprocess after a delay.
190fn respawn(
191    config: &PipeConfig,
192    tx: &EventSender,
193) -> Option<(std::process::ChildStdout, std::process::Child)> {
194    loop {
195        thread::sleep(config.runtime.lock().unwrap().respawn_delay);
196        log::info!(
197            "[{}] attempting to respawn subprocess: {}",
198            config.name,
199            config.command
200        );
201
202        match spawn_child(&config.command) {
203            Ok(mut child) => {
204                let stdout = match child.stdout.take() {
205                    Some(s) => s,
206                    None => {
207                        let _ = child.kill();
208                        let _ = child.wait();
209                        continue;
210                    }
211                };
212                let stdin = match child.stdin.take() {
213                    Some(s) => s,
214                    None => {
215                        let _ = child.kill();
216                        let _ = child.wait();
217                        continue;
218                    }
219                };
220
221                let new_writer: Box<dyn Writer> = Box::new(PipeWriter { stdin });
222                if tx
223                    .send(Event::InterfaceUp(
224                        config.interface_id,
225                        Some(new_writer),
226                        None,
227                    ))
228                    .is_err()
229                {
230                    return None; // Driver shut down
231                }
232                log::info!("[{}] subprocess respawned", config.name);
233                return Some((stdout, child));
234            }
235            Err(e) => {
236                log::warn!("[{}] respawn failed: {}", config.name, e);
237            }
238        }
239    }
240}
241
242/// Factory for [`PipeInterface`] instances.
243pub struct PipeFactory;
244
245impl InterfaceFactory for PipeFactory {
246    fn type_name(&self) -> &str {
247        "PipeInterface"
248    }
249
250    fn parse_config(
251        &self,
252        name: &str,
253        id: InterfaceId,
254        params: &HashMap<String, String>,
255    ) -> Result<Box<dyn InterfaceConfigData>, String> {
256        let command = params
257            .get("command")
258            .ok_or_else(|| "PipeInterface requires 'command'".to_string())?
259            .clone();
260
261        let respawn_delay = match params.get("respawn_delay") {
262            Some(v) => {
263                let ms: u64 = v
264                    .parse()
265                    .map_err(|_| format!("invalid respawn_delay: {}", v))?;
266                Duration::from_millis(ms)
267            }
268            None => Duration::from_secs(5),
269        };
270
271        Ok(Box::new(PipeConfig {
272            name: name.to_string(),
273            command,
274            respawn_delay,
275            interface_id: id,
276            runtime: Arc::new(Mutex::new(PipeRuntime { respawn_delay })),
277        }))
278    }
279
280    fn start(
281        &self,
282        config: Box<dyn InterfaceConfigData>,
283        ctx: StartContext,
284    ) -> io::Result<StartResult> {
285        let pipe_config = *config
286            .into_any()
287            .downcast::<PipeConfig>()
288            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
289
290        let id = pipe_config.interface_id;
291        let info = InterfaceInfo {
292            id,
293            name: pipe_config.name.clone(),
294            mode: ctx.mode,
295            out_capable: true,
296            in_capable: true,
297            bitrate: Some(1_000_000),
298            announce_rate_target: None,
299            announce_rate_grace: 0,
300            announce_rate_penalty: 0.0,
301            announce_cap: rns_core::constants::ANNOUNCE_CAP,
302            is_local_client: false,
303            wants_tunnel: false,
304            tunnel_id: None,
305            mtu: rns_core::constants::MTU as u32,
306            ingress_control: false,
307            ia_freq: 0.0,
308            started: crate::time::now(),
309        };
310
311        let writer = start(pipe_config, ctx.tx)?;
312
313        Ok(StartResult::Simple {
314            id,
315            info,
316            writer,
317            interface_type_name: "PipeInterface".to_string(),
318        })
319    }
320}
321
322pub(crate) fn pipe_runtime_handle_from_config(config: &PipeConfig) -> PipeRuntimeConfigHandle {
323    PipeRuntimeConfigHandle {
324        interface_name: config.name.clone(),
325        runtime: Arc::clone(&config.runtime),
326        startup: PipeRuntime::from_config(config),
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// How long a test waits for a single event before giving up.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(2);

    /// Builds a config whose shared runtime state carries the same respawn
    /// delay as the config itself.
    fn test_config(
        name: &str,
        command: &str,
        respawn_delay: Duration,
        interface_id: InterfaceId,
    ) -> PipeConfig {
        PipeConfig {
            name: name.into(),
            command: command.into(),
            respawn_delay,
            interface_id,
            runtime: Arc::new(Mutex::new(PipeRuntime { respawn_delay })),
        }
    }

    #[test]
    fn pipe_start_and_receive() {
        // `cat` acts as a loopback subprocess.
        let (tx, rx) = crate::event::channel();
        let config = test_config(
            "test-pipe",
            "cat",
            Duration::from_secs(1),
            InterfaceId(100),
        );

        let mut writer = start(config, tx).unwrap();

        // The first event must be the initial InterfaceUp.
        let up = rx.recv_timeout(EVENT_TIMEOUT).unwrap();
        assert!(matches!(
            up,
            Event::InterfaceUp(InterfaceId(100), None, None)
        ));

        // Send a packet (>= 19 bytes for HDLC minimum)
        let payload: Vec<u8> = (0..32).collect();
        writer.send_frame(&payload).unwrap();

        // `cat` echoes the HDLC frame back, so it arrives as a Frame event.
        match rx.recv_timeout(EVENT_TIMEOUT).unwrap() {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn pipe_writer_sends() {
        // The writer wraps data in HDLC; after the loopback through `cat`
        // the decoded frame must equal the payload.
        let (tx, rx) = crate::event::channel();
        let config = test_config(
            "test-pipe-writer",
            "cat",
            Duration::from_secs(1),
            InterfaceId(101),
        );

        let mut writer = start(config, tx).unwrap();
        // Discard the initial InterfaceUp.
        let _ = rx.recv_timeout(EVENT_TIMEOUT).unwrap();

        let payload: Vec<u8> = (10..42).collect();
        writer.send_frame(&payload).unwrap();

        match rx.recv_timeout(EVENT_TIMEOUT).unwrap() {
            Event::Frame { data, .. } => assert_eq!(data, payload),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn pipe_subprocess_exit() {
        // `true` exits immediately with status 0. The long respawn delay
        // keeps a respawn from happening while the test is observing events.
        let (tx, rx) = crate::event::channel();
        let config = test_config(
            "test-pipe-exit",
            "true",
            Duration::from_secs(60),
            InterfaceId(102),
        );

        let _writer = start(config, tx).unwrap();

        // Look at up to five events, stopping at the first timeout, for the
        // InterfaceDown that follows the initial InterfaceUp.
        let got_down = (0..5)
            .map(|_| rx.recv_timeout(EVENT_TIMEOUT))
            .take_while(|received| received.is_ok())
            .any(|received| {
                matches!(received, Ok(Event::InterfaceDown(InterfaceId(102))))
            });
        assert!(
            got_down,
            "should receive InterfaceDown after subprocess exits"
        );
    }

    #[test]
    fn pipe_config_defaults() {
        let defaults = PipeConfig::default();
        assert_eq!(defaults.respawn_delay, Duration::from_secs(5));
        assert_eq!(defaults.interface_id, InterfaceId(0));
        assert!(defaults.command.is_empty());
    }

    #[test]
    fn pipe_invalid_command() {
        let (tx, _rx) = crate::event::channel();
        let config = test_config(
            "test-pipe-bad",
            "/nonexistent_rns_test_binary_that_does_not_exist_xyz",
            Duration::from_secs(60),
            InterfaceId(103),
        );

        // The command runs through `sh -c`, and `sh` itself spawns
        // successfully even when the inner command does not exist. `start`
        // is therefore expected to succeed; the point is that it does not panic.
        let result = start(config, tx);
        assert!(result.is_ok());
    }
}
462        // sh will spawn successfully even if the inner command fails
463        // The real test is that it doesn't panic
464        assert!(result.is_ok());
465    }
466}