mio_pool/poll.rs
1extern crate nix;
2
3use std::os::unix::io::AsRawFd;
4use std::os::unix::io::RawFd;
5use std::time;
6use std::io;
7use self::nix::sys::epoll;
8
/// Polls for readiness events on all registered file descriptors.
///
/// `Poll` allows a program to monitor a large number of file descriptors, waiting until one or
/// more become "ready" for some class of operations; e.g. reading and writing. A file descriptor
/// is considered ready if it is possible to immediately perform a corresponding operation; e.g.
/// [`read`].
///
/// These `Poll` instances are optimized for a worker pool use-case, and so all registrations are
/// oneshot and only support "ready to read".
///
/// To use `Poll`, a file descriptor must first be registered with the `Poll` instance using the
/// [`register`] method. A `Token` is also passed to the [`register`] function, and that same
/// `Token` is returned when the given file descriptor is ready.
///
/// The underlying system selector is closed when the `Poll` is dropped.
///
/// [`read`]: tcp/struct.TcpStream.html#method.read
/// [`register`]: #method.register
/// [`reregister`]: #method.reregister
///
/// # Examples
///
/// A basic example -- establishing a `TcpStream` connection.
///
/// ```no_run
/// # extern crate mio;
/// # extern crate mio_pool;
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use mio_pool::poll::{Events, Poll, Token};
/// use mio::net::TcpStream;
///
/// use std::net::{TcpListener, SocketAddr};
///
/// // Bind a server socket to connect to.
/// let addr: SocketAddr = "127.0.0.1:0".parse()?;
/// let server = TcpListener::bind(&addr)?;
///
/// // Construct a new `Poll` handle as well as the `Events` we'll store into
/// let poll = Poll::new()?;
/// let mut events = Events::with_capacity(1024);
///
/// // Connect the stream
/// let stream = TcpStream::connect(&server.local_addr()?)?;
///
/// // Register the stream with `Poll`
/// poll.register(&stream, Token(0))?;
///
/// // Wait for the socket to become ready. This has to happen in a loop to
/// // handle spurious wakeups.
/// loop {
///     poll.poll(&mut events, None)?;
///
///     for Token(t) in &events {
///         if t == 0 {
///             // The socket connected (probably; it could be a spurious wakeup)
///             return Ok(());
///         }
///     }
/// }
/// # Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// # Exclusive access
///
/// Since this `Poll` implementation is optimized for worker-pool style use-cases, all file
/// descriptors are registered using `EPOLL_ONESHOT`. This means that once an event has been issued
/// for a given descriptor, no more events will be issued for that descriptor until it has been
/// re-registered using [`reregister`].
pub struct Poll(RawFd);

