mio_pipe/
lib.rs

1//! Unix pipe for use with Mio.
2//!
3//! See the [`new_pipe`] documentation.
4//!
5//! ## Supported platforms
6//!
7//! Currently supported platforms:
8//!
9//! * Android
10//! * DragonFly BSD
11//! * FreeBSD
12//! * Linux
13//! * NetBSD
14//! * OpenBSD
15//! * iOS
16//! * macOS
17//!
//! The most notable exception in the list is Windows. If you want to contribute
//! a port to Windows please see [issue #6].
//!
//! [issue #6]: https://github.com/Thomasdezeeuw/mio-pipe/issues/6
22
23use std::io::{self, IoSlice, IoSliceMut, Read, Write};
24#[cfg(unix)]
25use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
26#[cfg(unix)]
27use std::process::{ChildStderr, ChildStdin, ChildStdout};
28
29use mio::{event, Interest, Registry, Token};
30
31mod sys;
32
33/// Sending end of an Unix pipe.
34///
35/// See [`new_pipe`] for documentation, including examples.
36#[derive(Debug)]
37pub struct Sender {
38    inner: sys::Sender,
39}
40
41impl Sender {
42    /// Set the `Sender` into or out of non-blocking mode.
43    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
44        self.inner.set_nonblocking(nonblocking)
45    }
46}
47
48impl event::Source for Sender {
49    fn register(
50        &mut self,
51        registry: &Registry,
52        token: Token,
53        interests: Interest,
54    ) -> io::Result<()> {
55        self.inner.register(registry, token, interests)
56    }
57
58    fn reregister(
59        &mut self,
60        registry: &Registry,
61        token: Token,
62        interests: Interest,
63    ) -> io::Result<()> {
64        self.inner.reregister(registry, token, interests)
65    }
66
67    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
68        self.inner.deregister(registry)
69    }
70}
71
72impl Write for Sender {
73    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
74        self.inner.write(buf)
75    }
76
77    fn write_vectored(&mut self, bufs: &[IoSlice]) -> io::Result<usize> {
78        self.inner.write_vectored(bufs)
79    }
80
81    fn flush(&mut self) -> io::Result<()> {
82        self.inner.flush()
83    }
84}
85
86/// # Notes
87///
88/// The underlying pipe is **not** set to non-blocking.
89#[cfg(unix)]
90impl From<ChildStdin> for Sender {
91    fn from(stdin: ChildStdin) -> Sender {
92        // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
93        unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
94    }
95}
96
97#[cfg(unix)]
98impl FromRawFd for Sender {
99    unsafe fn from_raw_fd(fd: RawFd) -> Sender {
100        Sender {
101            inner: sys::Sender::from_raw_fd(fd),
102        }
103    }
104}
105
106#[cfg(unix)]
107impl AsRawFd for Sender {
108    fn as_raw_fd(&self) -> RawFd {
109        self.inner.as_raw_fd()
110    }
111}
112
113#[cfg(unix)]
114impl IntoRawFd for Sender {
115    fn into_raw_fd(self) -> RawFd {
116        self.inner.into_raw_fd()
117    }
118}
119
120/// Receiving end of an Unix pipe.
121///
122/// See [`new_pipe`] for documentation, including examples.
123#[derive(Debug)]
124pub struct Receiver {
125    inner: sys::Receiver,
126}
127
128impl Receiver {
129    /// Set the `Receiver` into or out of non-blocking mode.
130    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
131        self.inner.set_nonblocking(nonblocking)
132    }
133}
134
135impl event::Source for Receiver {
136    fn register(
137        &mut self,
138        registry: &Registry,
139        token: Token,
140        interests: Interest,
141    ) -> io::Result<()> {
142        self.inner.register(registry, token, interests)
143    }
144
145    fn reregister(
146        &mut self,
147        registry: &Registry,
148        token: Token,
149        interests: Interest,
150    ) -> io::Result<()> {
151        self.inner.reregister(registry, token, interests)
152    }
153
154    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
155        self.inner.deregister(registry)
156    }
157}
158
159impl Read for Receiver {
160    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
161        self.inner.read(buf)
162    }
163
164    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
165        self.inner.read_vectored(bufs)
166    }
167}
168
169/// # Notes
170///
171/// The underlying pipe is **not** set to non-blocking.
172#[cfg(unix)]
173impl From<ChildStdout> for Receiver {
174    fn from(stdout: ChildStdout) -> Receiver {
175        // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
176        unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
177    }
178}
179
180/// # Notes
181///
182/// The underlying pipe is **not** set to non-blocking.
183#[cfg(unix)]
184impl From<ChildStderr> for Receiver {
185    fn from(stderr: ChildStderr) -> Receiver {
186        // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
187        unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
188    }
189}
190
191#[cfg(unix)]
192impl FromRawFd for Receiver {
193    unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
194        Receiver {
195            inner: sys::Receiver::from_raw_fd(fd),
196        }
197    }
198}
199
200#[cfg(unix)]
201impl AsRawFd for Receiver {
202    fn as_raw_fd(&self) -> RawFd {
203        self.inner.as_raw_fd()
204    }
205}
206
207#[cfg(unix)]
208impl IntoRawFd for Receiver {
209    fn into_raw_fd(self) -> RawFd {
210        self.inner.into_raw_fd()
211    }
212}
213
214/// Create a new non-blocking Unix pipe.
215///
216/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
217/// inter-process or thread communication channel.
218///
219/// This channel may be created before forking the process and then one end used
220/// in each process, e.g. the parent process has the sending end to send command
221/// to the child process.
222///
223/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
224///
225/// # Events
226///
227/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
228/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
229/// written to the `Sender` the `Receiver` will receive an [readable event].
230///
231/// In addition to those events, events will also be generated if the other side
232/// is dropped. However due to platform differences checking `is_{read,
233/// write}_closed` is not enough. To check if the `Sender` is dropped you'll
234/// need to check both [`is_error`] and [`is_read_closed`] on events for the
235/// `Receiver`, if either is true the `Sender` is dropped. On the `Sender` end
236/// check `is_error` and [`is_write_closed`], if either is true the `Receiver`
237/// was dropped. Also see the second example below.
238///
239/// [`WRITABLE`]: Interest::WRITABLE
240/// [writable events]: mio::event::Event::is_writable
241/// [`READABLE`]: Interest::READABLE
242/// [readable event]: mio::event::Event::is_readable
243/// [`is_error`]: mio::event::Event::is_error
244/// [`is_read_closed`]: mio::event::Event::is_read_closed
245/// [`is_write_closed`]: mio::event::Event::is_write_closed
246///
247/// # Deregistering
248///
249/// Both `Sender` and `Receiver` will deregister themselves when dropped,
250/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
251///
252/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
253///
254/// # Examples
255///
256/// Simple example that writes data into the sending end and read it from the
257/// receiving end.
258///
259/// ```
260/// use std::io::{self, Read, Write};
261///
262/// use mio::{Poll, Events, Interest, Token};
263/// use mio_pipe::new_pipe;
264///
265/// // Unique tokens for the two ends of the channel.
266/// const PIPE_RECV: Token = Token(0);
267/// const PIPE_SEND: Token = Token(1);
268///
269/// # fn main() -> io::Result<()> {
270/// // Create our `Poll` instance and the `Events` container.
271/// let mut poll = Poll::new()?;
272/// let mut events = Events::with_capacity(8);
273///
274/// // Create a new pipe.
275/// let (mut sender, mut receiver) = new_pipe()?;
276///
277/// // Register both ends of the channel.
278/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
279/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
280///
281/// const MSG: &[u8; 11] = b"Hello world";
282///
283/// loop {
284///     poll.poll(&mut events, None)?;
285///
286///     for event in events.iter() {
287///         match event.token() {
288///             PIPE_SEND => sender.write(MSG)
289///                 .and_then(|n| if n != MSG.len() {
290///                         // We'll consider a short write an error in this
291///                         // example. NOTE: we can't use `write_all` with
292///                         // non-blocking I/O.
293///                         Err(io::ErrorKind::WriteZero.into())
294///                     } else {
295///                         Ok(())
296///                     })?,
297///             PIPE_RECV => {
298///                 let mut buf = [0; 11];
299///                 let n = receiver.read(&mut buf)?;
300///                 println!("received: {:?}", &buf[0..n]);
301///                 assert_eq!(n, MSG.len());
302///                 assert_eq!(&buf, &*MSG);
303///                 return Ok(());
304///             },
305///             _ => unreachable!(),
306///         }
307///     }
308/// }
309/// # }
310/// ```
311///
312/// Example that receives an event once the `Sender` is dropped.
313///
314/// ```
315/// # use std::io::{self, Read, Write};
316/// #
317/// # use mio::{Poll, Events, Interest, Token};
318/// # use mio_pipe::new_pipe;
319/// #
320/// # const PIPE_RECV: Token = Token(0);
321/// # const PIPE_SEND: Token = Token(1);
322/// #
323/// # fn main() -> io::Result<()> {
324/// // Same setup as in the example above.
325/// let mut poll = Poll::new()?;
326/// let mut events = Events::with_capacity(8);
327///
328/// let (mut sender, mut receiver) = new_pipe()?;
329///
330/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
331/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
332///
333/// // Drop the sender.
334/// drop(sender);
335///
336/// poll.poll(&mut events, None)?;
337///
338/// for event in events.iter() {
339///     match event.token() {
340///         PIPE_RECV if event.is_error() || event.is_read_closed() => {
341///             // Detected that the sender was dropped.
342///             println!("Sender dropped!");
343///             return Ok(());
344///         },
345///         _ => unreachable!(),
346///     }
347/// }
348/// # unreachable!();
349/// # }
350/// ```
351pub fn new_pipe() -> io::Result<(Sender, Receiver)> {
352    sys::new_pipe()
353        .map(|(sender, receiver)| (Sender { inner: sender }, Receiver { inner: receiver }))
354}