1use std::collections::HashMap;
8use std::io::{self, Read, Write};
9use std::process::{Command, Stdio};
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
17use crate::event::{Event, EventSender};
18use crate::hdlc;
19use crate::interface::{lock_or_recover, Writer};
20
21#[derive(Debug, Clone)]
23pub struct PipeConfig {
24 pub name: String,
25 pub command: String,
26 pub respawn_delay: Duration,
27 pub interface_id: InterfaceId,
28 pub runtime: Arc<Mutex<PipeRuntime>>,
29}
30
/// Settings of a pipe interface that are read while the interface is running.
#[derive(Clone, Debug)]
pub struct PipeRuntime {
    /// How long to sleep before each attempt to respawn the subprocess.
    pub respawn_delay: Duration,
}
35
36impl PipeRuntime {
37 pub fn from_config(config: &PipeConfig) -> Self {
38 Self {
39 respawn_delay: config.respawn_delay,
40 }
41 }
42}
43
44#[derive(Debug, Clone)]
45pub struct PipeRuntimeConfigHandle {
46 pub interface_name: String,
47 pub runtime: Arc<Mutex<PipeRuntime>>,
48 pub startup: PipeRuntime,
49}
50
51impl Default for PipeConfig {
52 fn default() -> Self {
53 let mut config = PipeConfig {
54 name: String::new(),
55 command: String::new(),
56 respawn_delay: Duration::from_secs(5),
57 interface_id: InterfaceId(0),
58 runtime: Arc::new(Mutex::new(PipeRuntime {
59 respawn_delay: Duration::from_secs(5),
60 })),
61 };
62 let startup = PipeRuntime::from_config(&config);
63 config.runtime = Arc::new(Mutex::new(startup));
64 config
65 }
66}
67
68struct PipeWriter {
70 stdin: std::process::ChildStdin,
71}
72
73impl Writer for PipeWriter {
74 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
75 self.stdin.write_all(&hdlc::frame(data))
76 }
77}
78
79pub fn start(config: PipeConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
81 let id = config.interface_id;
82 {
83 let startup = PipeRuntime::from_config(&config);
84 *lock_or_recover(&config.runtime, "pipe runtime") = startup;
85 }
86
87 let mut child = spawn_child(&config.command)?;
88
89 let stdout = child
90 .stdout
91 .take()
92 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdout from child"))?;
93 let stdin = child
94 .stdin
95 .take()
96 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdin from child"))?;
97
98 log::info!(
99 "[{}] pipe interface started: {}",
100 config.name,
101 config.command
102 );
103
104 let _ = tx.send(Event::InterfaceUp(id, None, None));
106
107 thread::Builder::new()
109 .name(format!("pipe-reader-{}", id.0))
110 .spawn(move || {
111 reader_loop(stdout, child, id, config, tx);
112 })?;
113
114 Ok(Box::new(PipeWriter { stdin }))
115}
116
/// Runs `command` through `sh -c` with stdin and stdout piped to this process
/// and stderr discarded.
///
/// # Errors
/// Returns the error from spawning the shell process.
fn spawn_child(command: &str) -> io::Result<std::process::Child> {
    let mut shell = Command::new("sh");
    shell.arg("-c").arg(command);
    shell
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::null());
    shell.spawn()
}
125
126fn reader_loop(
129 mut stdout: std::process::ChildStdout,
130 mut child: std::process::Child,
131 id: InterfaceId,
132 config: PipeConfig,
133 tx: EventSender,
134) {
135 let mut decoder = hdlc::Decoder::new();
136 let mut buf = [0u8; 4096];
137
138 loop {
139 match stdout.read(&mut buf) {
140 Ok(0) => {
141 let _ = child.wait();
143 log::warn!("[{}] subprocess terminated", config.name);
144 let _ = tx.send(Event::InterfaceDown(id));
145 match respawn(&config, &tx) {
146 Some((new_stdout, new_child)) => {
147 stdout = new_stdout;
148 child = new_child;
149 decoder = hdlc::Decoder::new();
150 continue;
151 }
152 None => return,
153 }
154 }
155 Ok(n) => {
156 for frame in decoder.feed(&buf[..n]) {
157 if tx
158 .send(Event::Frame {
159 interface_id: id,
160 data: frame,
161 rssi: None,
162 snr: None,
163 })
164 .is_err()
165 {
166 let _ = child.kill();
168 return;
169 }
170 }
171 }
172 Err(e) => {
173 log::warn!("[{}] pipe read error: {}", config.name, e);
174 let _ = child.kill();
175 let _ = child.wait();
176 let _ = tx.send(Event::InterfaceDown(id));
177 match respawn(&config, &tx) {
178 Some((new_stdout, new_child)) => {
179 stdout = new_stdout;
180 child = new_child;
181 decoder = hdlc::Decoder::new();
182 continue;
183 }
184 None => return,
185 }
186 }
187 }
188 }
189}
190
191fn respawn(
193 config: &PipeConfig,
194 tx: &EventSender,
195) -> Option<(std::process::ChildStdout, std::process::Child)> {
196 loop {
197 let respawn_delay = lock_or_recover(&config.runtime, "pipe runtime").respawn_delay;
198 thread::sleep(respawn_delay);
199 log::info!(
200 "[{}] attempting to respawn subprocess: {}",
201 config.name,
202 config.command
203 );
204
205 match spawn_child(&config.command) {
206 Ok(mut child) => {
207 let stdout = match child.stdout.take() {
208 Some(s) => s,
209 None => {
210 let _ = child.kill();
211 let _ = child.wait();
212 continue;
213 }
214 };
215 let stdin = match child.stdin.take() {
216 Some(s) => s,
217 None => {
218 let _ = child.kill();
219 let _ = child.wait();
220 continue;
221 }
222 };
223
224 let new_writer: Box<dyn Writer> = Box::new(PipeWriter { stdin });
225 if tx
226 .send(Event::InterfaceUp(
227 config.interface_id,
228 Some(new_writer),
229 None,
230 ))
231 .is_err()
232 {
233 return None; }
235 log::info!("[{}] subprocess respawned", config.name);
236 return Some((stdout, child));
237 }
238 Err(e) => {
239 log::warn!("[{}] respawn failed: {}", config.name, e);
240 }
241 }
242 }
243}
244
245pub struct PipeFactory;
247
248impl InterfaceFactory for PipeFactory {
249 fn type_name(&self) -> &str {
250 "PipeInterface"
251 }
252
253 fn parse_config(
254 &self,
255 name: &str,
256 id: InterfaceId,
257 params: &HashMap<String, String>,
258 ) -> Result<Box<dyn InterfaceConfigData>, String> {
259 let command = params
260 .get("command")
261 .ok_or_else(|| "PipeInterface requires 'command'".to_string())?
262 .clone();
263
264 let respawn_delay = match params.get("respawn_delay") {
265 Some(v) => {
266 let ms: u64 = v
267 .parse()
268 .map_err(|_| format!("invalid respawn_delay: {}", v))?;
269 Duration::from_millis(ms)
270 }
271 None => Duration::from_secs(5),
272 };
273
274 Ok(Box::new(PipeConfig {
275 name: name.to_string(),
276 command,
277 respawn_delay,
278 interface_id: id,
279 runtime: Arc::new(Mutex::new(PipeRuntime { respawn_delay })),
280 }))
281 }
282
283 fn start(
284 &self,
285 config: Box<dyn InterfaceConfigData>,
286 ctx: StartContext,
287 ) -> io::Result<StartResult> {
288 let pipe_config = *config
289 .into_any()
290 .downcast::<PipeConfig>()
291 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
292
293 let id = pipe_config.interface_id;
294 let info = InterfaceInfo {
295 id,
296 name: pipe_config.name.clone(),
297 mode: ctx.mode,
298 out_capable: true,
299 in_capable: true,
300 bitrate: Some(1_000_000),
301 airtime_profile: None,
302 announce_rate_target: None,
303 announce_rate_grace: 0,
304 announce_rate_penalty: 0.0,
305 announce_cap: rns_core::constants::ANNOUNCE_CAP,
306 is_local_client: false,
307 wants_tunnel: false,
308 tunnel_id: None,
309 mtu: rns_core::constants::MTU as u32,
310 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
311 ia_freq: 0.0,
312 ip_freq: 0.0,
313 op_freq: 0.0,
314 op_samples: 0,
315 started: crate::time::now(),
316 };
317
318 let writer = start(pipe_config, ctx.tx)?;
319
320 Ok(StartResult::Simple {
321 id,
322 info,
323 writer,
324 interface_type_name: "PipeInterface".to_string(),
325 })
326 }
327}
328
329pub(crate) fn pipe_runtime_handle_from_config(config: &PipeConfig) -> PipeRuntimeConfigHandle {
330 PipeRuntimeConfigHandle {
331 interface_name: config.name.clone(),
332 runtime: Arc::clone(&config.runtime),
333 startup: PipeRuntime::from_config(config),
334 }
335}
336
#[cfg(test)]
mod tests {
    use super::*;

