os_pipe/lib.rs
//! A cross-platform library for opening OS pipes, like those from
//! [`pipe`](https://man7.org/linux/man-pages/man2/pipe.2.html) on Linux
//! or
//! [`CreatePipe`](https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe)
//! on Windows. The Rust standard library provides
//! [`Stdio::piped`](https://doc.rust-lang.org/std/process/struct.Stdio.html#method.piped)
//! for simple use cases involving child processes, but it doesn't
//! support creating pipes directly. This crate fills that gap.
//!
//! - [Docs](https://docs.rs/os_pipe)
//! - [Crate](https://crates.io/crates/os_pipe)
//! - [Repo](https://github.com/oconnor663/os_pipe.rs)
//!
//! # Common deadlocks related to pipes
//!
//! When you work with pipes, you often end up debugging a deadlock at
//! some point. These can be confusing if you don't know why they
//! happen. Here are two things you need to know:
//!
//! 1. Pipe reads will block waiting for input as long as there's at
//!    least one writer still open. **If you forget to close a writer,
//!    reads will block forever.** This includes writers that you give
//!    to child processes.
//! 2. Pipes have an internal buffer of some fixed size. On Linux for
//!    example, pipe buffers are 64 KiB by default. When the buffer is
//!    full, writes will block waiting for space. **If the buffer is
//!    full and there aren't any readers, writes will block forever.**
//!
//! Deadlocks caused by a forgotten writer usually show up immediately,
//! which makes them relatively easy to fix once you know what to look
//! for. (See "Avoid a deadlock!" in the example code below.) However,
//! deadlocks caused by full pipe buffers are trickier. These might only
//! show up for larger inputs, and they might be timing-dependent or
//! platform-dependent. If you find that writing to a pipe deadlocks
//! sometimes, think about who's supposed to be reading from that pipe,
//! and whether that thread or process might be blocked on something
//! else. For more on this, see the [Gotchas
//! Doc](https://github.com/oconnor663/duct.py/blob/master/gotchas.md#using-io-threads-to-avoid-blocking-children)
//! from the [`duct`](https://github.com/oconnor663/duct.rs) crate. (And
//! consider whether [`duct`](https://github.com/oconnor663/duct.rs)
//! might be a good fit for your use case.)
//!
//! # Examples
//!
//! Here we write a single byte into a pipe and read it back out:
//!
//! ```rust
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! use std::io::prelude::*;
//!
//! let (mut reader, mut writer) = os_pipe::pipe()?;
//! // XXX: If this write blocks, we'll never get to the read.
//! writer.write_all(b"x")?;
//! let mut output = [0];
//! reader.read_exact(&mut output)?;
//! assert_eq!(b"x", &output);
//! # Ok(())
//! # }
//! ```
//!
//! This is a minimal working example, but as discussed in the section
//! above, reading and writing on the same thread like this is
//! deadlock-prone. If we wrote 100 KB instead of just one byte, this
//! example would block on `write_all`, it would never make it to
//! `read_exact`, and that would be a deadlock. Doing the read and write
//! from different threads or different processes would fix the
//! deadlock.
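//!
//! As a rough sketch of that fix (an illustration only, with
//! `writer_thread` being a name made up for this example), the version
//! below moves a 100 KB write onto a separate thread, so that a full
//! pipe buffer blocks only the writer thread while the main thread
//! keeps reading:
//!
//! ```rust
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! use std::io::prelude::*;
//! use std::thread;
//!
//! let (mut reader, mut writer) = os_pipe::pipe()?;
//! let writer_thread = thread::spawn(move || -> std::io::Result<()> {
//!     // This may block partway through, until the reader drains the pipe.
//!     writer.write_all(&vec![b'x'; 100_000])?;
//!     // `writer` is dropped (closed) when this closure returns, which
//!     // lets `read_to_end` below see EOF.
//!     Ok(())
//! });
//! let mut output = Vec::new();
//! reader.read_to_end(&mut output)?;
//! writer_thread.join().expect("writer thread panicked")?;
//! assert_eq!(output.len(), 100_000);
//! # Ok(())
//! # }
//! ```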
//!
//! For a more complex example, here we join the stdout and stderr of a
//! child process into a single pipe. To do that we open a pipe, clone
//! its writer, and set that pair of writers as the child's stdout and
//! stderr. (This is possible because `PipeWriter` implements
//! `Into<Stdio>`.) Then we can read interleaved output from the pipe
//! reader. This example is deadlock-free, but note the comment about
//! closing the writers.
//!
//! ```rust
//! # use std::io::prelude::*;
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // We're going to spawn a child process that prints "foo" to stdout
//! // and "bar" to stderr, and we'll combine these into a single pipe.
//! let mut command = std::process::Command::new("python");
//! command.args(&["-c", r#"
//! import sys
//! sys.stdout.write("foo")
//! sys.stdout.flush()
//! sys.stderr.write("bar")
//! sys.stderr.flush()
//! "#]);
//!
//! // Here's the interesting part. Open a pipe, clone its writer, and
//! // set that pair of writers as the child's stdout and stderr.
//! let (mut reader, writer) = os_pipe::pipe()?;
//! let writer_clone = writer.try_clone()?;
//! command.stdout(writer);
//! command.stderr(writer_clone);
//!
//! // Now start the child process running.
//! let mut handle = command.spawn()?;
//!
//! // Avoid a deadlock! This parent process is still holding open pipe
//! // writers inside the Command object, and we have to close those
//! // before we read. Here we do this by dropping the Command object.
//! drop(command);
//!
//! // Finally we can read all the output and clean up the child.
//! let mut output = String::new();
//! reader.read_to_string(&mut output)?;
//! handle.wait()?;
//! assert_eq!(output, "foobar");
//! # Ok(())
//! # }
//! ```
//!
//! Note that the [`duct`](https://github.com/oconnor663/duct.rs) crate
//! can reproduce the example above in a single line of code, with no
//! risk of deadlocks and no risk of leaking [zombie
//! children](https://en.wikipedia.org/wiki/Zombie_process).

use std::fs::File;
use std::io;
use std::process::Stdio;

#[cfg(not(windows))]
#[path = "unix.rs"]
mod sys;
#[cfg(windows)]
#[path = "windows.rs"]
mod sys;

/// The reading end of a pipe, returned by [`pipe`](fn.pipe.html).
///
/// `PipeReader` implements `Into<Stdio>`, so you can pass it as an argument to
/// `Command::stdin` to spawn a child process that reads from the pipe.
#[derive(Debug)]
pub struct PipeReader(
    // We use std::fs::File here for two reasons: OwnedFd and OwnedHandle are platform-specific,
    // and this gives us read/write/flush for free.
    File,
);

impl PipeReader {
    /// Create a second `PipeReader` that refers to the same underlying pipe,
    /// using `File::try_clone`.
    pub fn try_clone(&self) -> io::Result<PipeReader> {
        self.0.try_clone().map(PipeReader)
    }
}

impl io::Read for PipeReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

impl<'a> io::Read for &'a PipeReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (&self.0).read(buf)
    }
}

impl From<PipeReader> for Stdio {
    fn from(p: PipeReader) -> Stdio {
        p.0.into()
    }
}

/// The writing end of a pipe, returned by [`pipe`](fn.pipe.html).
///
/// `PipeWriter` implements `Into<Stdio>`, so you can pass it as an argument to
/// `Command::stdout` or `Command::stderr` to spawn a child process that writes
/// to the pipe.
#[derive(Debug)]
pub struct PipeWriter(File);

impl PipeWriter {
    /// Create a second `PipeWriter` that refers to the same underlying pipe,
    /// using `File::try_clone`. Readers won't see EOF until every clone has
    /// been closed.
    pub fn try_clone(&self) -> io::Result<PipeWriter> {
        self.0.try_clone().map(PipeWriter)
    }
}

impl io::Write for PipeWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

impl<'a> io::Write for &'a PipeWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (&self.0).write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        (&self.0).flush()
    }
}

impl From<PipeWriter> for Stdio {
    fn from(p: PipeWriter) -> Stdio {
        p.0.into()
    }
}

/// Open a new pipe and return a [`PipeReader`] and [`PipeWriter`] pair.
///
/// This corresponds to the `pipe2` library call on Posix and the
/// `CreatePipe` library call on Windows (though these implementation
/// details might change). These pipes are non-inheritable, so new child
/// processes won't receive a copy of them unless they're explicitly
/// passed as stdin/stdout/stderr.
///
/// [`PipeReader`]: struct.PipeReader.html
/// [`PipeWriter`]: struct.PipeWriter.html
pub fn pipe() -> io::Result<(PipeReader, PipeWriter)> {
    sys::pipe()
}

/// Get a duplicated copy of the current process's standard input, as a
/// [`PipeReader`].
///
/// Reading directly from this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stdin`]. [`PipeReader`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdin`]. This is
/// equivalent to [`Stdio::inherit`], though, so it's usually not necessary
/// unless you need a collection of different pipes.
///
/// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html
/// [`PipeReader`]: struct.PipeReader.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdin`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdin
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stdin() -> io::Result<PipeReader> {
    sys::dup(io::stdin()).map(PipeReader::from)
}

/// Get a duplicated copy of the current process's standard output, as a
/// [`PipeWriter`](struct.PipeWriter.html).
///
/// Writing directly to this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stdout`]. [`PipeWriter`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdout`] or
/// [`Command::stderr`]. This can be useful if you want the child's stderr to go
/// to the parent's stdout.
///
/// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html
/// [`PipeWriter`]: struct.PipeWriter.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdout`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdout
/// [`Command::stderr`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stderr
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stdout() -> io::Result<PipeWriter> {
    sys::dup(io::stdout()).map(PipeWriter::from)
}

/// Get a duplicated copy of the current process's standard error, as a
/// [`PipeWriter`](struct.PipeWriter.html).
///
/// Writing directly to this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stderr`]. [`PipeWriter`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdout`] or
/// [`Command::stderr`]. This can be useful if you want the child's stdout to go
/// to the parent's stderr.
///
/// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html
/// [`PipeWriter`]: struct.PipeWriter.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdout`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdout
/// [`Command::stderr`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stderr
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stderr() -> io::Result<PipeWriter> {
    sys::dup(io::stderr()).map(PipeWriter::from)
}

#[cfg(test)]
mod tests {
    use std::env::consts::EXE_EXTENSION;
    use std::io::prelude::*;
    use std::path::{Path, PathBuf};
    use std::process::Command;
    use std::sync::Once;
    use std::thread;

    fn path_to_exe(name: &str) -> PathBuf {
        // This project defines some associated binaries for testing, and we shell out to them in
        // these tests. `cargo test` doesn't automatically build associated binaries, so this
        // function takes care of building them explicitly, with the right debug/release flavor.
        static CARGO_BUILD_ONCE: Once = Once::new();
        CARGO_BUILD_ONCE.call_once(|| {
            let mut build_command = Command::new("cargo");
            build_command.args(&["build", "--quiet"]);
            if !cfg!(debug_assertions) {
                build_command.arg("--release");
            }
            let build_status = build_command.status().unwrap();
            assert!(
                build_status.success(),
                "Cargo failed to build associated binaries."
            );
        });
        let flavor = if cfg!(debug_assertions) {
            "debug"
        } else {
            "release"
        };
        Path::new("target")
            .join(flavor)
            .join(name)
            .with_extension(EXE_EXTENSION)
    }

    #[test]
    fn test_pipe_some_data() {
        let (mut reader, mut writer) = crate::pipe().unwrap();
        // A small write won't fill the pipe buffer, so it won't block this thread.
        writer.write_all(b"some stuff").unwrap();
        drop(writer);
        let mut out = String::new();
        reader.read_to_string(&mut out).unwrap();
        assert_eq!(out, "some stuff");
    }

    #[test]
    fn test_pipe_some_data_with_refs() {
        // As with `File`, there's a second set of impls for shared
        // refs. Test those.
        let (reader, writer) = crate::pipe().unwrap();
        let mut reader_ref = &reader;
        {
            let mut writer_ref = &writer;
            // A small write won't fill the pipe buffer, so it won't block this thread.
            writer_ref.write_all(b"some stuff").unwrap();
        }
        drop(writer);
        let mut out = String::new();
        reader_ref.read_to_string(&mut out).unwrap();
        assert_eq!(out, "some stuff");
    }

    #[test]
    fn test_pipe_no_data() {
        let (mut reader, writer) = crate::pipe().unwrap();
        drop(writer);
        let mut out = String::new();
        reader.read_to_string(&mut out).unwrap();
        assert_eq!(out, "");
    }

    #[test]
    fn test_pipe_a_megabyte_of_data_from_another_thread() {
        let data = vec![0xff; 1_000_000];
        let data_copy = data.clone();
        let (mut reader, mut writer) = crate::pipe().unwrap();
        let joiner = thread::spawn(move || {
            writer.write_all(&data_copy).unwrap();
            // This drop happens automatically, so writing it out here is mostly
            // just for clarity. For what it's worth, it also guards against
            // accidentally forgetting to drop if we switch to scoped threads or
            // something like that and change this to a non-moving closure. The
            // explicit drop forces `writer` to move.
            drop(writer);
        });
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        joiner.join().unwrap();
        assert_eq!(out, data);
    }

    #[test]
    fn test_pipes_are_not_inheritable() {
        // Create pipes for a child process.
        let (input_reader, mut input_writer) = crate::pipe().unwrap();
        let (mut output_reader, output_writer) = crate::pipe().unwrap();

        // Create a bunch of duplicated copies, which we'll close later. This
        // tests that duplication preserves non-inheritability.
        let ir_dup = input_reader.try_clone().unwrap();
        let iw_dup = input_writer.try_clone().unwrap();
        let or_dup = output_reader.try_clone().unwrap();
        let ow_dup = output_writer.try_clone().unwrap();

        // Spawn the child. Note that this temporary Command object takes
        // ownership of our copies of the child's stdin and stdout, and then
        // closes them immediately when it drops. That stops us from blocking
        // our own read below. We use our own simple implementation of cat for
        // compatibility with Windows.
        let mut child = Command::new(path_to_exe("cat"))
            .stdin(input_reader)
            .stdout(output_writer)
            .spawn()
            .unwrap();

        // Drop all the dups now that the child is spawned.
        drop(ir_dup);
        drop(iw_dup);
        drop(or_dup);
        drop(ow_dup);

        // Write to the child's stdin. This is a small write, so it shouldn't
        // block.
        input_writer.write_all(b"hello").unwrap();
        drop(input_writer);

        // Read from the child's stdout. If this child has accidentally
        // inherited the write end of its own stdin, then it will never exit,
        // and this read will block forever. That's what this test is all
        // about.
        let mut output = Vec::new();
        output_reader.read_to_end(&mut output).unwrap();
        child.wait().unwrap();

        // Confirm that we got the right bytes.
        assert_eq!(b"hello", &*output);
    }

    #[test]
    fn test_parent_handles() {
        // This test invokes the `swap` test program, which uses dup_stdout() and
        // dup_stderr() to swap the outputs for another child that it spawns.

        // Create pipes for a child process.
        let (reader, mut writer) = crate::pipe().unwrap();

        // Write input. This shouldn't block because it's small. Then close the write end, or else
        // the child will hang.
        writer.write_all(b"quack").unwrap();
        drop(writer);

        // Use `swap` to run `cat_both`. `cat_both` will read "quack" from stdin
        // and write it to stdout and stderr with different tags. But because we
        // run it inside `swap`, the tags in the output should be backwards.
        let output = Command::new(path_to_exe("swap"))
            .arg(path_to_exe("cat_both"))
            .stdin(reader)
            .output()
            .unwrap();

        // Check for a clean exit.
        assert!(
            output.status.success(),
            "child process returned {:#?}",
            output
        );

        // Confirm that we got the right bytes.
        assert_eq!(b"stderr: quack", &*output.stdout);
        assert_eq!(b"stdout: quack", &*output.stderr);
    }

    #[test]
    fn test_parent_handles_dont_close() {
        // Open and close each parent pipe multiple times. If this closes the
        // original, subsequent opens should fail.
        let stdin = crate::dup_stdin().unwrap();
        drop(stdin);
        let stdin = crate::dup_stdin().unwrap();
        drop(stdin);

        let stdout = crate::dup_stdout().unwrap();
        drop(stdout);
        let stdout = crate::dup_stdout().unwrap();
        drop(stdout);

        let stderr = crate::dup_stderr().unwrap();
        drop(stderr);
        let stderr = crate::dup_stderr().unwrap();
        drop(stderr);
    }

    #[test]
    fn test_try_clone() {
        let (reader, writer) = crate::pipe().unwrap();
        let mut reader_clone = reader.try_clone().unwrap();
        let mut writer_clone = writer.try_clone().unwrap();
        // A small write won't fill the pipe buffer, so it won't block this thread.
        writer_clone.write_all(b"some stuff").unwrap();
        drop(writer);
        drop(writer_clone);
        let mut out = String::new();
        reader_clone.read_to_string(&mut out).unwrap();
        assert_eq!(out, "some stuff");
    }

    #[test]
    fn test_debug() {
        let (reader, writer) = crate::pipe().unwrap();
        _ = format!("{:?} {:?}", reader, writer);
    }
}