Skip to main content

mio_pool/
poll.rs

1extern crate nix;
2
3use std::os::unix::io::AsRawFd;
4use std::os::unix::io::RawFd;
5use std::time;
6use std::io;
7use self::nix::sys::epoll;
8
9/// Polls for readiness events on all registered file descriptors.
10///
11/// `Poll` allows a program to monitor a large number of file descriptors, waiting until one or
12/// more become "ready" for some class of operations; e.g. reading and writing. A file descriptor
13/// is considered ready if it is possible to immediately perform a corresponding operation; e.g.
14/// [`read`].
15///
16/// These `Poll` instances are optimized for a worker pool use-case, and so they are all
17/// oneshot, edge-triggered, and only support "ready to read".
18///
19/// To use `Poll`, a file descriptor must first be registered with the `Poll` instance using the
20/// [`register`] method. A `Token` is also passed to the [`register`] function, and that same
21/// `Token` is returned when the given file descriptor is ready.
22///
23/// [`read`]: tcp/struct.TcpStream.html#method.read
24/// [`register`]: #method.register
25/// [`reregister`]: #method.reregister
26///
27/// # Examples
28///
29/// A basic example -- establishing a `TcpStream` connection.
30///
31/// ```no_run
32/// # extern crate mio;
33/// # extern crate mio_pool;
34/// # use std::error::Error;
35/// # fn try_main() -> Result<(), Box<Error>> {
36/// use mio_pool::poll::{Events, Poll, Token};
37/// use mio::net::TcpStream;
38///
39/// use std::net::{TcpListener, SocketAddr};
40///
41/// // Bind a server socket to connect to.
42/// let addr: SocketAddr = "127.0.0.1:0".parse()?;
43/// let server = TcpListener::bind(&addr)?;
44///
45/// // Construct a new `Poll` handle as well as the `Events` we'll store into
46/// let poll = Poll::new()?;
47/// let mut events = Events::with_capacity(1024);
48///
49/// // Connect the stream
50/// let stream = TcpStream::connect(&server.local_addr()?)?;
51///
52/// // Register the stream with `Poll`
53/// poll.register(&stream, Token(0))?;
54///
55/// // Wait for the socket to become ready. This has to happens in a loop to
56/// // handle spurious wakeups.
57/// loop {
58///     poll.poll(&mut events, None)?;
59///
60///     for Token(t) in &events {
61///         if t == 0 {
62///             // The socket connected (probably; it could be a spurious wakeup)
63///             return Ok(());
64///         }
65///     }
66/// }
67/// #     Ok(())
68/// # }
69/// #
70/// # fn main() {
71/// #     try_main().unwrap();
72/// # }
73/// ```
74///
75/// # Exclusive access
76///
77/// Since this `Poll` implementation is optimized for worker-pool style use-cases, all file
78/// descriptors are registered using `EPOLL_ONESHOT`. This means that once an event has been issued
79/// for a given descriptor, not more events will be issued for that descriptor until it has been
80/// re-registered using [`reregister`].
81pub struct Poll(RawFd);
82
/// Associates an event with a file descriptor.
///
/// `Token` is a wrapper around `usize`, and is used as an argument to
/// [`Poll::register`] and [`Poll::reregister`].
///
/// A `Token` is a plain value: it can be freely copied, compared, ordered, hashed, and printed
/// with `{:?}`, which makes it convenient to use as a key when mapping events back to
/// connections.
///
/// See [`Poll`] for more documentation on polling. You will likely want to use something like
/// [`slab`] for creating and managing these.
///
/// [`Poll`]: struct.Poll.html
/// [`Poll::register`]: struct.Poll.html#method.register
/// [`Poll::reregister`]: struct.Poll.html#method.reregister
/// [`slab`]: https://crates.io/crates/slab
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Token(pub usize);
96
97/// A collection of readiness events.
98///
99/// `Events` is passed as an argument to [`Poll::poll`], and provides any readiness events received
100/// since the last poll. Usually, a single `Events` instance is created at the same time as a
101/// [`Poll`] and reused on each call to [`Poll::poll`].
102///
103/// See [`Poll`] for more documentation on polling.
104///
105/// [`Poll::poll`]: struct.Poll.html#method.poll
106/// [`Poll`]: struct.Poll.html
107pub struct Events {
108    all: Vec<epoll::EpollEvent>,
109
110    /// How many of the events in `.all` are filled with responses to the last `poll()`?
111    current: usize,
112}
113
114impl Events {
115    /// Return a new `Events` capable of holding up to `capacity` events.
116    pub fn with_capacity(capacity: usize) -> Events {
117        let mut events = Vec::new();
118        events.resize(capacity, epoll::EpollEvent::empty());
119        Events {
120            all: events,
121            current: 0,
122        }
123    }
124}
125
126fn nix_to_io_err(e: nix::Error) -> io::Error {
127    match e {
128        nix::Error::Sys(errno) => io::Error::from_raw_os_error(errno as i32),
129        nix::Error::InvalidPath => io::Error::new(io::ErrorKind::InvalidInput, e),
130        nix::Error::InvalidUtf8 => io::Error::new(io::ErrorKind::InvalidInput, e),
131        nix::Error::UnsupportedOperation => io::Error::new(io::ErrorKind::Other, e),
132    }
133}
134
135impl Poll {
136    /// Return a new `Poll` handle.
137    ///
138    /// This function will make a syscall to the operating system to create the system selector. If
139    /// this syscall fails, `Poll::new` will return with the error.
140    ///
141    /// See [struct] level docs for more details.
142    ///
143    /// [struct]: struct.Poll.html
144    ///
145    /// # Examples
146    ///
147    /// ```
148    /// # use std::error::Error;
149    /// # fn try_main() -> Result<(), Box<Error>> {
150    /// use mio_pool::poll::{Poll, Events};
151    /// use std::time::Duration;
152    ///
153    /// let poll = match Poll::new() {
154    ///     Ok(poll) => poll,
155    ///     Err(e) => panic!("failed to create Poll instance; err={:?}", e),
156    /// };
157    ///
158    /// // Create a structure to receive polled events
159    /// let mut events = Events::with_capacity(1024);
160    ///
161    /// // Wait for events, but none will be received because no `Evented`
162    /// // handles have been registered with this `Poll` instance.
163    /// let n = poll.poll(&mut events, Some(Duration::from_millis(500)))?;
164    /// assert_eq!(n, 0);
165    /// #     Ok(())
166    /// # }
167    /// #
168    /// # fn main() {
169    /// #     try_main().unwrap();
170    /// # }
171    /// ```
172    pub fn new() -> io::Result<Self> {
173        epoll::epoll_create1(epoll::EpollCreateFlags::empty())
174            .map(Poll)
175            .map_err(nix_to_io_err)
176    }
177
178    fn ctl(&self, file: &AsRawFd, t: Token, op: epoll::EpollOp) -> io::Result<()> {
179        let mut event = epoll::EpollEvent::new(
180            epoll::EpollFlags::EPOLLIN | epoll::EpollFlags::EPOLLONESHOT,
181            t.0 as u64,
182        );
183        epoll::epoll_ctl(self.0, op, file.as_raw_fd(), &mut event).map_err(nix_to_io_err)
184    }
185
186    /// Register a file descriptor with this `Poll` instance.
187    ///
188    /// Once registered, the `Poll` instance monitors the given descriptor for readiness state
189    /// changes. When it notices a state change, it will return a readiness event for the handle
190    /// the next time [`poll`] is called.
191    ///
192    /// See the [`struct`] docs for a high level overview.
193    ///
194    /// `token` is user-defined value that is associated with the given `file`. When [`poll`]
195    /// returns an event for `file`, this token is included. This allows the caller to map the
196    /// event back to its descriptor. The token associated with a file descriptor can be changed at
197    /// any time by calling [`reregister`].
198    pub fn register(&self, file: &AsRawFd, t: Token) -> io::Result<()> {
199        self.ctl(file, t, epoll::EpollOp::EpollCtlAdd)
200    }
201
202    /// Re-register a file descriptor with this `Poll` instance.
203    ///
204    /// When you re-register a file descriptor, you can change the details of the registration.
205    /// Specifically, you can update the `token` specified in previous `register` and `reregister`
206    /// calls.
207    ///
208    /// See the [`register`] documentation for details about the function
209    /// arguments and see the [`struct`] docs for a high level overview of
210    /// polling.
211    ///
212    /// [`struct`]: #
213    /// [`register`]: #method.register
214    pub fn reregister(&self, file: &AsRawFd, t: Token) -> io::Result<()> {
215        self.ctl(file, t, epoll::EpollOp::EpollCtlMod)
216    }
217
218    /// Deregister a file descriptor from this `Poll` instance.
219    ///
220    /// When you deregister a file descriptor, it will no longer be modified for readiness events,
221    /// and it will no longer produce events from `poll`.
222    pub fn deregister(&self, file: &AsRawFd) -> io::Result<()> {
223        epoll::epoll_ctl(self.0, epoll::EpollOp::EpollCtlDel, file.as_raw_fd(), None)
224            .map_err(nix_to_io_err)
225    }
226
227    /// Wait for events on file descriptors associated with this `Poll` instance.
228    ///
229    /// Blocks the current thread and waits for events for any of the file descriptors that are
230    /// registered with this `Poll` instance. The function blocks until either at least one
231    /// readiness event has been received or `timeout` has elapsed. A `timeout` of `None` means
232    /// that `poll` blocks until a readiness event has been received.
233    ///
234    /// The supplied `events` will be cleared and newly received readiness events will be pushed
235    /// onto the end. At most `events.capacity()` events will be returned. If there are further
236    /// pending readiness events, they are returned on the next call to `poll`.
237    ///
238    /// Note that once an event has been issued for a given `token` (or rather, for the token's
239    /// file descriptor), no further events will be issued for that descriptor until it has been
240    /// re-registered. Note also that the `timeout` is rounded up to the system clock granularity
241    /// (usually 1ms), and kernel scheduling delays mean that the blocking interval may be overrun
242    /// by a small amount.
243    ///
244    /// `poll` returns the number of events that have been pushed into `events`, or `Err` when an
245    /// error has been encountered with the system selector.
246    ///
247    /// See the [struct] level documentation for a higher level discussion of polling.
248    ///
249    /// [struct]: #
250    pub fn poll(&self, events: &mut Events, timeout: Option<time::Duration>) -> io::Result<usize> {
251        let timeout = match timeout {
252            None => -1,
253            Some(d) => (d.as_secs() * 1000 + d.subsec_nanos() as u64 / 1_000_000) as isize,
254        };
255
256        events.current =
257            epoll::epoll_wait(self.0, &mut events.all[..], timeout).map_err(nix_to_io_err)?;
258        Ok(events.current)
259    }
260}
261
262/// [`Events`] iterator.
263///
264/// This struct is created by the `into_iter` method on [`Events`].
265///
266/// [`Events`]: struct.Events.html
267pub struct EventsIterator<'a> {
268    events: &'a Events,
269    at: usize,
270}
271
272impl<'a> IntoIterator for &'a Events {
273    type IntoIter = EventsIterator<'a>;
274    type Item = Token;
275
276    fn into_iter(self) -> Self::IntoIter {
277        EventsIterator {
278            events: self,
279            at: 0,
280        }
281    }
282}
283
284impl<'a> Iterator for EventsIterator<'a> {
285    type Item = Token;
286    fn next(&mut self) -> Option<Self::Item> {
287        let at = &mut self.at;
288        if *at >= self.events.current {
289            // events beyond .1 are old
290            return None;
291        }
292
293        self.events.all.get(*at).map(|e| {
294            *at += 1;
295            Token(e.data() as usize)
296        })
297    }
298}