Skip to main content

rns_net/interface/
pipe.rs

//! Pipe interface: exchanges packets with a subprocess over its stdin/stdout.
//!
//! Matches Python `PipeInterface.py`. Spawns a subprocess and communicates
//! via piped stdin/stdout, using HDLC framing to mark packet boundaries.
//! The subprocess is respawned automatically when it exits or a read fails.
6
7use std::collections::HashMap;
8use std::io::{self, Read, Write};
9use std::process::{Command, Stdio};
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
17use crate::event::{Event, EventSender};
18use crate::hdlc;
19use crate::interface::{lock_or_recover, Writer};
20
/// Configuration for a pipe interface.
///
/// Produced by `PipeFactory::parse_config` (or built by hand) and consumed by
/// `start`, which moves it into the reader thread.
#[derive(Debug, Clone)]
pub struct PipeConfig {
    /// Interface name; used as the `[name]` prefix of this module's log lines
    /// and as the name reported in the interface info.
    pub name: String,
    /// Command line handed to `sh -c` when the subprocess is spawned.
    pub command: String,
    /// Configured delay before a respawn attempt. `start` copies it into
    /// `runtime`, which is the value the respawn loop actually reads.
    pub respawn_delay: Duration,
    /// Identifier attached to every event this interface emits.
    pub interface_id: InterfaceId,
    /// Shared, mutex-protected runtime settings. Cloning the config clones the
    /// `Arc`, so every clone observes the same runtime state.
    pub runtime: Arc<Mutex<PipeRuntime>>,
}
30
31#[derive(Debug, Clone)]
32pub struct PipeRuntime {
33    pub respawn_delay: Duration,
34}
35
36impl PipeRuntime {
37    pub fn from_config(config: &PipeConfig) -> Self {
38        Self {
39            respawn_delay: config.respawn_delay,
40        }
41    }
42}
43
/// Handle onto a pipe interface's runtime settings.
///
/// Built by `pipe_runtime_handle_from_config`. Presumably used by code outside
/// this module to inspect or adjust the settings of a running interface —
/// NOTE(review): confirm against the callers.
#[derive(Debug, Clone)]
pub struct PipeRuntimeConfigHandle {
    /// Name of the interface the handle refers to (copied from `PipeConfig::name`).
    pub interface_name: String,
    /// The same shared runtime state the interface uses (an `Arc` clone of
    /// `PipeConfig::runtime`).
    pub runtime: Arc<Mutex<PipeRuntime>>,
    /// Settings derived from the static configuration when the handle was created.
    pub startup: PipeRuntime,
}
50
51impl Default for PipeConfig {
52    fn default() -> Self {
53        let mut config = PipeConfig {
54            name: String::new(),
55            command: String::new(),
56            respawn_delay: Duration::from_secs(5),
57            interface_id: InterfaceId(0),
58            runtime: Arc::new(Mutex::new(PipeRuntime {
59                respawn_delay: Duration::from_secs(5),
60            })),
61        };
62        let startup = PipeRuntime::from_config(&config);
63        config.runtime = Arc::new(Mutex::new(startup));
64        config
65    }
66}
67
68/// Writer that sends HDLC-framed data to a subprocess stdin.
69struct PipeWriter {
70    stdin: std::process::ChildStdin,
71}
72
73impl Writer for PipeWriter {
74    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
75        self.stdin.write_all(&hdlc::frame(data))
76    }
77}
78
79/// Start the pipe interface. Spawns subprocess, returns writer.
80pub fn start(config: PipeConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
81    let id = config.interface_id;
82    {
83        let startup = PipeRuntime::from_config(&config);
84        *lock_or_recover(&config.runtime, "pipe runtime") = startup;
85    }
86
87    let mut child = spawn_child(&config.command)?;
88
89    let stdout = child
90        .stdout
91        .take()
92        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdout from child"))?;
93    let stdin = child
94        .stdin
95        .take()
96        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdin from child"))?;
97
98    log::info!(
99        "[{}] pipe interface started: {}",
100        config.name,
101        config.command
102    );
103
104    // Signal interface up
105    let _ = tx.send(Event::InterfaceUp(id, None, None));
106
107    // Spawn reader thread
108    thread::Builder::new()
109        .name(format!("pipe-reader-{}", id.0))
110        .spawn(move || {
111            reader_loop(stdout, child, id, config, tx);
112        })?;
113
114    Ok(Box::new(PipeWriter { stdin }))
115}
116
/// Launches `command` through `sh -c` with stdin and stdout piped and stderr
/// discarded.
///
/// # Errors
///
/// Returns the I/O error from spawning `sh`. A command that the shell cannot
/// run still yields `Ok`, because the shell itself starts successfully.
fn spawn_child(command: &str) -> io::Result<std::process::Child> {
    let mut shell = Command::new("sh");
    shell.arg("-c").arg(command);
    shell
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::null());
    shell.spawn()
}
125
126/// Reader loop: reads from subprocess stdout, HDLC-decodes, dispatches events.
127/// On subprocess exit, attempts respawn.
128fn reader_loop(
129    mut stdout: std::process::ChildStdout,
130    mut child: std::process::Child,
131    id: InterfaceId,
132    config: PipeConfig,
133    tx: EventSender,
134) {
135    let mut decoder = hdlc::Decoder::new();
136    let mut buf = [0u8; 4096];
137
138    loop {
139        match stdout.read(&mut buf) {
140            Ok(0) => {
141                // EOF — subprocess exited
142                let _ = child.wait();
143                log::warn!("[{}] subprocess terminated", config.name);
144                let _ = tx.send(Event::InterfaceDown(id));
145                match respawn(&config, &tx) {
146                    Some((new_stdout, new_child)) => {
147                        stdout = new_stdout;
148                        child = new_child;
149                        decoder = hdlc::Decoder::new();
150                        continue;
151                    }
152                    None => return,
153                }
154            }
155            Ok(n) => {
156                for frame in decoder.feed(&buf[..n]) {
157                    if tx
158                        .send(Event::Frame {
159                            interface_id: id,
160                            data: frame,
161                            rssi: None,
162                            snr: None,
163                        })
164                        .is_err()
165                    {
166                        // Driver shut down
167                        let _ = child.kill();
168                        return;
169                    }
170                }
171            }
172            Err(e) => {
173                log::warn!("[{}] pipe read error: {}", config.name, e);
174                let _ = child.kill();
175                let _ = child.wait();
176                let _ = tx.send(Event::InterfaceDown(id));
177                match respawn(&config, &tx) {
178                    Some((new_stdout, new_child)) => {
179                        stdout = new_stdout;
180                        child = new_child;
181                        decoder = hdlc::Decoder::new();
182                        continue;
183                    }
184                    None => return,
185                }
186            }
187        }
188    }
189}
190
191/// Attempt to respawn the subprocess after a delay.
192fn respawn(
193    config: &PipeConfig,
194    tx: &EventSender,
195) -> Option<(std::process::ChildStdout, std::process::Child)> {
196    loop {
197        let respawn_delay = lock_or_recover(&config.runtime, "pipe runtime").respawn_delay;
198        thread::sleep(respawn_delay);
199        log::info!(
200            "[{}] attempting to respawn subprocess: {}",
201            config.name,
202            config.command
203        );
204
205        match spawn_child(&config.command) {
206            Ok(mut child) => {
207                let stdout = match child.stdout.take() {
208                    Some(s) => s,
209                    None => {
210                        let _ = child.kill();
211                        let _ = child.wait();
212                        continue;
213                    }
214                };
215                let stdin = match child.stdin.take() {
216                    Some(s) => s,
217                    None => {
218                        let _ = child.kill();
219                        let _ = child.wait();
220                        continue;
221                    }
222                };
223
224                let new_writer: Box<dyn Writer> = Box::new(PipeWriter { stdin });
225                if tx
226                    .send(Event::InterfaceUp(
227                        config.interface_id,
228                        Some(new_writer),
229                        None,
230                    ))
231                    .is_err()
232                {
233                    return None; // Driver shut down
234                }
235                log::info!("[{}] subprocess respawned", config.name);
236                return Some((stdout, child));
237            }
238            Err(e) => {
239                log::warn!("[{}] respawn failed: {}", config.name, e);
240            }
241        }
242    }
243}
244
245/// Factory for [`PipeInterface`] instances.
246pub struct PipeFactory;
247
248impl InterfaceFactory for PipeFactory {
249    fn type_name(&self) -> &str {
250        "PipeInterface"
251    }
252
253    fn parse_config(
254        &self,
255        name: &str,
256        id: InterfaceId,
257        params: &HashMap<String, String>,
258    ) -> Result<Box<dyn InterfaceConfigData>, String> {
259        let command = params
260            .get("command")
261            .ok_or_else(|| "PipeInterface requires 'command'".to_string())?
262            .clone();
263
264        let respawn_delay = match params.get("respawn_delay") {
265            Some(v) => {
266                let ms: u64 = v
267                    .parse()
268                    .map_err(|_| format!("invalid respawn_delay: {}", v))?;
269                Duration::from_millis(ms)
270            }
271            None => Duration::from_secs(5),
272        };
273
274        Ok(Box::new(PipeConfig {
275            name: name.to_string(),
276            command,
277            respawn_delay,
278            interface_id: id,
279            runtime: Arc::new(Mutex::new(PipeRuntime { respawn_delay })),
280        }))
281    }
282
283    fn start(
284        &self,
285        config: Box<dyn InterfaceConfigData>,
286        ctx: StartContext,
287    ) -> io::Result<StartResult> {
288        let pipe_config = *config
289            .into_any()
290            .downcast::<PipeConfig>()
291            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
292
293        let id = pipe_config.interface_id;
294        let info = InterfaceInfo {
295            id,
296            name: pipe_config.name.clone(),
297            mode: ctx.mode,
298            out_capable: true,
299            in_capable: true,
300            bitrate: Some(1_000_000),
301            airtime_profile: None,
302            announce_rate_target: None,
303            announce_rate_grace: 0,
304            announce_rate_penalty: 0.0,
305            announce_cap: rns_core::constants::ANNOUNCE_CAP,
306            is_local_client: false,
307            wants_tunnel: false,
308            tunnel_id: None,
309            mtu: rns_core::constants::MTU as u32,
310            ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
311            ia_freq: 0.0,
312            ip_freq: 0.0,
313            op_freq: 0.0,
314            op_samples: 0,
315            started: crate::time::now(),
316        };
317
318        let writer = start(pipe_config, ctx.tx)?;
319
320        Ok(StartResult::Simple {
321            id,
322            info,
323            writer,
324            interface_type_name: "PipeInterface".to_string(),
325        })
326    }
327}
328
329pub(crate) fn pipe_runtime_handle_from_config(config: &PipeConfig) -> PipeRuntimeConfigHandle {
330    PipeRuntimeConfigHandle {
331        interface_name: config.name.clone(),
332        runtime: Arc::clone(&config.runtime),
333        startup: PipeRuntime::from_config(config),
334    }
335}
336
#[cfg(test)]
mod tests {
    use super::*;