impl Drop for Poll {
    /// Closes the file descriptor owned by this `Poll` so that it is not leaked.
    fn drop(&mut self) {
        // SAFETY: the wrapped descriptor is private to this `Poll` and is never handed out, so
        // nothing else can close it or keep using it after this point. Wrapping it in a `File`
        // and dropping that immediately closes it exactly once; errors from closing are ignored
        // because there is nothing useful to do with them during drop.
        drop(unsafe {
            <::std::fs::File as ::std::os::unix::io::FromRawFd>::from_raw_fd(self.0)
        });
    }
}
82
/// Associates an event with a file descriptor.
///
/// `Token` is a wrapper around `usize`, and is used as an argument to
/// [`Poll::register`] and [`Poll::reregister`].
///
/// A `Token` is a plain value: it can be freely copied, compared, ordered, hashed and printed
/// with `{:?}`, which makes it convenient to use as a key when mapping events back to
/// connections.
///
/// See [`Poll`] for more documentation on polling. You will likely want to use something like
/// [`slab`] for creating and managing these.
///
/// [`Poll`]: struct.Poll.html
/// [`Poll::register`]: struct.Poll.html#method.register
/// [`Poll::reregister`]: struct.Poll.html#method.reregister
/// [`slab`]: https://crates.io/crates/slab
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Token(pub usize);
96
97/// A collection of readiness events.
98///
99/// `Events` is passed as an argument to [`Poll::poll`], and provides any readiness events received
100/// since the last poll. Usually, a single `Events` instance is created at the same time as a
101/// [`Poll`] and reused on each call to [`Poll::poll`].
102///
103/// See [`Poll`] for more documentation on polling.
104///
105/// [`Poll::poll`]: struct.Poll.html#method.poll
106/// [`Poll`]: struct.Poll.html
107pub struct Events {
108 all: Vec<epoll::EpollEvent>,
109
110 /// How many of the events in `.all` are filled with responses to the last `poll()`?
111 current: usize,
112}
113
114impl Events {
115 /// Return a new `Events` capable of holding up to `capacity` events.
116 pub fn with_capacity(capacity: usize) -> Events {
117 let mut events = Vec::new();
118 events.resize(capacity, epoll::EpollEvent::empty());
119 Events {
120 all: events,
121 current: 0,
122 }
123 }
124}
125
126fn nix_to_io_err(e: nix::Error) -> io::Error {
127 match e {
128 nix::Error::Sys(errno) => io::Error::from_raw_os_error(errno as i32),
129 nix::Error::InvalidPath => io::Error::new(io::ErrorKind::InvalidInput, e),
130 nix::Error::InvalidUtf8 => io::Error::new(io::ErrorKind::InvalidInput, e),
131 nix::Error::UnsupportedOperation => io::Error::new(io::ErrorKind::Other, e),
132 }
133}
134
135impl Poll {
136 /// Return a new `Poll` handle.
137 ///
138 /// This function will make a syscall to the operating system to create the system selector. If
139 /// this syscall fails, `Poll::new` will return with the error.
140 ///
141 /// See [struct] level docs for more details.
142 ///
143 /// [struct]: struct.Poll.html
144 ///
145 /// # Examples
146 ///
147 /// ```
148 /// # use std::error::Error;
149 /// # fn try_main() -> Result<(), Box<Error>> {
150 /// use mio_pool::poll::{Poll, Events};
151 /// use std::time::Duration;
152 ///
153 /// let poll = match Poll::new() {
154 /// Ok(poll) => poll,
155 /// Err(e) => panic!("failed to create Poll instance; err={:?}", e),
156 /// };
157 ///
158 /// // Create a structure to receive polled events
159 /// let mut events = Events::with_capacity(1024);
160 ///
161 /// // Wait for events, but none will be received because no `Evented`
162 /// // handles have been registered with this `Poll` instance.
163 /// let n = poll.poll(&mut events, Some(Duration::from_millis(500)))?;
164 /// assert_eq!(n, 0);
165 /// # Ok(())
166 /// # }
167 /// #
168 /// # fn main() {
169 /// # try_main().unwrap();
170 /// # }
171 /// ```
172 pub fn new() -> io::Result<Self> {
173 epoll::epoll_create1(epoll::EpollCreateFlags::empty())
174 .map(Poll)
175 .map_err(nix_to_io_err)
176 }
177
178 fn ctl(&self, file: &AsRawFd, t: Token, op: epoll::EpollOp) -> io::Result<()> {
179 let mut event = epoll::EpollEvent::new(
180 epoll::EpollFlags::EPOLLIN | epoll::EpollFlags::EPOLLONESHOT,
181 t.0 as u64,
182 );
183 epoll::epoll_ctl(self.0, op, file.as_raw_fd(), &mut event).map_err(nix_to_io_err)
184 }
185
186 /// Register a file descriptor with this `Poll` instance.
187 ///
188 /// Once registered, the `Poll` instance monitors the given descriptor for readiness state
189 /// changes. When it notices a state change, it will return a readiness event for the handle
190 /// the next time [`poll`] is called.
191 ///
192 /// See the [`struct`] docs for a high level overview.
193 ///
194 /// `token` is user-defined value that is associated with the given `file`. When [`poll`]
195 /// returns an event for `file`, this token is included. This allows the caller to map the
196 /// event back to its descriptor. The token associated with a file descriptor can be changed at
197 /// any time by calling [`reregister`].
198 pub fn register(&self, file: &AsRawFd, t: Token) -> io::Result<()> {
199 self.ctl(file, t, epoll::EpollOp::EpollCtlAdd)
200 }
201
202 /// Re-register a file descriptor with this `Poll` instance.
203 ///
204 /// When you re-register a file descriptor, you can change the details of the registration.
205 /// Specifically, you can update the `token` specified in previous `register` and `reregister`
206 /// calls.
207 ///
208 /// See the [`register`] documentation for details about the function
209 /// arguments and see the [`struct`] docs for a high level overview of
210 /// polling.
211 ///
212 /// [`struct`]: #
213 /// [`register`]: #method.register
214 pub fn reregister(&self, file: &AsRawFd, t: Token) -> io::Result<()> {
215 self.ctl(file, t, epoll::EpollOp::EpollCtlMod)
216 }
217
218 /// Deregister a file descriptor from this `Poll` instance.
219 ///
220 /// When you deregister a file descriptor, it will no longer be modified for readiness events,
221 /// and it will no longer produce events from `poll`.
222 pub fn deregister(&self, file: &AsRawFd) -> io::Result<()> {
223 epoll::epoll_ctl(self.0, epoll::EpollOp::EpollCtlDel, file.as_raw_fd(), None)
224 .map_err(nix_to_io_err)
225 }
226
227 /// Wait for events on file descriptors associated with this `Poll` instance.
228 ///
229 /// Blocks the current thread and waits for events for any of the file descriptors that are
230 /// registered with this `Poll` instance. The function blocks until either at least one
231 /// readiness event has been received or `timeout` has elapsed. A `timeout` of `None` means
232 /// that `poll` blocks until a readiness event has been received.
233 ///
234 /// The supplied `events` will be cleared and newly received readiness events will be pushed
235 /// onto the end. At most `events.capacity()` events will be returned. If there are further
236 /// pending readiness events, they are returned on the next call to `poll`.
237 ///
238 /// Note that once an event has been issued for a given `token` (or rather, for the token's
239 /// file descriptor), no further events will be issued for that descriptor until it has been
240 /// re-registered. Note also that the `timeout` is rounded up to the system clock granularity
241 /// (usually 1ms), and kernel scheduling delays mean that the blocking interval may be overrun
242 /// by a small amount.
243 ///
244 /// `poll` returns the number of events that have been pushed into `events`, or `Err` when an
245 /// error has been encountered with the system selector.
246 ///
247 /// See the [struct] level documentation for a higher level discussion of polling.
248 ///
249 /// [struct]: #
250 pub fn poll(&self, events: &mut Events, timeout: Option<time::Duration>) -> io::Result<usize> {
251 let timeout = match timeout {
252 None => -1,
253 Some(d) => (d.as_secs() * 1000 + d.subsec_nanos() as u64 / 1_000_000) as isize,
254 };
255
256 events.current =
257 epoll::epoll_wait(self.0, &mut events.all[..], timeout).map_err(nix_to_io_err)?;
258 Ok(events.current)
259 }
260}
261
262/// [`Events`] iterator.
263///
264/// This struct is created by the `into_iter` method on [`Events`].
265///
266/// [`Events`]: struct.Events.html
267pub struct EventsIterator<'a> {
268 events: &'a Events,
269 at: usize,
270}
271
272impl<'a> IntoIterator for &'a Events {
273 type IntoIter = EventsIterator<'a>;
274 type Item = Token;
275
276 fn into_iter(self) -> Self::IntoIter {
277 EventsIterator {
278 events: self,
279 at: 0,
280 }
281 }
282}
283
284impl<'a> Iterator for EventsIterator<'a> {
285 type Item = Token;
286 fn next(&mut self) -> Option<Self::Item> {
287 let at = &mut self.at;
288 if *at >= self.events.current {
289 // events beyond .1 are old
290 return None;
291 }
292
293 self.events.all.get(*at).map(|e| {
294 *at += 1;
295 Token(e.data() as usize)
296 })
297 }
298}