mio_st/sys/unix/
pipe.rs

1use std::mem;
2use std::io::{self, Read, Write};
3use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
4
5use crate::event::{Evented, EventedId};
6use crate::poll::{Interests, PollOption, Poller};
7use crate::sys::unix::EventedIo;
8
9/// Create a new non-blocking unix pipe.
10///
11/// This is a wrapper around unix's `pipe` system call and can be used as
12/// interprocess communication channel.
13///
14/// This channel may be created before forking the process and then one end used
15/// in each process, e.g. the parent process has the sending end to send command
16/// to the child process.
17///
18/// # Deregistering
19///
20/// Both `Sender` and `Receiver` will deregister themselves when dropped,
21/// **iff** the file descriptors are not duplicated (via `dup(2)`).
22///
23/// # Examples
24///
25/// ```
26/// # fn main() -> Result<(), Box<std::error::Error>> {
27/// use std::io::{Read, Write};
28///
29/// use mio_st::unix::{new_pipe, Sender, Receiver};
30/// use mio_st::event::{Events, EventedId};
31/// use mio_st::poll::{Poller, PollOption};
32///
33/// // Unique ids for the two ends of the channel.
34/// const CHANNEL_RECV_ID: EventedId = EventedId(0);
35/// const CHANNEL_SEND_ID: EventedId = EventedId(1);
36///
37/// // Create a `Poller` instance and the events container.
38/// let mut poller = Poller::new()?;
39/// let mut events = Events::new();
40///
41/// // Create a new pipe.
42/// let (mut sender, mut receiver) = new_pipe()?;
43///
44/// // Register both ends of the channel.
45/// poller.register(&mut receiver, CHANNEL_RECV_ID, Receiver::INTERESTS, PollOption::Level)?;
46/// poller.register(&mut sender, CHANNEL_SEND_ID, Sender::INTERESTS, PollOption::Level)?;
47///
48/// loop {
49///     // Check for new events.
50///     poller.poll(&mut events, None)?;
51///
52///     for event in &mut events {
53///         match event.id() {
54///             CHANNEL_RECV_ID => {
55///                 let mut buf = Vec::with_capacity(128);
56///                 let n = receiver.read(&mut buf)?;
57///                 println!("received: {:?}", &buf[0..n]);
58/// #               return Ok(());
59///             },
60///             CHANNEL_SEND_ID => sender.write_all(b"Hello world")?,
61///             _ => unreachable!(),
62///         }
63///     }
64/// }
65/// # }
66/// ```
67pub fn new_pipe() -> io::Result<(Sender, Receiver)> {
68    let mut fds: [RawFd; 2] = unsafe { mem::uninitialized() };
69
70    if unsafe { libc::pipe(fds.as_mut_ptr()) } == -1 {
71        Err(io::Error::last_os_error())
72    } else {
73        for fd in &fds {
74            if unsafe { libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) } == -1 {
75                return Err(io::Error::last_os_error());
76            }
77        }
78        let r = Receiver { inner: unsafe { EventedIo::from_raw_fd(fds[0]) } };
79        let w = Sender { inner: unsafe { EventedIo::from_raw_fd(fds[1]) } };
80        Ok((w, r))
81    }
82}
83
84/// Receiving end of an unix pipe.
85///
86/// See [`new_pipe`] for documentation, including examples.
87///
88/// [`new_pipe`]: fn.new_pipe.html
89#[derive(Debug)]
90pub struct Receiver {
91    inner: EventedIo,
92}
93
94impl Receiver {
95    /// The interests to use when registering to receive readable events.
96    pub const INTERESTS: Interests = Interests::READABLE;
97}
98
99impl Evented for Receiver {
100    fn register(&mut self, poller: &mut Poller, id: EventedId, interests: Interests, opt: PollOption) -> io::Result<()> {
101        debug_assert!(!interests.is_writable(), "receiving end of a pipe can never be written");
102        self.inner.register(poller, id, interests, opt)
103    }
104
105    fn reregister(&mut self, poller: &mut Poller, id: EventedId, interests: Interests, opt: PollOption) -> io::Result<()> {
106        debug_assert!(!interests.is_writable(), "receiving end of a pipe can never be written");
107        self.inner.reregister(poller, id, interests, opt)
108    }
109
110    fn deregister(&mut self, poller: &mut Poller) -> io::Result<()> {
111        self.inner.deregister(poller)
112    }
113}
114
115impl AsRawFd for Receiver {
116    fn as_raw_fd(&self) -> RawFd {
117        self.inner.as_raw_fd()
118    }
119}
120
121impl IntoRawFd for Receiver {
122    fn into_raw_fd(self) -> RawFd {
123        self.inner.into_raw_fd()
124    }
125}
126
127impl Read for Receiver {
128    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
129        self.inner.read(buf)
130    }
131}
132
133/// Sending end of an unix pipe.
134///
135/// See [`new_pipe`] for documentation, including examples.
136///
137/// [`new_pipe`]: fn.new_pipe.html
138#[derive(Debug)]
139pub struct Sender {
140    inner: EventedIo,
141}
142
143impl Sender {
144    /// The interests to use when registering to receive writable events.
145    pub const INTERESTS: Interests = Interests::WRITABLE;
146}
147
148impl Evented for Sender {
149    fn register(&mut self, poller: &mut Poller, id: EventedId, interests: Interests, opt: PollOption) -> io::Result<()> {
150        debug_assert!(!interests.is_readable(), "sending end of a pipe can never be read");
151        self.inner.register(poller, id, interests, opt)
152    }
153
154    fn reregister(&mut self, poller: &mut Poller, id: EventedId, interests: Interests, opt: PollOption) -> io::Result<()> {
155        debug_assert!(!interests.is_readable(), "sending end of a pipe can never be read");
156        self.inner.reregister(poller, id, interests, opt)
157    }
158
159    fn deregister(&mut self, poller: &mut Poller) -> io::Result<()> {
160        self.inner.deregister(poller)
161    }
162}
163
164impl AsRawFd for Sender {
165    fn as_raw_fd(&self) -> RawFd {
166        self.inner.as_raw_fd()
167    }
168}
169
170impl IntoRawFd for Sender {
171    fn into_raw_fd(self) -> RawFd {
172        self.inner.into_raw_fd()
173    }
174}
175
176impl Write for Sender {
177    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
178        self.inner.write(buf)
179    }
180
181    fn flush(&mut self) -> io::Result<()> {
182        self.inner.flush()
183    }
184}