mio_pipe/lib.rs
1//! Unix pipe for use with Mio.
2//!
3//! See the [`new_pipe`] documentation.
4//!
5//! ## Supported platforms
6//!
7//! Currently supported platforms:
8//!
9//! * Android
10//! * DragonFly BSD
11//! * FreeBSD
12//! * Linux
13//! * NetBSD
14//! * OpenBSD
15//! * iOS
16//! * macOS
17//!
//! The most notable exception in the list is Windows. If you want to contribute
//! a port to Windows please see [issue #6].
//!
//! [issue #6]: https://github.com/Thomasdezeeuw/mio-pipe/issues/6
22
23use std::io::{self, IoSlice, IoSliceMut, Read, Write};
24#[cfg(unix)]
25use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
26#[cfg(unix)]
27use std::process::{ChildStderr, ChildStdin, ChildStdout};
28
29use mio::{event, Interest, Registry, Token};
30
31mod sys;
32
33/// Sending end of an Unix pipe.
34///
35/// See [`new_pipe`] for documentation, including examples.
36#[derive(Debug)]
37pub struct Sender {
38 inner: sys::Sender,
39}
40
41impl Sender {
42 /// Set the `Sender` into or out of non-blocking mode.
43 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
44 self.inner.set_nonblocking(nonblocking)
45 }
46}
47
48impl event::Source for Sender {
49 fn register(
50 &mut self,
51 registry: &Registry,
52 token: Token,
53 interests: Interest,
54 ) -> io::Result<()> {
55 self.inner.register(registry, token, interests)
56 }
57
58 fn reregister(
59 &mut self,
60 registry: &Registry,
61 token: Token,
62 interests: Interest,
63 ) -> io::Result<()> {
64 self.inner.reregister(registry, token, interests)
65 }
66
67 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
68 self.inner.deregister(registry)
69 }
70}
71
72impl Write for Sender {
73 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
74 self.inner.write(buf)
75 }
76
77 fn write_vectored(&mut self, bufs: &[IoSlice]) -> io::Result<usize> {
78 self.inner.write_vectored(bufs)
79 }
80
81 fn flush(&mut self) -> io::Result<()> {
82 self.inner.flush()
83 }
84}
85
86/// # Notes
87///
88/// The underlying pipe is **not** set to non-blocking.
89#[cfg(unix)]
90impl From<ChildStdin> for Sender {
91 fn from(stdin: ChildStdin) -> Sender {
92 // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
93 unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
94 }
95}
96
97#[cfg(unix)]
98impl FromRawFd for Sender {
99 unsafe fn from_raw_fd(fd: RawFd) -> Sender {
100 Sender {
101 inner: sys::Sender::from_raw_fd(fd),
102 }
103 }
104}
105
106#[cfg(unix)]
107impl AsRawFd for Sender {
108 fn as_raw_fd(&self) -> RawFd {
109 self.inner.as_raw_fd()
110 }
111}
112
113#[cfg(unix)]
114impl IntoRawFd for Sender {
115 fn into_raw_fd(self) -> RawFd {
116 self.inner.into_raw_fd()
117 }
118}
119
120/// Receiving end of an Unix pipe.
121///
122/// See [`new_pipe`] for documentation, including examples.
123#[derive(Debug)]
124pub struct Receiver {
125 inner: sys::Receiver,
126}
127
128impl Receiver {
129 /// Set the `Receiver` into or out of non-blocking mode.
130 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
131 self.inner.set_nonblocking(nonblocking)
132 }
133}
134
135impl event::Source for Receiver {
136 fn register(
137 &mut self,
138 registry: &Registry,
139 token: Token,
140 interests: Interest,
141 ) -> io::Result<()> {
142 self.inner.register(registry, token, interests)
143 }
144
145 fn reregister(
146 &mut self,
147 registry: &Registry,
148 token: Token,
149 interests: Interest,
150 ) -> io::Result<()> {
151 self.inner.reregister(registry, token, interests)
152 }
153
154 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
155 self.inner.deregister(registry)
156 }
157}
158
159impl Read for Receiver {
160 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
161 self.inner.read(buf)
162 }
163
164 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
165 self.inner.read_vectored(bufs)
166 }
167}
168
169/// # Notes
170///
171/// The underlying pipe is **not** set to non-blocking.
172#[cfg(unix)]
173impl From<ChildStdout> for Receiver {
174 fn from(stdout: ChildStdout) -> Receiver {
175 // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
176 unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
177 }
178}
179
180/// # Notes
181///
182/// The underlying pipe is **not** set to non-blocking.
183#[cfg(unix)]
184impl From<ChildStderr> for Receiver {
185 fn from(stderr: ChildStderr) -> Receiver {
186 // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
187 unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
188 }
189}
190
191#[cfg(unix)]
192impl FromRawFd for Receiver {
193 unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
194 Receiver {
195 inner: sys::Receiver::from_raw_fd(fd),
196 }
197 }
198}
199
200#[cfg(unix)]
201impl AsRawFd for Receiver {
202 fn as_raw_fd(&self) -> RawFd {
203 self.inner.as_raw_fd()
204 }
205}
206
207#[cfg(unix)]
208impl IntoRawFd for Receiver {
209 fn into_raw_fd(self) -> RawFd {
210 self.inner.into_raw_fd()
211 }
212}
213
214/// Create a new non-blocking Unix pipe.
215///
216/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
217/// inter-process or thread communication channel.
218///
219/// This channel may be created before forking the process and then one end used
220/// in each process, e.g. the parent process has the sending end to send command
221/// to the child process.
222///
223/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
224///
225/// # Events
226///
227/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
228/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
229/// written to the `Sender` the `Receiver` will receive an [readable event].
230///
231/// In addition to those events, events will also be generated if the other side
232/// is dropped. However due to platform differences checking `is_{read,
233/// write}_closed` is not enough. To check if the `Sender` is dropped you'll
234/// need to check both [`is_error`] and [`is_read_closed`] on events for the
235/// `Receiver`, if either is true the `Sender` is dropped. On the `Sender` end
236/// check `is_error` and [`is_write_closed`], if either is true the `Receiver`
237/// was dropped. Also see the second example below.
238///
239/// [`WRITABLE`]: Interest::WRITABLE
240/// [writable events]: mio::event::Event::is_writable
241/// [`READABLE`]: Interest::READABLE
242/// [readable event]: mio::event::Event::is_readable
243/// [`is_error`]: mio::event::Event::is_error
244/// [`is_read_closed`]: mio::event::Event::is_read_closed
245/// [`is_write_closed`]: mio::event::Event::is_write_closed
246///
247/// # Deregistering
248///
249/// Both `Sender` and `Receiver` will deregister themselves when dropped,
250/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
251///
252/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
253///
254/// # Examples
255///
256/// Simple example that writes data into the sending end and read it from the
257/// receiving end.
258///
259/// ```
260/// use std::io::{self, Read, Write};
261///
262/// use mio::{Poll, Events, Interest, Token};
263/// use mio_pipe::new_pipe;
264///
265/// // Unique tokens for the two ends of the channel.
266/// const PIPE_RECV: Token = Token(0);
267/// const PIPE_SEND: Token = Token(1);
268///
269/// # fn main() -> io::Result<()> {
270/// // Create our `Poll` instance and the `Events` container.
271/// let mut poll = Poll::new()?;
272/// let mut events = Events::with_capacity(8);
273///
274/// // Create a new pipe.
275/// let (mut sender, mut receiver) = new_pipe()?;
276///
277/// // Register both ends of the channel.
278/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
279/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
280///
281/// const MSG: &[u8; 11] = b"Hello world";
282///
283/// loop {
284/// poll.poll(&mut events, None)?;
285///
286/// for event in events.iter() {
287/// match event.token() {
288/// PIPE_SEND => sender.write(MSG)
289/// .and_then(|n| if n != MSG.len() {
290/// // We'll consider a short write an error in this
291/// // example. NOTE: we can't use `write_all` with
292/// // non-blocking I/O.
293/// Err(io::ErrorKind::WriteZero.into())
294/// } else {
295/// Ok(())
296/// })?,
297/// PIPE_RECV => {
298/// let mut buf = [0; 11];
299/// let n = receiver.read(&mut buf)?;
300/// println!("received: {:?}", &buf[0..n]);
301/// assert_eq!(n, MSG.len());
302/// assert_eq!(&buf, &*MSG);
303/// return Ok(());
304/// },
305/// _ => unreachable!(),
306/// }
307/// }
308/// }
309/// # }
310/// ```
311///
312/// Example that receives an event once the `Sender` is dropped.
313///
314/// ```
315/// # use std::io::{self, Read, Write};
316/// #
317/// # use mio::{Poll, Events, Interest, Token};
318/// # use mio_pipe::new_pipe;
319/// #
320/// # const PIPE_RECV: Token = Token(0);
321/// # const PIPE_SEND: Token = Token(1);
322/// #
323/// # fn main() -> io::Result<()> {
324/// // Same setup as in the example above.
325/// let mut poll = Poll::new()?;
326/// let mut events = Events::with_capacity(8);
327///
328/// let (mut sender, mut receiver) = new_pipe()?;
329///
330/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
331/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
332///
333/// // Drop the sender.
334/// drop(sender);
335///
336/// poll.poll(&mut events, None)?;
337///
338/// for event in events.iter() {
339/// match event.token() {
340/// PIPE_RECV if event.is_error() || event.is_read_closed() => {
341/// // Detected that the sender was dropped.
342/// println!("Sender dropped!");
343/// return Ok(());
344/// },
345/// _ => unreachable!(),
346/// }
347/// }
348/// # unreachable!();
349/// # }
350/// ```
351pub fn new_pipe() -> io::Result<(Sender, Receiver)> {
352 sys::new_pipe()
353 .map(|(sender, receiver)| (Sender { inner: sender }, Receiver { inner: receiver }))
354}