1use std::collections::HashMap;
8use std::io::{self, Read, Write};
9use std::process::{Command, Stdio};
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
17use crate::event::{Event, EventSender};
18use crate::hdlc;
19use crate::interface::Writer;
20
/// Configuration for an interface backed by a shell subprocess whose
/// stdin/stdout carry HDLC-framed data.
#[derive(Debug, Clone)]
pub struct PipeConfig {
    /// Interface name; used as the `[name]` prefix in this module's log messages.
    pub name: String,
    /// Command line handed to `sh -c` to launch the subprocess.
    pub command: String,
    /// Startup value of the respawn delay; copied into `runtime` when the
    /// interface is started.
    pub respawn_delay: Duration,
    /// Identifier attached to the events emitted for this interface.
    pub interface_id: InterfaceId,
    /// Shared, mutable runtime settings; the respawn loop reads its delay from
    /// here rather than from `respawn_delay`.
    pub runtime: Arc<Mutex<PipeRuntime>>,
}
30
/// Settings of a pipe interface that are kept behind a shared mutex so they
/// can be read while the interface is running.
#[derive(Debug, Clone)]
pub struct PipeRuntime {
    /// How long the respawn loop sleeps before each attempt to relaunch the
    /// subprocess.
    pub respawn_delay: Duration,
}
35
36impl PipeRuntime {
37 pub fn from_config(config: &PipeConfig) -> Self {
38 Self {
39 respawn_delay: config.respawn_delay,
40 }
41 }
42}
43
/// Handle bundling an interface's shared runtime settings with a snapshot of
/// the values derived from its configuration.
// NOTE(review): presumably consumed by code that adjusts runtime settings
// while the interface is up — confirm against callers.
#[derive(Debug, Clone)]
pub struct PipeRuntimeConfigHandle {
    /// Name of the interface this handle belongs to.
    pub interface_name: String,
    /// The same shared runtime state that the interface's `PipeConfig` holds.
    pub runtime: Arc<Mutex<PipeRuntime>>,
    /// Runtime values as derived from the configuration when the handle was
    /// created.
    pub startup: PipeRuntime,
}
50
51impl Default for PipeConfig {
52 fn default() -> Self {
53 let mut config = PipeConfig {
54 name: String::new(),
55 command: String::new(),
56 respawn_delay: Duration::from_secs(5),
57 interface_id: InterfaceId(0),
58 runtime: Arc::new(Mutex::new(PipeRuntime {
59 respawn_delay: Duration::from_secs(5),
60 })),
61 };
62 let startup = PipeRuntime::from_config(&config);
63 config.runtime = Arc::new(Mutex::new(startup));
64 config
65 }
66}
67
/// Outbound half of a pipe interface: owns the subprocess's stdin.
struct PipeWriter {
    /// Write end of the pipe connected to the child's standard input.
    stdin: std::process::ChildStdin,
}
72
73impl Writer for PipeWriter {
74 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
75 self.stdin.write_all(&hdlc::frame(data))
76 }
77}
78
79pub fn start(config: PipeConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
81 let id = config.interface_id;
82 {
83 let startup = PipeRuntime::from_config(&config);
84 *config.runtime.lock().unwrap() = startup;
85 }
86
87 let mut child = spawn_child(&config.command)?;
88
89 let stdout = child
90 .stdout
91 .take()
92 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdout from child"))?;
93 let stdin = child
94 .stdin
95 .take()
96 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdin from child"))?;
97
98 log::info!(
99 "[{}] pipe interface started: {}",
100 config.name,
101 config.command
102 );
103
104 let _ = tx.send(Event::InterfaceUp(id, None, None));
106
107 thread::Builder::new()
109 .name(format!("pipe-reader-{}", id.0))
110 .spawn(move || {
111 reader_loop(stdout, child, id, config, tx);
112 })?;
113
114 Ok(Box::new(PipeWriter { stdin }))
115}
116
/// Runs `command` through `sh -c` with stdin and stdout piped to this process
/// and stderr discarded.
///
/// # Errors
///
/// Returns the error from spawning the shell process.
fn spawn_child(command: &str) -> io::Result<std::process::Child> {
    let mut shell = Command::new("sh");
    shell
        .arg("-c")
        .arg(command)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::null());
    shell.spawn()
}
125
126fn reader_loop(
129 mut stdout: std::process::ChildStdout,
130 mut child: std::process::Child,
131 id: InterfaceId,
132 config: PipeConfig,
133 tx: EventSender,
134) {
135 let mut decoder = hdlc::Decoder::new();
136 let mut buf = [0u8; 4096];
137
138 loop {
139 match stdout.read(&mut buf) {
140 Ok(0) => {
141 let _ = child.wait();
143 log::warn!("[{}] subprocess terminated", config.name);
144 let _ = tx.send(Event::InterfaceDown(id));
145 match respawn(&config, &tx) {
146 Some((new_stdout, new_child)) => {
147 stdout = new_stdout;
148 child = new_child;
149 decoder = hdlc::Decoder::new();
150 continue;
151 }
152 None => return,
153 }
154 }
155 Ok(n) => {
156 for frame in decoder.feed(&buf[..n]) {
157 if tx
158 .send(Event::Frame {
159 interface_id: id,
160 data: frame,
161 })
162 .is_err()
163 {
164 let _ = child.kill();
166 return;
167 }
168 }
169 }
170 Err(e) => {
171 log::warn!("[{}] pipe read error: {}", config.name, e);
172 let _ = child.kill();
173 let _ = child.wait();
174 let _ = tx.send(Event::InterfaceDown(id));
175 match respawn(&config, &tx) {
176 Some((new_stdout, new_child)) => {
177 stdout = new_stdout;
178 child = new_child;
179 decoder = hdlc::Decoder::new();
180 continue;
181 }
182 None => return,
183 }
184 }
185 }
186 }
187}
188
189fn respawn(
191 config: &PipeConfig,
192 tx: &EventSender,
193) -> Option<(std::process::ChildStdout, std::process::Child)> {
194 loop {
195 thread::sleep(config.runtime.lock().unwrap().respawn_delay);
196 log::info!(
197 "[{}] attempting to respawn subprocess: {}",
198 config.name,
199 config.command
200 );
201
202 match spawn_child(&config.command) {
203 Ok(mut child) => {
204 let stdout = match child.stdout.take() {
205 Some(s) => s,
206 None => {
207 let _ = child.kill();
208 let _ = child.wait();
209 continue;
210 }
211 };
212 let stdin = match child.stdin.take() {
213 Some(s) => s,
214 None => {
215 let _ = child.kill();
216 let _ = child.wait();
217 continue;
218 }
219 };
220
221 let new_writer: Box<dyn Writer> = Box::new(PipeWriter { stdin });
222 if tx
223 .send(Event::InterfaceUp(
224 config.interface_id,
225 Some(new_writer),
226 None,
227 ))
228 .is_err()
229 {
230 return None; }
232 log::info!("[{}] subprocess respawned", config.name);
233 return Some((stdout, child));
234 }
235 Err(e) => {
236 log::warn!("[{}] respawn failed: {}", config.name, e);
237 }
238 }
239 }
240}
241
/// Factory that parses and starts interfaces of type `"PipeInterface"`.
pub struct PipeFactory;
244
245impl InterfaceFactory for PipeFactory {
246 fn type_name(&self) -> &str {
247 "PipeInterface"
248 }
249
250 fn parse_config(
251 &self,
252 name: &str,
253 id: InterfaceId,
254 params: &HashMap<String, String>,
255 ) -> Result<Box<dyn InterfaceConfigData>, String> {
256 let command = params
257 .get("command")
258 .ok_or_else(|| "PipeInterface requires 'command'".to_string())?
259 .clone();
260
261 let respawn_delay = match params.get("respawn_delay") {
262 Some(v) => {
263 let ms: u64 = v
264 .parse()
265 .map_err(|_| format!("invalid respawn_delay: {}", v))?;
266 Duration::from_millis(ms)
267 }
268 None => Duration::from_secs(5),
269 };
270
271 Ok(Box::new(PipeConfig {
272 name: name.to_string(),
273 command,
274 respawn_delay,
275 interface_id: id,
276 runtime: Arc::new(Mutex::new(PipeRuntime { respawn_delay })),
277 }))
278 }
279
280 fn start(
281 &self,
282 config: Box<dyn InterfaceConfigData>,
283 ctx: StartContext,
284 ) -> io::Result<StartResult> {
285 let pipe_config = *config
286 .into_any()
287 .downcast::<PipeConfig>()
288 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
289
290 let id = pipe_config.interface_id;
291 let info = InterfaceInfo {
292 id,
293 name: pipe_config.name.clone(),
294 mode: ctx.mode,
295 out_capable: true,
296 in_capable: true,
297 bitrate: Some(1_000_000),
298 announce_rate_target: None,
299 announce_rate_grace: 0,
300 announce_rate_penalty: 0.0,
301 announce_cap: rns_core::constants::ANNOUNCE_CAP,
302 is_local_client: false,
303 wants_tunnel: false,
304 tunnel_id: None,
305 mtu: rns_core::constants::MTU as u32,
306 ingress_control: false,
307 ia_freq: 0.0,
308 started: crate::time::now(),
309 };
310
311 let writer = start(pipe_config, ctx.tx)?;
312
313 Ok(StartResult::Simple {
314 id,
315 info,
316 writer,
317 interface_type_name: "PipeInterface".to_string(),
318 })
319 }
320}
321
322pub(crate) fn pipe_runtime_handle_from_config(config: &PipeConfig) -> PipeRuntimeConfigHandle {
323 PipeRuntimeConfigHandle {
324 interface_name: config.name.clone(),
325 runtime: Arc::clone(&config.runtime),
326 startup: PipeRuntime::from_config(config),
327 }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// Upper bound for waiting on a single event from the reader thread.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(2);

    /// Builds a config whose startup and runtime respawn delays agree.
    fn test_config(
        name: &str,
        command: &str,
        respawn_delay: Duration,
        interface_id: InterfaceId,
    ) -> PipeConfig {
        PipeConfig {
            name: name.into(),
            command: command.into(),
            respawn_delay,
            interface_id,
            runtime: Arc::new(Mutex::new(PipeRuntime { respawn_delay })),
        }
    }

    #[test]
    fn pipe_start_and_receive() {
        let (tx, rx) = crate::event::channel();
        let config = test_config("test-pipe", "cat", Duration::from_secs(1), InterfaceId(100));

        let mut writer = start(config, tx).unwrap();

        // The interface is announced before any frame can arrive.
        let up = rx.recv_timeout(EVENT_TIMEOUT).unwrap();
        assert!(matches!(
            up,
            Event::InterfaceUp(InterfaceId(100), None, None)
        ));

        // `cat` echoes the framed payload straight back to the reader.
        let payload: Vec<u8> = (0..32).collect();
        writer.send_frame(&payload).unwrap();

        match rx.recv_timeout(EVENT_TIMEOUT).unwrap() {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(100));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn pipe_writer_sends() {
        let (tx, rx) = crate::event::channel();
        let config = test_config(
            "test-pipe-writer",
            "cat",
            Duration::from_secs(1),
            InterfaceId(101),
        );

        let mut writer = start(config, tx).unwrap();
        // Consume the first event emitted on start.
        let _ = rx.recv_timeout(EVENT_TIMEOUT).unwrap();

        let payload: Vec<u8> = (10..42).collect();
        writer.send_frame(&payload).unwrap();

        match rx.recv_timeout(EVENT_TIMEOUT).unwrap() {
            Event::Frame { data, .. } => assert_eq!(data, payload),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn pipe_subprocess_exit() {
        let (tx, rx) = crate::event::channel();
        // `true` exits immediately; the long delay keeps the respawn from
        // happening during the test.
        let config = test_config(
            "test-pipe-exit",
            "true",
            Duration::from_secs(60),
            InterfaceId(102),
        );

        let _writer = start(config, tx).unwrap();

        // Look at up to five events, stopping at the first timeout or at the
        // first InterfaceDown for this interface.
        let got_down = (0..5)
            .map(|_| rx.recv_timeout(EVENT_TIMEOUT))
            .take_while(|received| received.is_ok())
            .any(|received| matches!(received, Ok(Event::InterfaceDown(InterfaceId(102)))));

        assert!(
            got_down,
            "should receive InterfaceDown after subprocess exits"
        );
    }

    #[test]
    fn pipe_config_defaults() {
        let defaults = PipeConfig::default();
        assert_eq!(defaults.respawn_delay, Duration::from_secs(5));
        assert_eq!(defaults.interface_id, InterfaceId(0));
        assert!(defaults.command.is_empty());
    }

    #[test]
    fn pipe_invalid_command() {
        let (tx, _rx) = crate::event::channel();
        let config = test_config(
            "test-pipe-bad",
            "/nonexistent_rns_test_binary_that_does_not_exist_xyz",
            Duration::from_secs(60),
            InterfaceId(103),
        );

        // The command is run through `sh -c`, so spawning the shell itself
        // succeeds even though the command it is asked to run does not exist.
        let result = start(config, tx);
        assert!(result.is_ok());
    }
}