Skip to main content

rns_net/interface/
pipe.rs

1//! Pipe interface: subprocess stdin/stdout with HDLC framing.
2//!
3//! Matches Python `PipeInterface.py`. Spawns a subprocess, communicates
4//! via piped stdin/stdout using HDLC framing for packet boundaries.
5//! Auto-respawns subprocess on failure.
6
7use std::collections::HashMap;
8use std::io::{self, Read, Write};
9use std::process::{Command, Stdio};
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
17use crate::event::{Event, EventSender};
18use crate::hdlc;
19use crate::interface::{lock_or_recover, Writer};
20
21/// Configuration for a pipe interface.
22#[derive(Debug, Clone)]
23pub struct PipeConfig {
24    pub name: String,
25    pub command: String,
26    pub respawn_delay: Duration,
27    pub interface_id: InterfaceId,
28    pub runtime: Arc<Mutex<PipeRuntime>>,
29}
30
/// Runtime-adjustable settings of a pipe interface.
#[derive(Debug, Clone)]
pub struct PipeRuntime {
    /// How long to wait before each attempt to respawn the subprocess.
    pub respawn_delay: Duration,
}
35
36impl PipeRuntime {
37    pub fn from_config(config: &PipeConfig) -> Self {
38        Self {
39            respawn_delay: config.respawn_delay,
40        }
41    }
42}
43
44#[derive(Debug, Clone)]
45pub struct PipeRuntimeConfigHandle {
46    pub interface_name: String,
47    pub runtime: Arc<Mutex<PipeRuntime>>,
48    pub startup: PipeRuntime,
49}
50
51impl Default for PipeConfig {
52    fn default() -> Self {
53        let mut config = PipeConfig {
54            name: String::new(),
55            command: String::new(),
56            respawn_delay: Duration::from_secs(5),
57            interface_id: InterfaceId(0),
58            runtime: Arc::new(Mutex::new(PipeRuntime {
59                respawn_delay: Duration::from_secs(5),
60            })),
61        };
62        let startup = PipeRuntime::from_config(&config);
63        config.runtime = Arc::new(Mutex::new(startup));
64        config
65    }
66}
67
/// Outbound half of the pipe interface: owns the subprocess's stdin and
/// writes HDLC-framed packets to it.
struct PipeWriter {
    /// Write end of the pipe connected to the child's standard input.
    stdin: std::process::ChildStdin,
}
72
73impl Writer for PipeWriter {
74    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
75        self.stdin.write_all(&hdlc::frame(data))
76    }
77}
78
79/// Start the pipe interface. Spawns subprocess, returns writer.
80pub fn start(config: PipeConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
81    let id = config.interface_id;
82    {
83        let startup = PipeRuntime::from_config(&config);
84        *lock_or_recover(&config.runtime, "pipe runtime") = startup;
85    }
86
87    let mut child = spawn_child(&config.command)?;
88
89    let stdout = child
90        .stdout
91        .take()
92        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdout from child"))?;
93    let stdin = child
94        .stdin
95        .take()
96        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdin from child"))?;
97
98    log::info!(
99        "[{}] pipe interface started: {}",
100        config.name,
101        config.command
102    );
103
104    // Signal interface up
105    let _ = tx.send(Event::InterfaceUp(id, None, None));
106
107    // Spawn reader thread
108    thread::Builder::new()
109        .name(format!("pipe-reader-{}", id.0))
110        .spawn(move || {
111            reader_loop(stdout, child, id, config, tx);
112        })?;
113
114    Ok(Box::new(PipeWriter { stdin }))
115}
116
/// Run `command` through `sh -c` with stdin and stdout piped to us and
/// stderr discarded.
///
/// # Errors
/// Returns the error from spawning `sh` itself. A command that the shell
/// cannot run still spawns successfully and only shows up as an early exit.
fn spawn_child(command: &str) -> io::Result<std::process::Child> {
    let mut shell = Command::new("sh");
    shell.arg("-c").arg(command);
    shell
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::null());
    shell.spawn()
}
125
126/// Reader loop: reads from subprocess stdout, HDLC-decodes, dispatches events.
127/// On subprocess exit, attempts respawn.
128fn reader_loop(
129    mut stdout: std::process::ChildStdout,
130    mut child: std::process::Child,
131    id: InterfaceId,
132    config: PipeConfig,
133    tx: EventSender,
134) {
135    let mut decoder = hdlc::Decoder::new();
136    let mut buf = [0u8; 4096];
137
138    loop {
139        match stdout.read(&mut buf) {
140            Ok(0) => {
141                // EOF — subprocess exited
142                let _ = child.wait();
143                log::warn!("[{}] subprocess terminated", config.name);
144                let _ = tx.send(Event::InterfaceDown(id));
145                match respawn(&config, &tx) {
146                    Some((new_stdout, new_child)) => {
147                        stdout = new_stdout;
148                        child = new_child;
149                        decoder = hdlc::Decoder::new();
150                        continue;
151                    }
152                    None => return,
153                }
154            }
155            Ok(n) => {
156                for frame in decoder.feed(&buf[..n]) {
157                    if tx
158                        .send(Event::Frame {
159                            interface_id: id,
160                            data: frame,
161                        })
162                        .is_err()
163                    {
164                        // Driver shut down
165                        let _ = child.kill();
166                        return;
167                    }
168                }
169            }
170            Err(e) => {
171                log::warn!("[{}] pipe read error: {}", config.name, e);
172                let _ = child.kill();
173                let _ = child.wait();
174                let _ = tx.send(Event::InterfaceDown(id));
175                match respawn(&config, &tx) {
176                    Some((new_stdout, new_child)) => {
177                        stdout = new_stdout;
178                        child = new_child;
179                        decoder = hdlc::Decoder::new();
180                        continue;
181                    }
182                    None => return,
183                }
184            }
185        }
186    }
187}
188
189/// Attempt to respawn the subprocess after a delay.
190fn respawn(
191    config: &PipeConfig,
192    tx: &EventSender,
193) -> Option<(std::process::ChildStdout, std::process::Child)> {
194    loop {
195        let respawn_delay = lock_or_recover(&config.runtime, "pipe runtime").respawn_delay;
196        thread::sleep(respawn_delay);
197        log::info!(
198            "[{}] attempting to respawn subprocess: {}",
199            config.name,
200            config.command
201        );
202
203        match spawn_child(&config.command) {
204            Ok(mut child) => {
205                let stdout = match child.stdout.take() {
206                    Some(s) => s,
207                    None => {
208                        let _ = child.kill();
209                        let _ = child.wait();
210                        continue;
211                    }
212                };
213                let stdin = match child.stdin.take() {
214                    Some(s) => s,
215                    None => {
216                        let _ = child.kill();
217                        let _ = child.wait();
218                        continue;
219                    }
220                };
221
222                let new_writer: Box<dyn Writer> = Box::new(PipeWriter { stdin });
223                if tx
224                    .send(Event::InterfaceUp(
225                        config.interface_id,
226                        Some(new_writer),
227                        None,
228                    ))
229                    .is_err()
230                {
231                    return None; // Driver shut down
232                }
233                log::info!("[{}] subprocess respawned", config.name);
234                return Some((stdout, child));
235            }
236            Err(e) => {
237                log::warn!("[{}] respawn failed: {}", config.name, e);
238            }
239        }
240    }
241}
242
/// Stateless factory that parses and starts pipe interfaces, registered
/// under the type name `PipeInterface`.
pub struct PipeFactory;
245
246impl InterfaceFactory for PipeFactory {
247    fn type_name(&self) -> &str {
248        "PipeInterface"
249    }
250
251    fn parse_config(
252        &self,
253        name: &str,
254        id: InterfaceId,
255        params: &HashMap<String, String>,
256    ) -> Result<Box<dyn InterfaceConfigData>, String> {
257        let command = params
258            .get("command")
259            .ok_or_else(|| "PipeInterface requires 'command'".to_string())?
260            .clone();
261
262        let respawn_delay = match params.get("respawn_delay") {
263            Some(v) => {
264                let ms: u64 = v
265                    .parse()
266                    .map_err(|_| format!("invalid respawn_delay: {}", v))?;
267                Duration::from_millis(ms)
268            }
269            None => Duration::from_secs(5),
270        };
271
272        Ok(Box::new(PipeConfig {
273            name: name.to_string(),
274            command,
275            respawn_delay,
276            interface_id: id,
277            runtime: Arc::new(Mutex::new(PipeRuntime { respawn_delay })),
278        }))
279    }
280
281    fn start(
282        &self,
283        config: Box<dyn InterfaceConfigData>,
284        ctx: StartContext,
285    ) -> io::Result<StartResult> {
286        let pipe_config = *config
287            .into_any()
288            .downcast::<PipeConfig>()
289            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
290
291        let id = pipe_config.interface_id;
292        let info = InterfaceInfo {
293            id,
294            name: pipe_config.name.clone(),
295            mode: ctx.mode,
296            out_capable: true,
297            in_capable: true,
298            bitrate: Some(1_000_000),
299            airtime_profile: None,
300            announce_rate_target: None,
301            announce_rate_grace: 0,
302            announce_rate_penalty: 0.0,
303            announce_cap: rns_core::constants::ANNOUNCE_CAP,
304            is_local_client: false,
305            wants_tunnel: false,
306            tunnel_id: None,
307            mtu: rns_core::constants::MTU as u32,
308            ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
309            ia_freq: 0.0,
310            started: crate::time::now(),
311        };
312
313        let writer = start(pipe_config, ctx.tx)?;
314
315        Ok(StartResult::Simple {
316            id,
317            info,
318            writer,
319            interface_type_name: "PipeInterface".to_string(),
320        })
321    }
322}
323
324pub(crate) fn pipe_runtime_handle_from_config(config: &PipeConfig) -> PipeRuntimeConfigHandle {
325    PipeRuntimeConfigHandle {
326        interface_name: config.name.clone(),
327        runtime: Arc::clone(&config.runtime),
328        startup: PipeRuntime::from_config(config),
329    }
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a test configuration whose runtime mirrors `respawn_delay`.
    fn test_config(
        name: &str,
        command: &str,
        respawn_delay: Duration,
        interface_id: InterfaceId,
    ) -> PipeConfig {
        PipeConfig {
            name: name.into(),
            command: command.into(),
            respawn_delay,
            interface_id,
            runtime: Arc::new(Mutex::new(PipeRuntime { respawn_delay })),
        }
    }

    #[test]
    fn pipe_start_and_receive() {
        // `cat` acts as a loopback: whatever we write comes straight back.
        let (tx, rx) = crate::event::channel();
        let config = test_config("test-pipe", "cat", Duration::from_secs(1), InterfaceId(100));

        let mut writer = start(config, tx).unwrap();

        // The first event must be the InterfaceUp announcement.
        assert!(matches!(
            rx.recv_timeout(Duration::from_secs(2)).unwrap(),
            Event::InterfaceUp(InterfaceId(100), None, None)
        ));

        // Send a packet (>= 19 bytes for HDLC minimum)
        let payload: Vec<u8> = (0..32).collect();
        writer.send_frame(&payload).unwrap();

        // `cat` echoes the HDLC frame back, so a Frame event should arrive.
        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn pipe_writer_sends() {
        // The writer must wrap data in HDLC so the reader can decode it again.
        let (tx, rx) = crate::event::channel();
        let config = test_config(
            "test-pipe-writer",
            "cat",
            Duration::from_secs(1),
            InterfaceId(101),
        );

        let mut writer = start(config, tx).unwrap();
        let _ = rx.recv_timeout(Duration::from_secs(2)).unwrap(); // drain InterfaceUp

        // Round-trip a payload through the subprocess.
        let payload: Vec<u8> = (10..42).collect();
        writer.send_frame(&payload).unwrap();

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame { data, .. } => {
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn pipe_subprocess_exit() {
        // `true` exits immediately with status 0. The long respawn delay keeps
        // the interface down long enough for InterfaceDown to be observed.
        let (tx, rx) = crate::event::channel();
        let config = test_config(
            "test-pipe-exit",
            "true",
            Duration::from_secs(60),
            InterfaceId(102),
        );

        let _writer = start(config, tx).unwrap();

        // Expect InterfaceUp followed by InterfaceDown; skip anything else and
        // stop at the first timeout.
        let mut got_down = false;
        let mut attempts = 0;
        while attempts < 5 && !got_down {
            attempts += 1;
            match rx.recv_timeout(Duration::from_secs(2)) {
                Ok(Event::InterfaceDown(InterfaceId(102))) => got_down = true,
                Ok(_) => {}
                Err(_) => break,
            }
        }
        assert!(
            got_down,
            "should receive InterfaceDown after subprocess exits"
        );
    }

    #[test]
    fn pipe_config_defaults() {
        let config = PipeConfig::default();
        assert_eq!(config.respawn_delay, Duration::from_secs(5));
        assert_eq!(config.interface_id, InterfaceId(0));
        assert!(config.command.is_empty());
    }

    #[test]
    fn pipe_invalid_command() {
        let (tx, _rx) = crate::event::channel();
        let config = test_config(
            "test-pipe-bad",
            "/nonexistent_rns_test_binary_that_does_not_exist_xyz",
            Duration::from_secs(60),
            InterfaceId(103),
        );

        // The command is run via `sh -c`, so `sh` itself spawns successfully
        // even though the inner binary does not exist. What this test checks
        // is that starting such an interface succeeds and does not panic.
        let result = start(config, tx);
        assert!(result.is_ok());
    }
}
468}