Skip to main content

otter_support/
childio.rs

1// Copyright 2020-2021 Ian Jackson and contributors to Otter
2// SPDX-License-Identifier: AGPL-3.0-or-later
3// There is NO WARRANTY.
4
5use crate::prelude::*;
6
7use std::process;
8
9#[derive(Debug)]
10pub struct ChildIo<RW> {
11  rw: RW,
12  child: Arc<Mutex<ChildWrapper>>,
13}
14
15pub type Stdin  = ChildIo<process::ChildStdin >;
16pub type Stdout = ChildIo<process::ChildStdout>;
17pub type Pair   = (Stdin /*w*/, Stdout /*r*/);
18
/// A child process plus the bookkeeping needed to report its failure.
#[derive(Debug)]
struct ChildWrapper {
  /// Set once a failure of this child has been returned to a caller as
  /// an I/O error; checked on drop so the failure is not also logged.
  reported: bool,
  /// Human-readable description, used when formatting messages.
  desc: String,
  /// The process itself.
  child: process::Child,
}
25
26impl Display for ChildWrapper {
27  #[throws(fmt::Error)]
28  fn fmt(&self, f: &mut fmt::Formatter) {
29    write!(f, "subprocess {} [{}]", &self.desc, self.child.id())?
30  }
31}
32
33impl DebugIdentify for ChildWrapper {
34  #[throws(fmt::Error)]
35  fn debug_identify(&self, f: &mut fmt::Formatter) {
36    write!(f, "ChildWrapper([{}] {})", self.child.id(), &self.desc)?;
37  }
38  #[throws(fmt::Error)]
39  fn debug_identify_type(f: &mut fmt::Formatter) {
40    write!(f, "ChildWrapper")?;
41  }
42}
43
44impl<RW> ChildIo<RW> {
45  fn rw_result(&self, eofblock: bool, r: io::Result<usize>)
46               -> io::Result<usize>
47  {
48    let block = match &r {
49      &Ok(v) if v == 0 => eofblock,
50      &Ok(_)           => return r,
51      Err(_)           => false,
52    };
53    let status = {
54      let mut child = self.child.lock();
55      let child = &mut child.child;
56      if block { Some(child.wait()?) } else { child.try_wait()? }
57    };
58    match status {
59      None => r,
60      Some(es) if es.success() => r,
61      Some(es) => {
62        let mut child = self.child.lock();
63        child.reported = true;
64        let ae = anyhow!("{}: failed: {}", &child, es);
65        Err(io::Error::new(ErrorKind::Other, ae))
66      },
67    }
68  }
69}
70
71pub fn new_pair(mut input: process::Child, desc: String) -> Pair {
72  let stdin  = input.stdin .take().expect("ChildIo::pair, no stdin¬");
73  let stdout = input.stdout.take().expect("ChildIo::pair, no stdout¬");
74  let wrapper = Arc::new(Mutex::new(ChildWrapper {
75    reported: false,
76    desc,
77    child: input,
78  }));
79  (ChildIo { rw: stdin,  child: wrapper.clone() },
80   ChildIo { rw: stdout, child: wrapper         })
81}
82
83#[throws(io::Error)]
84pub fn run_pair(mut cmd: process::Command, desc: String) -> Pair {
85  cmd.stdin (Stdio::piped());
86  cmd.stdout(Stdio::piped());
87  new_pair(cmd.spawn()?, desc)
88}
89
90impl Drop for ChildWrapper {
91  fn drop(&mut self) {
92    use nix::sys::signal::{self, Signal::*};
93    use nix::unistd::Pid;
94
95    if let Err(e) = (||{
96      let es = match self.child.try_wait().context("wait")? {
97        Some(es) => es,
98        None => {
99          let pid = self.child.id();
100          let pid = pid.try_into()
101            .map_err(|_| anyhow!("pid {:?} out of range!", pid))?;
102          let pid = Pid::from_raw(pid);
103          signal::kill(pid, SIGTERM).context("kill")?;
104          let mut es = self.child.wait().context("wait after kill")?;
105          if es.signal() == Some(SIGTERM as _) {
106            es = process::ExitStatus::from_raw(0);
107          }
108          es
109        },
110      };
111      if ! self.reported && ! es.success()
112      && es.signal() != Some(SIGPIPE as _) {
113        warn!("{} failed: {}", &self, es);
114      }
115      Ok::<_,AE>(())
116    })() {
117      warn!("{} cleanup failed: {}", &self, e);
118    }
119  }
120}
121
122impl<R> Read for ChildIo<R> where R: Read {
123  fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
124    let r = self.rw.read(buf);
125    self.rw_result(true,r)
126  }
127}
128
129impl<W> Write for ChildIo<W> where W: Write {
130  fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
131    let r = self.rw.write(buf);
132    self.rw_result(false,r)
133  }
134  fn flush(&mut self) -> io::Result<()> {
135    let r = self.rw.flush();
136    self.rw_result(false, r.map(|()|0) ).map(|_|())
137  }
138}
139
140#[cfg(test)]
141#[cfg(not(miri))]
142use crate::capture_warns_warn as warn;
143
#[cfg(test)]
#[cfg(not(miri))]
pub mod test {

  use crate::prelude::*;
  use super::*;