    /// How long a test waits for a single event before giving up.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(2);

    /// Builds a config whose shared runtime starts with the same respawn delay
    /// as the config itself.
    fn make_config(
        name: &str,
        command: &str,
        respawn_delay: Duration,
        interface_id: InterfaceId,
    ) -> PipeConfig {
        PipeConfig {
            name: name.into(),
            command: command.into(),
            respawn_delay,
            interface_id,
            runtime: Arc::new(Mutex::new(PipeRuntime { respawn_delay })),
        }
    }

    /// A frame written to `cat` comes back as a `Frame` event on the same interface.
    #[test]
    fn pipe_start_and_receive() {
        let (tx, rx) = crate::event::channel();
        let config = make_config("test-pipe", "cat", Duration::from_secs(1), InterfaceId(100));

        let mut writer = start(config, tx).unwrap();

        // The first event announces the interface.
        let up = rx.recv_timeout(EVENT_TIMEOUT).unwrap();
        assert!(matches!(
            up,
            Event::InterfaceUp(InterfaceId(100), None, None)
        ));

        let payload: Vec<u8> = (0..32).collect();
        writer.send_frame(&payload).unwrap();

        match rx.recv_timeout(EVENT_TIMEOUT).unwrap() {
            Event::Frame {
                interface_id, data, ..
            } => {
                assert_eq!(interface_id, InterfaceId(100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// The writer returned by `start` delivers the payload unchanged through `cat`.
    #[test]
    fn pipe_writer_sends() {
        let (tx, rx) = crate::event::channel();
        let config = make_config(
            "test-pipe-writer",
            "cat",
            Duration::from_secs(1),
            InterfaceId(101),
        );

        let mut writer = start(config, tx).unwrap();

        // Discard the first event.
        let _ = rx.recv_timeout(EVENT_TIMEOUT).unwrap();

        let payload: Vec<u8> = (10..42).collect();
        writer.send_frame(&payload).unwrap();

        match rx.recv_timeout(EVENT_TIMEOUT).unwrap() {
            Event::Frame { data, .. } => assert_eq!(data, payload),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// A subprocess that exits immediately produces an `InterfaceDown` event.
    #[test]
    fn pipe_subprocess_exit() {
        let (tx, rx) = crate::event::channel();
        // The long respawn delay keeps the reader thread from respawning
        // while the test is still looking at events.
        let config = make_config(
            "test-pipe-exit",
            "true",
            Duration::from_secs(60),
            InterfaceId(102),
        );

        let _writer = start(config, tx).unwrap();

        // Look at up to five events; stop at the first timeout or at the
        // first matching InterfaceDown.
        let got_down = (0..5)
            .map(|_| rx.recv_timeout(EVENT_TIMEOUT))
            .take_while(|received| received.is_ok())
            .any(|received| matches!(received, Ok(Event::InterfaceDown(InterfaceId(102)))));

        assert!(
            got_down,
            "should receive InterfaceDown after subprocess exits"
        );
    }

    /// `PipeConfig::default()` has an empty command, id 0 and a 5 s respawn delay.
    #[test]
    fn pipe_config_defaults() {
        let config = PipeConfig::default();
        assert!(config.command.is_empty());
        assert_eq!(config.interface_id, InterfaceId(0));
        assert_eq!(config.respawn_delay, Duration::from_secs(5));
    }

    /// Starting with a command that does not exist still succeeds, because the
    /// process that is spawned is `sh`, not the command itself.
    #[test]
    fn pipe_invalid_command() {
        let (tx, _rx) = crate::event::channel();
        let config = make_config(
            "test-pipe-bad",
            "/nonexistent_rns_test_binary_that_does_not_exist_xyz",
            Duration::from_secs(60),
            InterfaceId(103),
        );

        let result = start(config, tx);
        assert!(result.is_ok());
    }
}