1use std::collections::HashMap;
8use std::io::{self, Read, Write};
9use std::process::{Command, Stdio};
10use std::thread;
11use std::time::Duration;
12
13use rns_core::transport::types::{InterfaceId, InterfaceInfo};
14
15use crate::event::{Event, EventSender};
16use crate::hdlc;
17use crate::interface::Writer;
18use super::{InterfaceFactory, InterfaceConfigData, StartContext, StartResult};
19
20#[derive(Debug, Clone)]
22pub struct PipeConfig {
23 pub name: String,
24 pub command: String,
25 pub respawn_delay: Duration,
26 pub interface_id: InterfaceId,
27}
28
29impl Default for PipeConfig {
30 fn default() -> Self {
31 PipeConfig {
32 name: String::new(),
33 command: String::new(),
34 respawn_delay: Duration::from_secs(5),
35 interface_id: InterfaceId(0),
36 }
37 }
38}
39
40struct PipeWriter {
42 stdin: std::process::ChildStdin,
43}
44
45impl Writer for PipeWriter {
46 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47 self.stdin.write_all(&hdlc::frame(data))
48 }
49}
50
51pub fn start(config: PipeConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
53 let id = config.interface_id;
54
55 let mut child = spawn_child(&config.command)?;
56
57 let stdout = child
58 .stdout
59 .take()
60 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdout from child"))?;
61 let stdin = child
62 .stdin
63 .take()
64 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdin from child"))?;
65
66 log::info!("[{}] pipe interface started: {}", config.name, config.command);
67
68 let _ = tx.send(Event::InterfaceUp(id, None, None));
70
71 thread::Builder::new()
73 .name(format!("pipe-reader-{}", id.0))
74 .spawn(move || {
75 reader_loop(stdout, child, id, config, tx);
76 })?;
77
78 Ok(Box::new(PipeWriter { stdin }))
79}
80
81fn spawn_child(command: &str) -> io::Result<std::process::Child> {
82 Command::new("sh")
83 .args(["-c", command])
84 .stdin(Stdio::piped())
85 .stdout(Stdio::piped())
86 .stderr(Stdio::null())
87 .spawn()
88}
89
90fn reader_loop(
93 mut stdout: std::process::ChildStdout,
94 mut child: std::process::Child,
95 id: InterfaceId,
96 config: PipeConfig,
97 tx: EventSender,
98) {
99 let mut decoder = hdlc::Decoder::new();
100 let mut buf = [0u8; 4096];
101
102 loop {
103 match stdout.read(&mut buf) {
104 Ok(0) => {
105 let _ = child.wait();
107 log::warn!("[{}] subprocess terminated", config.name);
108 let _ = tx.send(Event::InterfaceDown(id));
109 match respawn(&config, &tx) {
110 Some((new_stdout, new_child)) => {
111 stdout = new_stdout;
112 child = new_child;
113 decoder = hdlc::Decoder::new();
114 continue;
115 }
116 None => return,
117 }
118 }
119 Ok(n) => {
120 for frame in decoder.feed(&buf[..n]) {
121 if tx
122 .send(Event::Frame {
123 interface_id: id,
124 data: frame,
125 })
126 .is_err()
127 {
128 let _ = child.kill();
130 return;
131 }
132 }
133 }
134 Err(e) => {
135 log::warn!("[{}] pipe read error: {}", config.name, e);
136 let _ = child.kill();
137 let _ = child.wait();
138 let _ = tx.send(Event::InterfaceDown(id));
139 match respawn(&config, &tx) {
140 Some((new_stdout, new_child)) => {
141 stdout = new_stdout;
142 child = new_child;
143 decoder = hdlc::Decoder::new();
144 continue;
145 }
146 None => return,
147 }
148 }
149 }
150 }
151}
152
153fn respawn(
155 config: &PipeConfig,
156 tx: &EventSender,
157) -> Option<(std::process::ChildStdout, std::process::Child)> {
158 loop {
159 thread::sleep(config.respawn_delay);
160 log::info!(
161 "[{}] attempting to respawn subprocess: {}",
162 config.name,
163 config.command
164 );
165
166 match spawn_child(&config.command) {
167 Ok(mut child) => {
168 let stdout = match child.stdout.take() {
169 Some(s) => s,
170 None => {
171 let _ = child.kill();
172 let _ = child.wait();
173 continue;
174 }
175 };
176 let stdin = match child.stdin.take() {
177 Some(s) => s,
178 None => {
179 let _ = child.kill();
180 let _ = child.wait();
181 continue;
182 }
183 };
184
185 let new_writer: Box<dyn Writer> = Box::new(PipeWriter { stdin });
186 if tx
187 .send(Event::InterfaceUp(config.interface_id, Some(new_writer), None))
188 .is_err()
189 {
190 return None; }
192 log::info!("[{}] subprocess respawned", config.name);
193 return Some((stdout, child));
194 }
195 Err(e) => {
196 log::warn!("[{}] respawn failed: {}", config.name, e);
197 }
198 }
199 }
200}
201
202pub struct PipeFactory;
204
205impl InterfaceFactory for PipeFactory {
206 fn type_name(&self) -> &str {
207 "PipeInterface"
208 }
209
210 fn parse_config(
211 &self,
212 name: &str,
213 id: InterfaceId,
214 params: &HashMap<String, String>,
215 ) -> Result<Box<dyn InterfaceConfigData>, String> {
216 let command = params
217 .get("command")
218 .ok_or_else(|| "PipeInterface requires 'command'".to_string())?
219 .clone();
220
221 let respawn_delay = match params.get("respawn_delay") {
222 Some(v) => {
223 let ms: u64 = v
224 .parse()
225 .map_err(|_| format!("invalid respawn_delay: {}", v))?;
226 Duration::from_millis(ms)
227 }
228 None => Duration::from_secs(5),
229 };
230
231 Ok(Box::new(PipeConfig {
232 name: name.to_string(),
233 command,
234 respawn_delay,
235 interface_id: id,
236 }))
237 }
238
239 fn start(
240 &self,
241 config: Box<dyn InterfaceConfigData>,
242 ctx: StartContext,
243 ) -> io::Result<StartResult> {
244 let pipe_config = *config
245 .into_any()
246 .downcast::<PipeConfig>()
247 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
248
249 let id = pipe_config.interface_id;
250 let info = InterfaceInfo {
251 id,
252 name: pipe_config.name.clone(),
253 mode: ctx.mode,
254 out_capable: true,
255 in_capable: true,
256 bitrate: Some(1_000_000),
257 announce_rate_target: None,
258 announce_rate_grace: 0,
259 announce_rate_penalty: 0.0,
260 announce_cap: rns_core::constants::ANNOUNCE_CAP,
261 is_local_client: false,
262 wants_tunnel: false,
263 tunnel_id: None,
264 mtu: rns_core::constants::MTU as u32,
265 ingress_control: false,
266 ia_freq: 0.0,
267 started: crate::time::now(),
268 };
269
270 let writer = start(pipe_config, ctx.tx)?;
271
272 Ok(StartResult::Simple {
273 id,
274 info,
275 writer,
276 interface_type_name: "PipeInterface".to_string(),
277 })
278 }
279}
280
281#[cfg(test)]
282mod tests {
283 use super::*;
284 use std::sync::mpsc;
285
286 #[test]
287 fn pipe_start_and_receive() {
288 let (tx, rx) = mpsc::channel();
290 let config = PipeConfig {
291 name: "test-pipe".into(),
292 command: "cat".into(),
293 respawn_delay: Duration::from_secs(1),
294 interface_id: InterfaceId(100),
295 };
296
297 let mut writer = start(config, tx).unwrap();
298
299 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
301 assert!(matches!(event, Event::InterfaceUp(InterfaceId(100), None, None)));
302
303 let payload: Vec<u8> = (0..32).collect();
305 writer.send_frame(&payload).unwrap();
306
307 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
309 match event {
310 Event::Frame { interface_id, data } => {
311 assert_eq!(interface_id, InterfaceId(100));
312 assert_eq!(data, payload);
313 }
314 other => panic!("expected Frame, got {:?}", other),
315 }
316 }
317
318 #[test]
319 fn pipe_writer_sends() {
320 let (tx, rx) = mpsc::channel();
322 let config = PipeConfig {
323 name: "test-pipe-writer".into(),
324 command: "cat".into(),
325 respawn_delay: Duration::from_secs(1),
326 interface_id: InterfaceId(101),
327 };
328
329 let mut writer = start(config, tx).unwrap();
330 let _ = rx.recv_timeout(Duration::from_secs(2)).unwrap(); let payload: Vec<u8> = (10..42).collect();
334 writer.send_frame(&payload).unwrap();
335
336 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
337 match event {
338 Event::Frame { data, .. } => {
339 assert_eq!(data, payload);
340 }
341 other => panic!("expected Frame, got {:?}", other),
342 }
343 }
344
345 #[test]
346 fn pipe_subprocess_exit() {
347 let (tx, rx) = mpsc::channel();
349 let config = PipeConfig {
350 name: "test-pipe-exit".into(),
351 command: "true".into(), respawn_delay: Duration::from_secs(60), interface_id: InterfaceId(102),
354 };
355
356 let _writer = start(config, tx).unwrap();
357
358 let mut got_down = false;
360 for _ in 0..5 {
361 match rx.recv_timeout(Duration::from_secs(2)) {
362 Ok(Event::InterfaceDown(InterfaceId(102))) => {
363 got_down = true;
364 break;
365 }
366 Ok(_) => continue,
367 Err(_) => break,
368 }
369 }
370 assert!(got_down, "should receive InterfaceDown after subprocess exits");
371 }
372
373 #[test]
374 fn pipe_config_defaults() {
375 let config = PipeConfig::default();
376 assert_eq!(config.respawn_delay, Duration::from_secs(5));
377 assert_eq!(config.interface_id, InterfaceId(0));
378 assert!(config.command.is_empty());
379 }
380
381 #[test]
382 fn pipe_invalid_command() {
383 let (tx, _rx) = mpsc::channel();
384 let config = PipeConfig {
385 name: "test-pipe-bad".into(),
386 command: "/nonexistent_rns_test_binary_that_does_not_exist_xyz".into(),
387 respawn_delay: Duration::from_secs(60),
388 interface_id: InterfaceId(103),
389 };
390
391 let result = start(config, tx);
396 assert!(result.is_ok());
399 }
400}