1use crate::prelude::*;
6
7use std::process;
8
9#[derive(Debug)]
10pub struct ChildIo<RW> {
11 rw: RW,
12 child: Arc<Mutex<ChildWrapper>>,
13}
14
15pub type Stdin = ChildIo<process::ChildStdin >;
16pub type Stdout = ChildIo<process::ChildStdout>;
17pub type Pair = (Stdin , Stdout );
18
19#[derive(Debug)]
20struct ChildWrapper {
21 reported: bool,
22 desc: String,
23 child: process::Child,
24}
25
26impl Display for ChildWrapper {
27 #[throws(fmt::Error)]
28 fn fmt(&self, f: &mut fmt::Formatter) {
29 write!(f, "subprocess {} [{}]", &self.desc, self.child.id())?
30 }
31}
32
33impl DebugIdentify for ChildWrapper {
34 #[throws(fmt::Error)]
35 fn debug_identify(&self, f: &mut fmt::Formatter) {
36 write!(f, "ChildWrapper([{}] {})", self.child.id(), &self.desc)?;
37 }
38 #[throws(fmt::Error)]
39 fn debug_identify_type(f: &mut fmt::Formatter) {
40 write!(f, "ChildWrapper")?;
41 }
42}
43
44impl<RW> ChildIo<RW> {
45 fn rw_result(&self, eofblock: bool, r: io::Result<usize>)
46 -> io::Result<usize>
47 {
48 let block = match &r {
49 &Ok(v) if v == 0 => eofblock,
50 &Ok(_) => return r,
51 Err(_) => false,
52 };
53 let status = {
54 let mut child = self.child.lock();
55 let child = &mut child.child;
56 if block { Some(child.wait()?) } else { child.try_wait()? }
57 };
58 match status {
59 None => r,
60 Some(es) if es.success() => r,
61 Some(es) => {
62 let mut child = self.child.lock();
63 child.reported = true;
64 let ae = anyhow!("{}: failed: {}", &child, es);
65 Err(io::Error::new(ErrorKind::Other, ae))
66 },
67 }
68 }
69}
70
71pub fn new_pair(mut input: process::Child, desc: String) -> Pair {
72 let stdin = input.stdin .take().expect("ChildIo::pair, no stdin¬");
73 let stdout = input.stdout.take().expect("ChildIo::pair, no stdout¬");
74 let wrapper = Arc::new(Mutex::new(ChildWrapper {
75 reported: false,
76 desc,
77 child: input,
78 }));
79 (ChildIo { rw: stdin, child: wrapper.clone() },
80 ChildIo { rw: stdout, child: wrapper })
81}
82
83#[throws(io::Error)]
84pub fn run_pair(mut cmd: process::Command, desc: String) -> Pair {
85 cmd.stdin (Stdio::piped());
86 cmd.stdout(Stdio::piped());
87 new_pair(cmd.spawn()?, desc)
88}
89
90impl Drop for ChildWrapper {
91 fn drop(&mut self) {
92 use nix::sys::signal::{self, Signal::*};
93 use nix::unistd::Pid;
94
95 if let Err(e) = (||{
96 let es = match self.child.try_wait().context("wait")? {
97 Some(es) => es,
98 None => {
99 let pid = self.child.id();
100 let pid = pid.try_into()
101 .map_err(|_| anyhow!("pid {:?} out of range!", pid))?;
102 let pid = Pid::from_raw(pid);
103 signal::kill(pid, SIGTERM).context("kill")?;
104 let mut es = self.child.wait().context("wait after kill")?;
105 if es.signal() == Some(SIGTERM as _) {
106 es = process::ExitStatus::from_raw(0);
107 }
108 es
109 },
110 };
111 if ! self.reported && ! es.success()
112 && es.signal() != Some(SIGPIPE as _) {
113 warn!("{} failed: {}", &self, es);
114 }
115 Ok::<_,AE>(())
116 })() {
117 warn!("{} cleanup failed: {}", &self, e);
118 }
119 }
120}
121
122impl<R> Read for ChildIo<R> where R: Read {
123 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
124 let r = self.rw.read(buf);
125 self.rw_result(true,r)
126 }
127}
128
129impl<W> Write for ChildIo<W> where W: Write {
130 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
131 let r = self.rw.write(buf);
132 self.rw_result(false,r)
133 }
134 fn flush(&mut self) -> io::Result<()> {
135 let r = self.rw.flush();
136 self.rw_result(false, r.map(|()|0) ).map(|_|())
137 }
138}
139
140#[cfg(test)]
141#[cfg(not(miri))]
142use crate::capture_warns_warn as warn;
143
144#[cfg(test)]
145#[cfg(not(miri))]
146pub mod test {
147
148 use crate::prelude::*;
149 use super::*;
150
151 pub mod capture_warns {
152 use crate::prelude::*;
153 use std::cell::RefCell;
154
155 thread_local! {
156 pub static WARNINGS: RefCell<Option<Vec<String>>> = RefCell::new(None)
157 }
158
159 #[macro_export]
160 macro_rules! capture_warns_warn {
161 { $fmt:literal $($rhs:tt)* } => {
162 $crate::childio::test::capture_warns::WARNINGS.with(|w| {
163 let mut w = w.borrow_mut();
164 if let Some(ref mut msgs) = *w {
165 let s = format!($fmt $($rhs)*);
166 msgs.push(s);
167 } else {
168 crate::prelude::warn!($fmt $($rhs)*);
169 }
170 });
171 }
172 }
173
174 pub fn run(f: &dyn Fn()) -> Vec<String> {
175 WARNINGS.with(|w| {
176 let mut w =w.borrow_mut();
177 *w = Some(vec![]);
178 });
179 f();
180 WARNINGS.with(|w| {
181 let mut w =w.borrow_mut();
182 mem::take(&mut *w).unwrap()
183 })
184 }
185 }
186
187 #[test]
188 fn t_cat() {
189 let setup = ||{
190 let c = Command::new("cat");
191 run_pair(c, "cat".into()).unwrap()
192 };
193
194 {
195 let (mut w, mut r) = setup();
196 assert_eq!( write!(w, "hi").unwrap(), () );
197 assert_eq!( w.flush() .unwrap(), () );
198 let mut buf = [0;10];
199 assert_eq!( r.read(&mut buf).unwrap(), 2 );
200 assert_eq!(&buf[0..2], b"hi");
201 }
202
203 let w = capture_warns::run(&||{
204 let (_w, _r) = setup();
205 });
206 assert_eq!( &w, &[] as &[String] );
207 }
208
209 static STATUS_1: &str = "exit status: 1";
210
211 fn assert_is_status_1(e: &io::Error) {
212 assert_eq!( e.kind(), ErrorKind::Other );
213 let es = e.to_string();
214 assert!( es.ends_with(STATUS_1), "actually {:?}", es );
215 }
216
217 #[test]
218 fn t_false() {
219 let setup = ||{
220 let c = Command::new("false");
221 run_pair(c, "false".into()).unwrap()
222 };
223
224 let one = | f: &dyn Fn(&mut Stdin, &mut Stdout) -> io::Result<()> |{
225 let (mut w, mut r) = setup();
226
227 let r = f(&mut w, &mut r);
228 let e = r.unwrap_err();
229 assert_is_status_1(&e);
230 };
231
232 one(&|_w, r|{
233 let mut buf = [0;10];
234 dbgc!( r.read(&mut buf).map(|_|()) )
235 });
236
237 let lose_race = |w: &mut Stdin| {
238 w.child.lock().child.wait().unwrap();
239
240 for ms in 0.. {
250 let fd = w.rw.as_raw_fd();
251 match nix::unistd::write(fd, &[0]) {
252 Err(nix::errno::Errno::EPIPE) => break,
253 Ok(x) if x <= 1 => {
254 eprintln!("wait/pipe Linux bug ({}), taking countermeasures!", x);
255 },
256 x => panic!("{:?}", x),
257 }
258 std::thread::sleep(Duration::from_millis(ms));
259 }
260 };
261
262 one(&|w, _r|{
263 lose_race(w);
265 dbgc!( write!(w, "hi") )
266 });
267
268 let w = capture_warns::run(&||{
269 let (mut w, _r) = setup();
270 lose_race(&mut w);
271 });
272 dbgc!(&w);
273 assert_eq!( w.len(), 1 );
274 assert!( w[0].ends_with(STATUS_1) );
275 }
276
277
278 #[test]
279 fn t_like_linux_wtf() {
280 let mut c = Command::new("sh");
281 c.args(&["-ec", "exec >/dev/null; sleep 0.1; exit 1"]);
282 let (_w, mut r) = run_pair(c, "sh".to_owned()).unwrap();
283 let mut buf = [0;10];
284 let e = dbgc!( r.read(&mut buf) ).unwrap_err();
285 assert_is_status_1(&e);
286 }
287}