    /// Longest time a test waits for a single event.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(2);

    /// Builds a config whose runtime delay mirrors its configured delay.
    fn test_config(name: &str, command: &str, delay: Duration, id: InterfaceId) -> PipeConfig {
        PipeConfig {
            name: name.into(),
            command: command.into(),
            respawn_delay: delay,
            interface_id: id,
            runtime: Arc::new(Mutex::new(PipeRuntime {
                respawn_delay: delay,
            })),
        }
    }

    #[test]
    fn pipe_start_and_receive() {
        // `cat` acts as a loopback subprocess.
        let (tx, rx) = crate::event::channel();
        let config = test_config("test-pipe", "cat", Duration::from_secs(1), InterfaceId(100));

        let mut writer = start(config, tx).unwrap();

        // The first event must be the initial InterfaceUp.
        let up = rx.recv_timeout(EVENT_TIMEOUT).unwrap();
        assert!(matches!(
            up,
            Event::InterfaceUp(InterfaceId(100), None, None)
        ));

        // Send a packet (>= 19 bytes for HDLC minimum)
        let payload: Vec<u8> = (0..32).collect();
        writer.send_frame(&payload).unwrap();

        // `cat` echoes the HDLC frame, so it comes back as a Frame event.
        match rx.recv_timeout(EVENT_TIMEOUT).unwrap() {
            Event::Frame {
                interface_id, data, ..
            } => {
                assert_eq!(interface_id, InterfaceId(100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn pipe_writer_sends() {
        // The writer must wrap data in HDLC so the reader can decode the echo.
        let (tx, rx) = crate::event::channel();
        let config = test_config(
            "test-pipe-writer",
            "cat",
            Duration::from_secs(1),
            InterfaceId(101),
        );

        let mut writer = start(config, tx).unwrap();
        let _ = rx.recv_timeout(EVENT_TIMEOUT).unwrap(); // drain InterfaceUp

        let payload: Vec<u8> = (10..42).collect();
        writer.send_frame(&payload).unwrap();

        match rx.recv_timeout(EVENT_TIMEOUT).unwrap() {
            Event::Frame { data, .. } => assert_eq!(data, payload),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn pipe_subprocess_exit() {
        // `true` exits immediately with status 0; the long respawn delay keeps
        // the reader sleeping so InterfaceDown can be observed.
        let (tx, rx) = crate::event::channel();
        let config = test_config(
            "test-pipe-exit",
            "true",
            Duration::from_secs(60),
            InterfaceId(102),
        );

        let _writer = start(config, tx).unwrap();

        // Expect InterfaceUp followed by InterfaceDown: look at up to five
        // events and stop at the first timeout or disconnect.
        let got_down = (0..5)
            .map(|_| rx.recv_timeout(EVENT_TIMEOUT))
            .take_while(|received| received.is_ok())
            .any(|received| matches!(received, Ok(Event::InterfaceDown(InterfaceId(102)))));

        assert!(
            got_down,
            "should receive InterfaceDown after subprocess exits"
        );
    }

    #[test]
    fn pipe_config_defaults() {
        let config = PipeConfig::default();
        assert_eq!(config.respawn_delay, Duration::from_secs(5));
        assert_eq!(config.interface_id, InterfaceId(0));
        assert!(config.command.is_empty());
    }

    #[test]
    fn pipe_invalid_command() {
        let (tx, _rx) = crate::event::channel();
        let config = test_config(
            "test-pipe-bad",
            "/nonexistent_rns_test_binary_that_does_not_exist_xyz",
            Duration::from_secs(60),
            InterfaceId(103),
        );

        // The command runs through `sh -c`, so `sh` itself spawns successfully
        // even though the inner binary does not exist. The point of the test
        // is that starting neither fails nor panics.
        let result = start(config, tx);
        assert!(result.is_ok());
    }
}