1use std::io::{self, Read, Write};
8use std::process::{Command, Stdio};
9use std::thread;
10use std::time::Duration;
11
12use rns_core::transport::types::InterfaceId;
13
14use crate::event::{Event, EventSender};
15use crate::hdlc;
16use crate::interface::Writer;
17
18#[derive(Debug, Clone)]
20pub struct PipeConfig {
21 pub name: String,
22 pub command: String,
23 pub respawn_delay: Duration,
24 pub interface_id: InterfaceId,
25}
26
27impl Default for PipeConfig {
28 fn default() -> Self {
29 PipeConfig {
30 name: String::new(),
31 command: String::new(),
32 respawn_delay: Duration::from_secs(5),
33 interface_id: InterfaceId(0),
34 }
35 }
36}
37
38struct PipeWriter {
40 stdin: std::process::ChildStdin,
41}
42
43impl Writer for PipeWriter {
44 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
45 self.stdin.write_all(&hdlc::frame(data))
46 }
47}
48
49pub fn start(config: PipeConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
51 let id = config.interface_id;
52
53 let mut child = spawn_child(&config.command)?;
54
55 let stdout = child
56 .stdout
57 .take()
58 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdout from child"))?;
59 let stdin = child
60 .stdin
61 .take()
62 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no stdin from child"))?;
63
64 log::info!("[{}] pipe interface started: {}", config.name, config.command);
65
66 let _ = tx.send(Event::InterfaceUp(id, None, None));
68
69 thread::Builder::new()
71 .name(format!("pipe-reader-{}", id.0))
72 .spawn(move || {
73 reader_loop(stdout, child, id, config, tx);
74 })?;
75
76 Ok(Box::new(PipeWriter { stdin }))
77}
78
79fn spawn_child(command: &str) -> io::Result<std::process::Child> {
80 Command::new("sh")
81 .args(["-c", command])
82 .stdin(Stdio::piped())
83 .stdout(Stdio::piped())
84 .stderr(Stdio::null())
85 .spawn()
86}
87
88fn reader_loop(
91 mut stdout: std::process::ChildStdout,
92 mut child: std::process::Child,
93 id: InterfaceId,
94 config: PipeConfig,
95 tx: EventSender,
96) {
97 let mut decoder = hdlc::Decoder::new();
98 let mut buf = [0u8; 4096];
99
100 loop {
101 match stdout.read(&mut buf) {
102 Ok(0) => {
103 let _ = child.wait();
105 log::warn!("[{}] subprocess terminated", config.name);
106 let _ = tx.send(Event::InterfaceDown(id));
107 match respawn(&config, &tx) {
108 Some((new_stdout, new_child)) => {
109 stdout = new_stdout;
110 child = new_child;
111 decoder = hdlc::Decoder::new();
112 continue;
113 }
114 None => return,
115 }
116 }
117 Ok(n) => {
118 for frame in decoder.feed(&buf[..n]) {
119 if tx
120 .send(Event::Frame {
121 interface_id: id,
122 data: frame,
123 })
124 .is_err()
125 {
126 let _ = child.kill();
128 return;
129 }
130 }
131 }
132 Err(e) => {
133 log::warn!("[{}] pipe read error: {}", config.name, e);
134 let _ = child.kill();
135 let _ = child.wait();
136 let _ = tx.send(Event::InterfaceDown(id));
137 match respawn(&config, &tx) {
138 Some((new_stdout, new_child)) => {
139 stdout = new_stdout;
140 child = new_child;
141 decoder = hdlc::Decoder::new();
142 continue;
143 }
144 None => return,
145 }
146 }
147 }
148 }
149}
150
151fn respawn(
153 config: &PipeConfig,
154 tx: &EventSender,
155) -> Option<(std::process::ChildStdout, std::process::Child)> {
156 loop {
157 thread::sleep(config.respawn_delay);
158 log::info!(
159 "[{}] attempting to respawn subprocess: {}",
160 config.name,
161 config.command
162 );
163
164 match spawn_child(&config.command) {
165 Ok(mut child) => {
166 let stdout = match child.stdout.take() {
167 Some(s) => s,
168 None => {
169 let _ = child.kill();
170 let _ = child.wait();
171 continue;
172 }
173 };
174 let stdin = match child.stdin.take() {
175 Some(s) => s,
176 None => {
177 let _ = child.kill();
178 let _ = child.wait();
179 continue;
180 }
181 };
182
183 let new_writer: Box<dyn Writer> = Box::new(PipeWriter { stdin });
184 if tx
185 .send(Event::InterfaceUp(config.interface_id, Some(new_writer), None))
186 .is_err()
187 {
188 return None; }
190 log::info!("[{}] subprocess respawned", config.name);
191 return Some((stdout, child));
192 }
193 Err(e) => {
194 log::warn!("[{}] respawn failed: {}", config.name, e);
195 }
196 }
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use super::*;
203 use std::sync::mpsc;
204
205 #[test]
206 fn pipe_start_and_receive() {
207 let (tx, rx) = mpsc::channel();
209 let config = PipeConfig {
210 name: "test-pipe".into(),
211 command: "cat".into(),
212 respawn_delay: Duration::from_secs(1),
213 interface_id: InterfaceId(100),
214 };
215
216 let mut writer = start(config, tx).unwrap();
217
218 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
220 assert!(matches!(event, Event::InterfaceUp(InterfaceId(100), None, None)));
221
222 let payload: Vec<u8> = (0..32).collect();
224 writer.send_frame(&payload).unwrap();
225
226 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
228 match event {
229 Event::Frame { interface_id, data } => {
230 assert_eq!(interface_id, InterfaceId(100));
231 assert_eq!(data, payload);
232 }
233 other => panic!("expected Frame, got {:?}", other),
234 }
235 }
236
237 #[test]
238 fn pipe_writer_sends() {
239 let (tx, rx) = mpsc::channel();
241 let config = PipeConfig {
242 name: "test-pipe-writer".into(),
243 command: "cat".into(),
244 respawn_delay: Duration::from_secs(1),
245 interface_id: InterfaceId(101),
246 };
247
248 let mut writer = start(config, tx).unwrap();
249 let _ = rx.recv_timeout(Duration::from_secs(2)).unwrap(); let payload: Vec<u8> = (10..42).collect();
253 writer.send_frame(&payload).unwrap();
254
255 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
256 match event {
257 Event::Frame { data, .. } => {
258 assert_eq!(data, payload);
259 }
260 other => panic!("expected Frame, got {:?}", other),
261 }
262 }
263
264 #[test]
265 fn pipe_subprocess_exit() {
266 let (tx, rx) = mpsc::channel();
268 let config = PipeConfig {
269 name: "test-pipe-exit".into(),
270 command: "true".into(), respawn_delay: Duration::from_secs(60), interface_id: InterfaceId(102),
273 };
274
275 let _writer = start(config, tx).unwrap();
276
277 let mut got_down = false;
279 for _ in 0..5 {
280 match rx.recv_timeout(Duration::from_secs(2)) {
281 Ok(Event::InterfaceDown(InterfaceId(102))) => {
282 got_down = true;
283 break;
284 }
285 Ok(_) => continue,
286 Err(_) => break,
287 }
288 }
289 assert!(got_down, "should receive InterfaceDown after subprocess exits");
290 }
291
292 #[test]
293 fn pipe_config_defaults() {
294 let config = PipeConfig::default();
295 assert_eq!(config.respawn_delay, Duration::from_secs(5));
296 assert_eq!(config.interface_id, InterfaceId(0));
297 assert!(config.command.is_empty());
298 }
299
300 #[test]
301 fn pipe_invalid_command() {
302 let (tx, _rx) = mpsc::channel();
303 let config = PipeConfig {
304 name: "test-pipe-bad".into(),
305 command: "/nonexistent_rns_test_binary_that_does_not_exist_xyz".into(),
306 respawn_delay: Duration::from_secs(60),
307 interface_id: InterfaceId(103),
308 };
309
310 let result = start(config, tx);
315 assert!(result.is_ok());
318 }
319}