  /// Test support: lets a test collect the messages passed to `warn!`
  /// on the current thread, instead of having them logged.
  pub mod capture_warns {
    use crate::prelude::*;
    use std::cell::RefCell;

    thread_local! {
      /// `Some(messages)` while capturing on this thread, else `None`.
      pub static WARNINGS: RefCell<Option<Vec<String>>> = RefCell::new(None)
    }

    /// Stand-in for `warn!`: records the formatted message if capturing
    /// is active on this thread, and otherwise logs it as usual.
    #[macro_export]
    macro_rules! capture_warns_warn {
      { $fmt:literal $($rhs:tt)* } => {
        $crate::childio::test::capture_warns::WARNINGS.with(|w| {
          match *w.borrow_mut() {
            Some(ref mut msgs) => msgs.push(format!($fmt $($rhs)*)),
            None => { crate::prelude::warn!($fmt $($rhs)*); },
          }
        });
      }
    }

    /// Runs `f` with capturing enabled on this thread, and returns the
    /// messages that were captured (leaving capturing disabled again).
    pub fn run(f: &dyn Fn()) -> Vec<String> {
      WARNINGS.with(|w| *w.borrow_mut() = Some(Vec::new()));
      f();
      WARNINGS.with(|w| w.borrow_mut().take().unwrap())
    }
  }

  /// A well-behaved child: data round-trips through `cat`, and dropping
  /// a running `cat` produces no warnings.
  #[test]
  fn t_cat() {
    let spawn_cat = || run_pair(Command::new("cat"), "cat".into()).unwrap();

    {
      let (mut to_child, mut from_child) = spawn_cat();
      write!(to_child, "hi").unwrap();
      to_child.flush().unwrap();
      let mut buf = [0;10];
      let got = from_child.read(&mut buf).unwrap();
      assert_eq!( got, 2 );
      assert_eq!( &buf[..2], b"hi" );
    }

    let warnings = capture_warns::run(&||{
      let (_to_child, _from_child) = spawn_cat();
    });
    assert_eq!( warnings, Vec::<String>::new() );
  }

  static STATUS_1: &str = "exit status: 1";

  /// Checks that `e` is the error we produce for a child that exited
  /// with status 1.
  fn assert_is_status_1(e: &io::Error) {
    assert_eq!( e.kind(), ErrorKind::Other );
    let msg = e.to_string();
    assert!( msg.ends_with(STATUS_1), "actually {:?}", msg );
  }

  /// A child that fails immediately: reads and writes must yield the
  /// exit status as an error, and an unreported failure must be warned
  /// about on drop.
  #[test]
  fn t_false() {
    let spawn_false = ||{
      run_pair(Command::new("false"), "false".into()).unwrap()
    };

    // Runs `f` on a fresh child and expects it to fail with status 1.
    let one = | f: &dyn Fn(&mut Stdin, &mut Stdout) -> io::Result<()> |{
      let (mut to_child, mut from_child) = spawn_false();
      let e = f(&mut to_child, &mut from_child).unwrap_err();
      assert_is_status_1(&e);
    };

    one(&|_w, r|{
      let mut buf = [0;10];
      dbgc!( r.read(&mut buf).map(|_|()) )
    });

    let lose_race = |w: &mut Stdin| {
      w.child.lock().child.wait().unwrap();

      // On Linux you can sometimes experience this:
      //   fork a child, giving it the only reading end of a pipe
      //   wait for the child to complete
      //   get the child's exit status with waitpid
      //   write to the pipe and get success, even though no-one has a
      //   reading end any more
      // I think this is a bug but I bet the kernel people will argue
      // the contrary and I don't even think it worth reporting.
      // So here is a countermeasure: keep writing, with increasing
      // pauses, until the write does fail with EPIPE.
      let fd = w.rw.as_raw_fd();
      for pause in (0..).map(Duration::from_millis) {
        match nix::unistd::write(fd, &[0]) {
          Err(nix::errno::Errno::EPIPE) => break,
          Ok(x) if x <= 1 => {
            eprintln!("wait/pipe Linux bug ({}), taking countermeasures!", x);
          },
          x => panic!("{:?}", x),
        }
        std::thread::sleep(pause);
      }
    };

    one(&|w, _r|{
      // make sure we will get EPIPE
      lose_race(w);
      dbgc!( write!(w, "hi") )
    });

    let warnings = capture_warns::run(&||{
      let (mut to_child, _from_child) = spawn_false();
      lose_race(&mut to_child);
    });
    dbgc!(&warnings);
    assert_eq!( warnings.len(), 1 );
    assert!( warnings[0].ends_with(STATUS_1) );
  }


  /// A child that closes its stdout well before it exits with failure:
  /// the read at EOF must still report the eventual exit status.
  #[test]
  fn t_like_linux_wtf() {
    let mut cmd = Command::new("sh");
    cmd.args(&["-ec", "exec >/dev/null; sleep 0.1; exit 1"]);
    let (_to_child, mut from_child) =
      run_pair(cmd, "sh".to_owned()).unwrap();
    let mut buf = [0;10];
    let e = dbgc!( from_child.read(&mut buf) ).unwrap_err();
    assert_is_status_1(&e);
  }
}
287}