1use std::collections::HashMap;
8use std::io::{self, Read, Write};
9use std::process::{Command, Stdio};
10use std::thread;
11use std::time::Duration;
12
13use rns_core::transport::types::{InterfaceId, InterfaceInfo};
14
15use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::Writer;
19
20#[derive(Debug, Clone)]
22pub struct PipeConfig {
23 pub name: String,
24 pub command: String,
25 pub respawn_delay: Duration,
26 pub interface_id: InterfaceId,
27}
28
29impl Default for PipeConfig {
30 fn default() -> Self {
31 PipeConfig {
32 name: String::new(),
33 command: String::new(),
34 respawn_delay: Duration::from_secs(5),
35 interface_id: InterfaceId(0),
36 }
37 }
38}
39
40struct PipeWriter {
42 stdin: std::process::ChildStdin,
43}
44
45impl Writer for PipeWriter {
46 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47 self.stdin.write_all(&hdlc::frame(data))
48 }
49}
50
51pub fn start(config: PipeConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
53 let id = config.interface_id;
54
55 let mut child = spawn_child(&config.command)?;
56
57 let stdout = child
58 .stdout
59 .take()
60 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdout from child"))?;
61 let stdin = child
62 .stdin
63 .take()
64 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdin from child"))?;
65
66 log::info!(
67 "[{}] pipe interface started: {}",
68 config.name,
69 config.command
70 );
71
72 let _ = tx.send(Event::InterfaceUp(id, None, None));
74
75 thread::Builder::new()
77 .name(format!("pipe-reader-{}", id.0))
78 .spawn(move || {
79 reader_loop(stdout, child, id, config, tx);
80 })?;
81
82 Ok(Box::new(PipeWriter { stdin }))
83}
84
85fn spawn_child(command: &str) -> io::Result<std::process::Child> {
86 Command::new("sh")
87 .args(["-c", command])
88 .stdin(Stdio::piped())
89 .stdout(Stdio::piped())
90 .stderr(Stdio::null())
91 .spawn()
92}
93
94fn reader_loop(
97 mut stdout: std::process::ChildStdout,
98 mut child: std::process::Child,
99 id: InterfaceId,
100 config: PipeConfig,
101 tx: EventSender,
102) {
103 let mut decoder = hdlc::Decoder::new();
104 let mut buf = [0u8; 4096];
105
106 loop {
107 match stdout.read(&mut buf) {
108 Ok(0) => {
109 let _ = child.wait();
111 log::warn!("[{}] subprocess terminated", config.name);
112 let _ = tx.send(Event::InterfaceDown(id));
113 match respawn(&config, &tx) {
114 Some((new_stdout, new_child)) => {
115 stdout = new_stdout;
116 child = new_child;
117 decoder = hdlc::Decoder::new();
118 continue;
119 }
120 None => return,
121 }
122 }
123 Ok(n) => {
124 for frame in decoder.feed(&buf[..n]) {
125 if tx
126 .send(Event::Frame {
127 interface_id: id,
128 data: frame,
129 })
130 .is_err()
131 {
132 let _ = child.kill();
134 return;
135 }
136 }
137 }
138 Err(e) => {
139 log::warn!("[{}] pipe read error: {}", config.name, e);
140 let _ = child.kill();
141 let _ = child.wait();
142 let _ = tx.send(Event::InterfaceDown(id));
143 match respawn(&config, &tx) {
144 Some((new_stdout, new_child)) => {
145 stdout = new_stdout;
146 child = new_child;
147 decoder = hdlc::Decoder::new();
148 continue;
149 }
150 None => return,
151 }
152 }
153 }
154 }
155}
156
157fn respawn(
159 config: &PipeConfig,
160 tx: &EventSender,
161) -> Option<(std::process::ChildStdout, std::process::Child)> {
162 loop {
163 thread::sleep(config.respawn_delay);
164 log::info!(
165 "[{}] attempting to respawn subprocess: {}",
166 config.name,
167 config.command
168 );
169
170 match spawn_child(&config.command) {
171 Ok(mut child) => {
172 let stdout = match child.stdout.take() {
173 Some(s) => s,
174 None => {
175 let _ = child.kill();
176 let _ = child.wait();
177 continue;
178 }
179 };
180 let stdin = match child.stdin.take() {
181 Some(s) => s,
182 None => {
183 let _ = child.kill();
184 let _ = child.wait();
185 continue;
186 }
187 };
188
189 let new_writer: Box<dyn Writer> = Box::new(PipeWriter { stdin });
190 if tx
191 .send(Event::InterfaceUp(
192 config.interface_id,
193 Some(new_writer),
194 None,
195 ))
196 .is_err()
197 {
198 return None; }
200 log::info!("[{}] subprocess respawned", config.name);
201 return Some((stdout, child));
202 }
203 Err(e) => {
204 log::warn!("[{}] respawn failed: {}", config.name, e);
205 }
206 }
207 }
208}
209
210pub struct PipeFactory;
212
213impl InterfaceFactory for PipeFactory {
214 fn type_name(&self) -> &str {
215 "PipeInterface"
216 }
217
218 fn parse_config(
219 &self,
220 name: &str,
221 id: InterfaceId,
222 params: &HashMap<String, String>,
223 ) -> Result<Box<dyn InterfaceConfigData>, String> {
224 let command = params
225 .get("command")
226 .ok_or_else(|| "PipeInterface requires 'command'".to_string())?
227 .clone();
228
229 let respawn_delay = match params.get("respawn_delay") {
230 Some(v) => {
231 let ms: u64 = v
232 .parse()
233 .map_err(|_| format!("invalid respawn_delay: {}", v))?;
234 Duration::from_millis(ms)
235 }
236 None => Duration::from_secs(5),
237 };
238
239 Ok(Box::new(PipeConfig {
240 name: name.to_string(),
241 command,
242 respawn_delay,
243 interface_id: id,
244 }))
245 }
246
247 fn start(
248 &self,
249 config: Box<dyn InterfaceConfigData>,
250 ctx: StartContext,
251 ) -> io::Result<StartResult> {
252 let pipe_config = *config
253 .into_any()
254 .downcast::<PipeConfig>()
255 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
256
257 let id = pipe_config.interface_id;
258 let info = InterfaceInfo {
259 id,
260 name: pipe_config.name.clone(),
261 mode: ctx.mode,
262 out_capable: true,
263 in_capable: true,
264 bitrate: Some(1_000_000),
265 announce_rate_target: None,
266 announce_rate_grace: 0,
267 announce_rate_penalty: 0.0,
268 announce_cap: rns_core::constants::ANNOUNCE_CAP,
269 is_local_client: false,
270 wants_tunnel: false,
271 tunnel_id: None,
272 mtu: rns_core::constants::MTU as u32,
273 ingress_control: false,
274 ia_freq: 0.0,
275 started: crate::time::now(),
276 };
277
278 let writer = start(pipe_config, ctx.tx)?;
279
280 Ok(StartResult::Simple {
281 id,
282 info,
283 writer,
284 interface_type_name: "PipeInterface".to_string(),
285 })
286 }
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292 use std::sync::mpsc;
293
294 #[test]
295 fn pipe_start_and_receive() {
296 let (tx, rx) = mpsc::channel();
298 let config = PipeConfig {
299 name: "test-pipe".into(),
300 command: "cat".into(),
301 respawn_delay: Duration::from_secs(1),
302 interface_id: InterfaceId(100),
303 };
304
305 let mut writer = start(config, tx).unwrap();
306
307 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
309 assert!(matches!(
310 event,
311 Event::InterfaceUp(InterfaceId(100), None, None)
312 ));
313
314 let payload: Vec<u8> = (0..32).collect();
316 writer.send_frame(&payload).unwrap();
317
318 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
320 match event {
321 Event::Frame { interface_id, data } => {
322 assert_eq!(interface_id, InterfaceId(100));
323 assert_eq!(data, payload);
324 }
325 other => panic!("expected Frame, got {:?}", other),
326 }
327 }
328
329 #[test]
330 fn pipe_writer_sends() {
331 let (tx, rx) = mpsc::channel();
333 let config = PipeConfig {
334 name: "test-pipe-writer".into(),
335 command: "cat".into(),
336 respawn_delay: Duration::from_secs(1),
337 interface_id: InterfaceId(101),
338 };
339
340 let mut writer = start(config, tx).unwrap();
341 let _ = rx.recv_timeout(Duration::from_secs(2)).unwrap(); let payload: Vec<u8> = (10..42).collect();
345 writer.send_frame(&payload).unwrap();
346
347 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
348 match event {
349 Event::Frame { data, .. } => {
350 assert_eq!(data, payload);
351 }
352 other => panic!("expected Frame, got {:?}", other),
353 }
354 }
355
356 #[test]
357 fn pipe_subprocess_exit() {
358 let (tx, rx) = mpsc::channel();
360 let config = PipeConfig {
361 name: "test-pipe-exit".into(),
362 command: "true".into(), respawn_delay: Duration::from_secs(60), interface_id: InterfaceId(102),
365 };
366
367 let _writer = start(config, tx).unwrap();
368
369 let mut got_down = false;
371 for _ in 0..5 {
372 match rx.recv_timeout(Duration::from_secs(2)) {
373 Ok(Event::InterfaceDown(InterfaceId(102))) => {
374 got_down = true;
375 break;
376 }
377 Ok(_) => continue,
378 Err(_) => break,
379 }
380 }
381 assert!(
382 got_down,
383 "should receive InterfaceDown after subprocess exits"
384 );
385 }
386
387 #[test]
388 fn pipe_config_defaults() {
389 let config = PipeConfig::default();
390 assert_eq!(config.respawn_delay, Duration::from_secs(5));
391 assert_eq!(config.interface_id, InterfaceId(0));
392 assert!(config.command.is_empty());
393 }
394
395 #[test]
396 fn pipe_invalid_command() {
397 let (tx, _rx) = mpsc::channel();
398 let config = PipeConfig {
399 name: "test-pipe-bad".into(),
400 command: "/nonexistent_rns_test_binary_that_does_not_exist_xyz".into(),
401 respawn_delay: Duration::from_secs(60),
402 interface_id: InterfaceId(103),
403 };
404
405 let result = start(config, tx);
410 assert!(result.is_ok());
413 }
414}