// corcovado/poll.rs
1use event_imp::{self as event, Event, Evented, PollOpt, Ready};
2use std::cell::UnsafeCell;
3#[cfg(all(unix, not(target_os = "fuchsia")))]
4use std::os::unix::io::AsRawFd;
5#[cfg(all(unix, not(target_os = "fuchsia")))]
6use std::os::unix::io::RawFd;
7use std::process;
8use std::rc::Rc;
9use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release, SeqCst};
10use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
11use std::sync::{Arc, Condvar, Mutex};
12use std::time::{Duration, Instant};
13use std::{fmt, io, ptr};
14use std::{mem, ops};
15use {sys, Token};
16
17// Poll is backed by two readiness queues. The first is a system readiness queue
18// represented by `sys::Selector`. The system readiness queue handles events
19// provided by the system, such as TCP and UDP. The second readiness queue is
20// implemented in user space by `ReadinessQueue`. It provides a way to implement
21// purely user space `Evented` types.
22//
23// `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked
24// list nodes. This significantly reduces the number of required allocations.
25// Each `Registration` / `SetReadiness` pair allocates a single readiness node
26// that is used for the lifetime of the registration.
27//
28// The readiness node also includes a single atomic variable, `state` that
29// tracks most of the state associated with the registration. This includes the
30// current readiness, interest, poll options, and internal state. When the node
31// state is mutated, it is queued in the MPSC channel. A call to
32// `ReadinessQueue::poll` will dequeue and process nodes. The node state can
33// still be mutated while it is queued in the channel for processing.
34// Intermediate state values do not matter as long as the final state is
35// included in the call to `poll`. This is the eventually consistent nature of
36// the readiness queue.
37//
38// The readiness node is ref counted using the `ref_count` field. On creation,
39// the ref_count is initialized to 3: one `Registration` handle, one
40// `SetReadiness` handle, and one for the readiness queue. Since the readiness queue
41// doesn't *always* hold a handle to the node, we don't use the Arc type for
42// managing ref counts (this is to avoid constantly incrementing and
43// decrementing the ref count when pushing & popping from the queue). When the
44// `Registration` handle is dropped, the `dropped` flag is set on the node, then
45// the node is pushed into the registration queue. When Poll::poll pops the
// node, it sees the drop flag is set, and decrements its ref count.
47//
48// The MPSC queue is a modified version of the intrusive MPSC node based queue
49// described by 1024cores [1].
50//
51// The first modification is that two markers are used instead of a single
52// `stub`. The second marker is a `sleep_marker` which is used to signal to
53// producers that the consumer is going to sleep. This sleep_marker is only used
54// when the queue is empty, implying that the only node in the queue is
55// `end_marker`.
56//
57// The second modification is an `until` argument passed to the dequeue
58// function. When `poll` encounters a level-triggered node, the node will be
// immediately pushed back into the queue. In order to avoid an infinite
// loop, `poll` saves off the node's pointer before pushing it back and then
// passes that pointer as the `until` argument. If the next node to pop is `until`, then
62// `Dequeue::Empty` is returned.
63//
64// [1] http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
65
66/// Polls for readiness events on all registered values.
67///
68/// `Poll` allows a program to monitor a large number of `Evented` types,
69/// waiting until one or more become "ready" for some class of operations; e.g.
70/// reading and writing. An `Evented` type is considered ready if it is possible
71/// to immediately perform a corresponding operation; e.g. [`read`] or
72/// [`write`].
73///
74/// To use `Poll`, an `Evented` type must first be registered with the `Poll`
75/// instance using the [`register`] method, supplying readiness interest. The
76/// readiness interest tells `Poll` which specific operations on the handle to
77/// monitor for readiness. A `Token` is also passed to the [`register`]
78/// function. When `Poll` returns a readiness event, it will include this token.
79/// This associates the event with the `Evented` handle that generated the
80/// event.
81///
82/// [`read`]: tcp/struct.TcpStream.html#method.read
83/// [`write`]: tcp/struct.TcpStream.html#method.write
84/// [`register`]: #method.register
85///
86// # Examples
87//
88// A basic example -- establishing a `TcpStream` connection.
89//
90// ```
91// # use std::error::Error;
92// # fn try_main() -> Result<(), Box<dyn Error>> {
93// use corcovado::{Events, Poll, Ready, PollOpt, Token};
94//
95// use std::net::{TcpStream, TcpListener, SocketAddr};
96//
97// // Bind a server socket to connect to.
98// let addr: SocketAddr = "127.0.0.1:0".parse()?;
99// let server = TcpListener::bind(&addr)?;
100//
101// // Construct a new `Poll` handle as well as the `Events` we'll store into
102// let poll = Poll::new()?;
103// let mut events = Events::with_capacity(1024);
104//
105// // Connect the stream
106// let stream = TcpStream::connect(&server.local_addr()?)?;
107//
108// // Register the stream with `Poll`
109// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
110//
111// // Wait for the socket to become ready. This has to happens in a loop to
112// // handle spurious wakeups.
113// loop {
114// poll.poll(&mut events, None)?;
115//
116// for event in &events {
117// if event.token() == Token(0) && event.readiness().is_writable() {
118// // The socket connected (probably, it could still be a spurious
119// // wakeup)
120// return Ok(());
121// }
122// }
123// }
124// # Ok(())
125// # }
126// #
127// # fn main() {
128// # try_main().unwrap();
129// # }
130// ```
131///
132/// # Edge-triggered and level-triggered
133///
134/// An [`Evented`] registration may request edge-triggered events or
135/// level-triggered events. This is done by setting `register`'s
136/// [`PollOpt`] argument to either [`edge`] or [`level`].
137///
/// The difference between the two can be described as follows. Suppose that
139/// this scenario happens:
140///
141/// 1. A [`TcpStream`] is registered with `Poll`.
142/// 2. The socket receives 2kb of data.
143/// 3. A call to [`Poll::poll`] returns the token associated with the socket
144/// indicating readable readiness.
145/// 4. 1kb is read from the socket.
146/// 5. Another call to [`Poll::poll`] is made.
147///
148/// If when the socket was registered with `Poll`, edge triggered events were
149/// requested, then the call to [`Poll::poll`] done in step **5** will
150/// (probably) hang despite there being another 1kb still present in the socket
151/// read buffer. The reason for this is that edge-triggered mode delivers events
152/// only when changes occur on the monitored [`Evented`]. So, in step *5* the
153/// caller might end up waiting for some data that is already present inside the
154/// socket buffer.
155///
156/// With edge-triggered events, operations **must** be performed on the
157/// `Evented` type until [`WouldBlock`] is returned. In other words, after
158/// receiving an event indicating readiness for a certain operation, one should
159/// assume that [`Poll::poll`] may never return another event for the same token
160/// and readiness until the operation returns [`WouldBlock`].
161///
/// By contrast, when level-triggered notifications were requested, each call to
163/// [`Poll::poll`] will return an event for the socket as long as data remains
164/// in the socket buffer. Generally, level-triggered events should be avoided if
165/// high performance is a concern.
166///
167/// Since even with edge-triggered events, multiple events can be generated upon
168/// receipt of multiple chunks of data, the caller has the option to set the
169/// [`oneshot`] flag. This tells `Poll` to disable the associated [`Evented`]
170/// after the event is returned from [`Poll::poll`]. The subsequent calls to
171/// [`Poll::poll`] will no longer include events for [`Evented`] handles that
172/// are disabled even if the readiness state changes. The handle can be
173/// re-enabled by calling [`reregister`]. When handles are disabled, internal
174/// resources used to monitor the handle are maintained until the handle is
175/// dropped or deregistered. This makes re-registering the handle a fast
176/// operation.
177///
178/// For example, in the following scenario:
179///
180/// 1. A [`TcpStream`] is registered with `Poll`.
181/// 2. The socket receives 2kb of data.
182/// 3. A call to [`Poll::poll`] returns the token associated with the socket
183/// indicating readable readiness.
184/// 4. 2kb is read from the socket.
185/// 5. Another call to read is issued and [`WouldBlock`] is returned
186/// 6. The socket receives another 2kb of data.
187/// 7. Another call to [`Poll::poll`] is made.
188///
189/// Assuming the socket was registered with `Poll` with the [`edge`] and
190/// [`oneshot`] options, then the call to [`Poll::poll`] in step 7 would block. This
191/// is because, [`oneshot`] tells `Poll` to disable events for the socket after
192/// returning an event.
193///
194/// In order to receive the event for the data received in step 6, the socket
195/// would need to be reregistered using [`reregister`].
196///
197/// [`PollOpt`]: struct.PollOpt.html
198/// [`edge`]: struct.PollOpt.html#method.edge
199/// [`level`]: struct.PollOpt.html#method.level
200/// [`Poll::poll`]: struct.Poll.html#method.poll
201/// [`WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.WouldBlock
202/// [`Evented`]: event/trait.Evented.html
203/// [`TcpStream`]: tcp/struct.TcpStream.html
204/// [`reregister`]: #method.reregister
205/// [`oneshot`]: struct.PollOpt.html#method.oneshot
206///
207/// # Portability
208///
209/// Using `Poll` provides a portable interface across supported platforms as
210/// long as the caller takes the following into consideration:
211///
212/// ### Spurious events
213///
214/// [`Poll::poll`] may return readiness events even if the associated
215/// [`Evented`] handle is not actually ready. Given the same code, this may
216/// happen more on some platforms than others. It is important to never assume
217/// that, just because a readiness notification was received, that the
218/// associated operation will succeed as well.
219///
/// If an operation fails with [`WouldBlock`], then the caller should not treat
221/// this as an error, but instead should wait until another readiness event is
222/// received.
223///
224/// ### Draining readiness
225///
226/// When using edge-triggered mode, once a readiness event is received, the
227/// corresponding operation must be performed repeatedly until it returns
228/// [`WouldBlock`]. Unless this is done, there is no guarantee that another
229/// readiness event will be delivered, even if further data is received for the
230/// [`Evented`] handle.
231///
232/// For example, in the first scenario described above, after step 5, even if
233/// the socket receives more data there is no guarantee that another readiness
234/// event will be delivered.
235///
236/// ### Readiness operations
237///
238/// The only readiness operations that are guaranteed to be present on all
239/// supported platforms are [`readable`] and [`writable`]. All other readiness
240/// operations may have false negatives and as such should be considered
241/// **hints**. This means that if a socket is registered with [`readable`],
242/// [`error`], and [`hup`] interest, and either an error or hup is received, a
243/// readiness event will be generated for the socket, but it **may** only
244/// include `readable` readiness. Also note that, given the potential for
245/// spurious events, receiving a readiness event with `hup` or `error` doesn't
246/// actually mean that a `read` on the socket will return a result matching the
247/// readiness event.
248///
249/// In other words, portable programs that explicitly check for [`hup`] or
250/// [`error`] readiness should be doing so as an **optimization** and always be
251/// able to handle an error or HUP situation when performing the actual read
252/// operation.
253///
254/// [`readable`]: struct.Ready.html#method.readable
255/// [`writable`]: struct.Ready.html#method.writable
256/// [`error`]: unix/struct.UnixReady.html#method.error
257/// [`hup`]: unix/struct.UnixReady.html#method.hup
258///
259/// ### Registering handles
260///
261/// Unless otherwise noted, it should be assumed that types implementing
262/// [`Evented`] will never become ready unless they are registered with `Poll`.
263///
264/// For example:
265///
266// ```
267// # use std::error::Error;
268// # fn try_main() -> Result<(), Box<dyn Error>> {
269// use corcovado::{Poll, Ready, PollOpt, Token};
270// use std::net::TcpStream;
271// use std::time::Duration;
272// use std::thread;
273//
274// let sock = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
275//
276// thread::sleep(Duration::from_secs(1));
277//
278// let poll = Poll::new()?;
279//
280// // The connect is not guaranteed to have started until it is registered at
281// // this point
282// poll.register(&sock, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
283// # Ok(())
284// # }
285// #
286// # fn main() {
287// # try_main().unwrap();
288// # }
289// ```
290///
291/// # Implementation notes
292///
293/// `Poll` is backed by the selector provided by the operating system.
294///
295/// | OS | Selector |
296/// |------------|-----------|
297/// | Linux | [epoll] |
298/// | OS X, iOS | [kqueue] |
299/// | Windows | [IOCP] |
300/// | FreeBSD | [kqueue] |
301/// | Android | [epoll] |
302///
303/// On all supported platforms, socket operations are handled by using the
304/// system selector. Platform specific extensions (e.g. [`EventedFd`]) allow
305/// accessing other features provided by individual system selectors. For
306/// example, Linux's [`signalfd`] feature can be used by registering the FD with
307/// `Poll` via [`EventedFd`].
308///
309/// On all platforms except windows, a call to [`Poll::poll`] is mostly just a
310/// direct call to the system selector. However, [IOCP] uses a completion model
311/// instead of a readiness model. In this case, `Poll` must adapt the completion
312/// model Mio's API. While non-trivial, the bridge layer is still quite
313/// efficient. The most expensive part being calls to `read` and `write` require
314/// data to be copied into an intermediate buffer before it is passed to the
315/// kernel.
316///
317/// Notifications generated by [`SetReadiness`] are handled by an internal
318/// readiness queue. A single call to [`Poll::poll`] will collect events from
/// both the system selector and the internal readiness queue.
320///
321/// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
322/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
323/// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
324/// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html
325/// [`EventedFd`]: unix/struct.EventedFd.html
326/// [`SetReadiness`]: struct.SetReadiness.html
327/// [`Poll::poll`]: struct.Poll.html#method.poll
pub struct Poll {
    // Platform specific IO selector (epoll, kqueue, IOCP, ... depending on
    // the target OS; see the "Implementation notes" section in the docs above)
    selector: sys::Selector,

    // Custom readiness queue backing user-space `Registration` /
    // `SetReadiness` events; see the module comments at the top of this file
    readiness_queue: ReadinessQueue,

    // Use an atomic to first check if a full lock will be required. This is a
    // fast-path check for single threaded cases avoiding the extra syscall
    lock_state: AtomicUsize,

    // Sequences concurrent calls to `Poll::poll`
    lock: Mutex<()>,

    // Wakeup the next waiter blocked on `lock` / `condvar`
    condvar: Condvar,
}
345
346/// Handle to a user space `Poll` registration.
347///
348/// `Registration` allows implementing [`Evented`] for types that cannot work
349/// with the [system selector]. A `Registration` is always paired with a
350/// `SetReadiness`, which allows updating the registration's readiness state.
351/// When [`set_readiness`] is called and the `Registration` is associated with a
352/// [`Poll`] instance, a readiness event will be created and eventually returned
353/// by [`poll`].
354///
355/// A `Registration` / `SetReadiness` pair is created by calling
356/// [`Registration::new2`]. At this point, the registration is not being
357/// monitored by a [`Poll`] instance, so calls to `set_readiness` will not
358/// result in any readiness notifications.
359///
360/// `Registration` implements [`Evented`], so it can be used with [`Poll`] using
361/// the same [`register`], [`reregister`], and [`deregister`] functions used
362/// with TCP, UDP, etc... types. Once registered with [`Poll`], readiness state
363/// changes result in readiness events being dispatched to the [`Poll`] instance
364/// with which `Registration` is registered.
365///
366/// **Note**, before using `Registration` be sure to read the
367/// [`set_readiness`] documentation and the [portability] notes. The
368/// guarantees offered by `Registration` may be weaker than expected.
369///
370/// For high level documentation, see [`Poll`].
371///
372/// # Examples
373///
374/// ```
375/// use corcovado::{Ready, Registration, Poll, PollOpt, Token};
376/// use corcovado::event::Evented;
377///
378/// use std::io;
379/// use std::time::Instant;
380/// use std::thread;
381///
382/// pub struct Deadline {
383/// when: Instant,
384/// registration: Registration,
385/// }
386///
387/// impl Deadline {
388/// pub fn new(when: Instant) -> Deadline {
389/// let (registration, set_readiness) = Registration::new2();
390///
391/// thread::spawn(move || {
392/// let now = Instant::now();
393///
394/// if now < when {
395/// thread::sleep(when - now);
396/// }
397///
398/// set_readiness.set_readiness(Ready::readable());
399/// });
400///
401/// Deadline {
402/// when: when,
403/// registration: registration,
404/// }
405/// }
406///
407/// pub fn is_elapsed(&self) -> bool {
408/// Instant::now() >= self.when
409/// }
410/// }
411///
412/// impl Evented for Deadline {
413/// fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
414/// -> io::Result<()>
415/// {
416/// self.registration.register(poll, token, interest, opts)
417/// }
418///
419/// fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
420/// -> io::Result<()>
421/// {
422/// self.registration.reregister(poll, token, interest, opts)
423/// }
424///
425/// fn deregister(&self, poll: &Poll) -> io::Result<()> {
426/// poll.deregister(&self.registration)
427/// }
428/// }
429/// ```
430///
431/// [system selector]: struct.Poll.html#implementation-notes
432/// [`Poll`]: struct.Poll.html
433/// [`Registration::new2`]: struct.Registration.html#method.new2
434/// [`Evented`]: event/trait.Evented.html
435/// [`set_readiness`]: struct.SetReadiness.html#method.set_readiness
436/// [`register`]: struct.Poll.html#method.register
437/// [`reregister`]: struct.Poll.html#method.reregister
438/// [`deregister`]: struct.Poll.html#method.deregister
439/// [portability]: struct.Poll.html#portability
pub struct Registration {
    // Shared, ref-counted pointer to the `ReadinessNode` backing this
    // registration; the paired `SetReadiness` holds the same node (see the
    // module comments on the ref-counting scheme).
    inner: RegistrationInner,
}
443
// SAFETY(review): `RegistrationInner` holds a raw `*mut ReadinessNode`, so
// these impls assert that all cross-thread access to the node goes through
// its atomic `state` / `ref_count` fields — confirm against the node impl
// (not fully visible in this chunk).
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
446
447/// Updates the readiness state of the associated `Registration`.
448///
449/// See [`Registration`] for more documentation on using `SetReadiness` and
450/// [`Poll`] for high level polling documentation.
451///
452/// [`Poll`]: struct.Poll.html
453/// [`Registration`]: struct.Registration.html
#[derive(Clone)]
pub struct SetReadiness {
    // Same ref-counted `ReadinessNode` as the paired `Registration`.
    inner: RegistrationInner,
}
458
// SAFETY(review): as with `Registration`, soundness relies on the node's
// atomic fields mediating every cross-thread access to the shared raw
// pointer — confirm against the `RegistrationInner` methods.
unsafe impl Send for SetReadiness {}
unsafe impl Sync for SetReadiness {}
461
462/// Used to associate an IO type with a Selector
#[derive(Debug)]
pub struct SelectorId {
    // Identifier of the `Selector` this IO handle is associated with.
    // NOTE(review): presumably 0 means "not yet associated" — confirm
    // against the `SelectorId` impl (not visible in this chunk).
    id: AtomicUsize,
}
467
// Shared core of a `Registration` / `SetReadiness` pair.
struct RegistrationInner {
    // Unsafe pointer to the registration's node. The node is ref counted. This
    // cannot "simply" be tracked by an Arc because `Poll::poll` has an implicit
    // handle though it isn't stored anywhere. In other words, `Poll::poll`
    // needs to decrement the ref count before the node is freed.
    node: *mut ReadinessNode,
}
475
// Handle to the user-space readiness queue. `Clone` bumps the `Rc` count;
// all clones share a single `ReadinessQueueInner`.
#[derive(Clone)]
struct ReadinessQueue {
    inner: Rc<ReadinessQueueInner>,
}
480
// SAFETY(review): `Rc`'s ref count is NOT atomic, so cloning or dropping a
// `ReadinessQueue` concurrently from two threads would be a data race. These
// impls are only sound if handles never cross threads while clones/drops can
// race (note that `ReadinessNode::readiness_queue` is still documented as an
// `Arc<ReadinessQueueInner>` pointer) — verify this invariant.
unsafe impl Send for ReadinessQueue {}
unsafe impl Sync for ReadinessQueue {}
483
// State shared by every handle to a single readiness queue. The MPSC queue
// design (end/sleep/closed markers, `until` handling) is described in the
// module comments at the top of this file.
struct ReadinessQueueInner {
    // Used to wake up `Poll` when readiness is set in another thread.
    awakener: sys::Awakener,

    // Head of the MPSC queue used to signal readiness to `Poll::poll`.
    head_readiness: AtomicPtr<ReadinessNode>,

    // Tail of the readiness queue.
    //
    // Only accessed by Poll::poll. Coordination will be handled by the poll fn
    tail_readiness: UnsafeCell<*mut ReadinessNode>,

    // Fake readiness node used to punctuate the end of the readiness queue.
    // Before attempting to read from the queue, this node is inserted in order
    // to partition the queue between nodes that are "owned" by the dequeue end
    // and nodes that will be pushed on by producers.
    end_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but this node signals to producers that `Poll`
    // has gone to sleep and must be woken up.
    sleep_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but the node signals that the queue is closed.
    // This happens when `ReadyQueue` is dropped and signals to producers that
    // the nodes should no longer be pushed into the queue.
    closed_marker: Box<ReadinessNode>,
}
511
512/// Node shared by a `Registration` / `SetReadiness` pair as well as the node
513/// queued into the MPSC channel.
struct ReadinessNode {
    // Node state, see struct docs for `ReadinessState`
    //
    // This variable is the primary point of coordination between all the
    // various threads concurrently accessing the node.
    state: AtomicState,

    // The registration token cannot fit into the `state` variable, so it is
    // broken out here. In order to atomically update both the state and token
    // we have to jump through a few hoops.
    //
    // First, `state` includes `token_read_pos` and `token_write_pos`. These can
    // either be 0, 1, or 2 which represent a token slot. `token_write_pos` is
    // the token slot that contains the most up to date registration token.
    // `token_read_pos` is the token slot that `poll` is currently reading from.
    //
    // When a call to `update` includes a different token than the one currently
    // associated with the registration (token_write_pos), first an unused token
    // slot is found. The unused slot is the one not represented by
    // `token_read_pos` OR `token_write_pos`. The new token is written to this
    // slot, then `state` is updated with the new `token_write_pos` value. This
    // requires that there is only a *single* concurrent call to `update`.
    //
    // When `poll` reads a node state, it checks that `token_read_pos` matches
    // `token_write_pos`. If they do not match, then it atomically updates
    // `state` such that `token_read_pos` is set to `token_write_pos`. It will
    // then read the token at the newly updated `token_read_pos`.
    token_0: UnsafeCell<Token>,
    token_1: UnsafeCell<Token>,
    token_2: UnsafeCell<Token>,

    // Used when the node is queued in the readiness linked list. Accessing
    // this field requires winning the "queue" lock
    next_readiness: AtomicPtr<ReadinessNode>,

    // Ensures that there is only one concurrent call to `update`.
    //
    // Each call to `update` will attempt to swap `update_lock` from `false` to
    // `true`. If the CAS succeeds, the thread has obtained the update lock. If
    // the CAS fails, then the `update` call returns immediately and the update
    // is discarded.
    update_lock: AtomicBool,

    // Type-erased pointer to the owning queue's `ReadinessQueueInner`.
    // NOTE(review): this comment historically said `Arc<ReadinessQueueInner>`,
    // but `ReadinessQueue` stores an `Rc` — confirm which ref-count type is
    // actually round-tripped through this raw pointer.
    readiness_queue: AtomicPtr<()>,

    // Tracks the number of `ReadyRef` pointers
    ref_count: AtomicUsize,
}
563
564/// Stores the ReadinessNode state in an AtomicUsize. This wrapper around the
565/// atomic variable handles encoding / decoding `ReadinessState` values.
struct AtomicState {
    // Packed `ReadinessState` bits; see `ReadinessState` for the bit layout.
    inner: AtomicUsize,
}
569
// Masks for decoding 2-bit and 4-bit fields out of the packed state word.
const MASK_2: usize = 4 - 1;
const MASK_4: usize = 16 - 1;
// Single-bit flags within the packed state word.
const QUEUED_MASK: usize = 1 << QUEUED_SHIFT;
const DROPPED_MASK: usize = 1 << DROPPED_SHIFT;

// Bit offsets of each field within the packed word; the field layout is
// documented on `ReadinessState` below.
const READINESS_SHIFT: usize = 0;
const INTEREST_SHIFT: usize = 4;
const POLL_OPT_SHIFT: usize = 8;
const TOKEN_RD_SHIFT: usize = 12;
const TOKEN_WR_SHIFT: usize = 14;
const QUEUED_SHIFT: usize = 16;
const DROPPED_SHIFT: usize = 17;
582
583/// Tracks all state for a single `ReadinessNode`. The state is packed into a
584/// `usize` variable from low to high bit as follows:
585///
586/// 4 bits: Registration current readiness
587/// 4 bits: Registration interest
588/// 4 bits: Poll options
589/// 2 bits: Token position currently being read from by `poll`
590/// 2 bits: Token position last written to by `update`
591/// 1 bit: Queued flag, set when node is being pushed into MPSC queue.
592/// 1 bit: Dropped flag, set when all `Registration` handles have been dropped.
// Newtype over the packed `usize`; accessor methods (defined elsewhere in
// this file) encode / decode the fields described above.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct ReadinessState(usize);
595
596/// Returned by `dequeue_node`. Represents the different states as described by
597/// the queue documentation on 1024cores.net.
enum Dequeue {
    // A readiness node was popped off the queue.
    Data(*mut ReadinessNode),
    // Nothing to pop (or only the `until` sentinel remained; see module docs).
    Empty,
    // A producer was observed mid-push, so the queue is transiently
    // unobservable; per the 1024cores design ([1] in the module docs) the
    // consumer retries.
    Inconsistent,
}
603
// Token reserved for the readiness-queue awakener registration; user tokens
// must not be `Token(usize::MAX)` (see the `register` docs).
const AWAKEN: Token = Token(usize::MAX);
// Upper bound for `ReadinessNode::ref_count` (the same limit `std::sync::Arc`
// uses to guard against ref-count overflow).
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
606
607/*
608 *
609 * ===== Poll =====
610 *
611 */
612
613impl Poll {
614 /// Return a new `Poll` handle.
615 ///
616 /// This function will make a syscall to the operating system to create the
617 /// system selector. If this syscall fails, `Poll::new` will return with the
618 /// error.
619 ///
620 /// See [struct] level docs for more details.
621 ///
622 /// [struct]: struct.Poll.html
623 ///
624 /// # Examples
625 ///
626 /// ```
627 /// # use std::error::Error;
628 /// # fn try_main() -> Result<(), Box<dyn Error>> {
629 /// use corcovado::{Poll, Events};
630 /// use std::time::Duration;
631 ///
632 /// let poll = match Poll::new() {
633 /// Ok(poll) => poll,
634 /// Err(e) => panic!("failed to create Poll instance; err={:?}", e),
635 /// };
636 ///
637 /// // Create a structure to receive polled events
638 /// let mut events = Events::with_capacity(1024);
639 ///
640 /// // Wait for events, but none will be received because no `Evented`
641 /// // handles have been registered with this `Poll` instance.
642 /// let n = poll.poll(&mut events, Some(Duration::from_millis(500)))?;
643 /// assert_eq!(n, 0);
644 /// # Ok(())
645 /// # }
646 /// #
647 /// # fn main() {
648 /// # try_main().unwrap();
649 /// # }
650 /// ```
651 pub fn new() -> io::Result<Poll> {
652 is_send::<Poll>();
653 is_sync::<Poll>();
654
655 let poll = Poll {
656 selector: sys::Selector::new()?,
657 readiness_queue: ReadinessQueue::new()?,
658 lock_state: AtomicUsize::new(0),
659 lock: Mutex::new(()),
660 condvar: Condvar::new(),
661 };
662
663 // Register the notification wakeup FD with the IO poller
664 poll.readiness_queue.inner.awakener.register(
665 &poll,
666 AWAKEN,
667 Ready::readable(),
668 PollOpt::edge(),
669 )?;
670
671 Ok(poll)
672 }
673
674 /// Register an `Evented` handle with the `Poll` instance.
675 ///
676 /// Once registered, the `Poll` instance will monitor the `Evented` handle
677 /// for readiness state changes. When it notices a state change, it will
678 /// return a readiness event for the handle the next time [`poll`] is
679 /// called.
680 ///
681 /// See the [`struct`] docs for a high level overview.
682 ///
683 /// # Arguments
684 ///
685 /// `handle: &E: Evented`: This is the handle that the `Poll` instance
686 /// should monitor for readiness state changes.
687 ///
688 /// `token: Token`: The caller picks a token to associate with the socket.
689 /// When [`poll`] returns an event for the handle, this token is included.
690 /// This allows the caller to map the event to its handle. The token
691 /// associated with the `Evented` handle can be changed at any time by
692 /// calling [`reregister`].
693 ///
694 /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
695 /// usage.
696 ///
697 /// See documentation on [`Token`] for an example showing how to pick
698 /// [`Token`] values.
699 ///
700 /// `interest: Ready`: Specifies which operations `Poll` should monitor for
701 /// readiness. `Poll` will only return readiness events for operations
702 /// specified by this argument.
703 ///
704 /// If a socket is registered with readable interest and the socket becomes
705 /// writable, no event will be returned from [`poll`].
706 ///
707 /// The readiness interest for an `Evented` handle can be changed at any
708 /// time by calling [`reregister`].
709 ///
710 /// `opts: PollOpt`: Specifies the registration options. The most common
711 /// options being [`level`] for level-triggered events, [`edge`] for
712 /// edge-triggered events, and [`oneshot`].
713 ///
714 /// The registration options for an `Evented` handle can be changed at any
715 /// time by calling [`reregister`].
716 ///
717 /// # Notes
718 ///
719 /// Unless otherwise specified, the caller should assume that once an
720 /// `Evented` handle is registered with a `Poll` instance, it is bound to
721 /// that `Poll` instance for the lifetime of the `Evented` handle. This
722 /// remains true even if the `Evented` handle is deregistered from the poll
723 /// instance using [`deregister`].
724 ///
725 /// This function is **thread safe**. It can be called concurrently from
726 /// multiple threads.
727 ///
728 /// [`struct`]: #
729 /// [`reregister`]: #method.reregister
730 /// [`deregister`]: #method.deregister
731 /// [`poll`]: #method.poll
732 /// [`level`]: struct.PollOpt.html#method.level
733 /// [`edge`]: struct.PollOpt.html#method.edge
734 /// [`oneshot`]: struct.PollOpt.html#method.oneshot
735 /// [`Token`]: struct.Token.html
736 ///
737 // # Examples
738 //
739 // ```
740 // # use std::error::Error;
741 // # fn try_main() -> Result<(), Box<dyn Error>> {
742 // use corcovado::{Events, Poll, Ready, PollOpt, Token};
743 // use std::net::TcpStream;
744 // use std::time::{Duration, Instant};
745 //
746 // let poll = Poll::new()?;
747 // let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
748 //
749 // // Register the socket with `poll`
750 // poll.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
751 //
752 // let mut events = Events::with_capacity(1024);
753 // let start = Instant::now();
754 // let timeout = Duration::from_millis(500);
755 //
756 // loop {
757 // let elapsed = start.elapsed();
758 //
759 // if elapsed >= timeout {
760 // // Connection timed out
761 // return Ok(());
762 // }
763 //
764 // let remaining = timeout - elapsed;
765 // poll.poll(&mut events, Some(remaining))?;
766 //
767 // for event in &events {
768 // if event.token() == Token(0) {
769 // // Something (probably) happened on the socket.
770 // return Ok(());
771 // }
772 // }
773 // }
774 // # Ok(())
775 // # }
776 // #
777 // # fn main() {
778 // # try_main().unwrap();
779 // # }
780 // ```
781 pub fn register<E>(
782 &self,
783 handle: &E,
784 token: Token,
785 interest: Ready,
786 opts: PollOpt,
787 ) -> io::Result<()>
788 where
789 E: Evented + ?Sized,
790 {
791 validate_args(token)?;
792
793 /*
794 * Undefined behavior:
795 * - Reusing a token with a different `Evented` without deregistering
796 * (or closing) the original `Evented`.
797 */
798 trace!("registering with poller");
799
800 // Register interests for this socket
801 handle.register(self, token, interest, opts)?;
802
803 Ok(())
804 }
805
806 /// Re-register an `Evented` handle with the `Poll` instance.
807 ///
808 /// Re-registering an `Evented` handle allows changing the details of the
809 /// registration. Specifically, it allows updating the associated `token`,
810 /// `interest`, and `opts` specified in previous `register` and `reregister`
811 /// calls.
812 ///
813 /// The `reregister` arguments fully override the previous values. In other
814 /// words, if a socket is registered with [`readable`] interest and the call
815 /// to `reregister` specifies [`writable`], then read interest is no longer
816 /// requested for the handle.
817 ///
818 /// The `Evented` handle must have previously been registered with this
819 /// instance of `Poll` otherwise the call to `reregister` will return with
820 /// an error.
821 ///
822 /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
823 /// usage.
824 ///
825 /// See the [`register`] documentation for details about the function
826 /// arguments and see the [`struct`] docs for a high level overview of
827 /// polling.
828 ///
829 // # Examples
830 //
831 // ```
832 // # use std::error::Error;
833 // # fn try_main() -> Result<(), Box<dyn Error>> {
834 // use corcovado::{Poll, Ready, PollOpt, Token};
835 // use std::net::TcpStream;
836 //
837 // let poll = Poll::new()?;
838 // let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
839 //
840 // // Register the socket with `poll`, requesting readable
841 // poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
842 //
843 // // Reregister the socket specifying a different token and write interest
844 // // instead. `PollOpt::edge()` must be specified even though that value
845 // // is not being changed.
846 // poll.reregister(&socket, Token(2), Ready::writable(), PollOpt::edge())?;
847 // # Ok(())
848 // # }
849 // #
850 // # fn main() {
851 // # try_main().unwrap();
852 // # }
853 // ```
854 //
855 // [`struct`]: #
856 // [`register`]: #method.register
857 // [`readable`]: struct.Ready.html#method.readable
858 // [`writable`]: struct.Ready.html#method.writable
859 pub fn reregister<E>(
860 &self,
861 handle: &E,
862 token: Token,
863 interest: Ready,
864 opts: PollOpt,
865 ) -> io::Result<()>
866 where
867 E: Evented + ?Sized,
868 {
869 validate_args(token)?;
870
871 trace!("registering with poller");
872
873 // Register interests for this socket
874 handle.reregister(self, token, interest, opts)?;
875
876 Ok(())
877 }
878
879 /// Deregister an `Evented` handle with the `Poll` instance.
880 ///
881 /// When an `Evented` handle is deregistered, the `Poll` instance will
882 /// no longer monitor it for readiness state changes. Unlike disabling
883 /// handles with oneshot, deregistering clears up any internal resources
884 /// needed to track the handle.
885 ///
886 /// A handle can be passed back to `register` after it has been
887 /// deregistered; however, it must be passed back to the **same** `Poll`
888 /// instance.
889 ///
890 /// `Evented` handles are automatically deregistered when they are dropped.
891 /// It is common to never need to explicitly call `deregister`.
892 ///
893 // # Examples
894 //
895 // ```
896 // # use std::error::Error;
897 // # fn try_main() -> Result<(), Box<dyn Error>> {
898 // use corcovado::{Events, Poll, Ready, PollOpt, Token};
899 // use std::net::TcpStream;
900 // use std::time::Duration;
901 //
902 // let poll = Poll::new()?;
903 // let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
904 //
905 // // Register the socket with `poll`
906 // poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
907 //
908 // poll.deregister(&socket)?;
909 //
910 // let mut events = Events::with_capacity(1024);
911 //
912 // // Set a timeout because this poll should never receive any events.
913 // let n = poll.poll(&mut events, Some(Duration::from_secs(1)))?;
914 // assert_eq!(0, n);
915 // # Ok(())
916 // # }
917 // #
918 // # fn main() {
919 // # try_main().unwrap();
920 // # }
921 // ```
922 pub fn deregister<E>(&self, handle: &E) -> io::Result<()>
923 where
924 E: Evented + ?Sized,
925 {
926 trace!("deregistering handle with poller");
927
928 // Deregister interests for this socket
929 handle.deregister(self)?;
930
931 Ok(())
932 }
933
934 /// Wait for readiness events
935 ///
936 /// Blocks the current thread and waits for readiness events for any of the
937 /// `Evented` handles that have been registered with this `Poll` instance.
938 /// The function will block until either at least one readiness event has
939 /// been received or `timeout` has elapsed. A `timeout` of `None` means that
940 /// `poll` will block until a readiness event has been received.
941 ///
942 /// The supplied `events` will be cleared and newly received readiness events
943 /// will be pushed onto the end. At most `events.capacity()` events will be
944 /// returned. If there are further pending readiness events, they will be
945 /// returned on the next call to `poll`.
946 ///
947 /// A single call to `poll` may result in multiple readiness events being
948 /// returned for a single `Evented` handle. For example, if a TCP socket
949 /// becomes both readable and writable, it may be possible for a single
950 /// readiness event to be returned with both [`readable`] and [`writable`]
951 /// readiness **OR** two separate events may be returned, one with
952 /// [`readable`] set and one with [`writable`] set.
953 ///
954 /// Note that the `timeout` will be rounded up to the system clock
955 /// granularity (usually 1ms), and kernel scheduling delays mean that
956 /// the blocking interval may be overrun by a small amount.
957 ///
958 /// `poll` returns the number of readiness events that have been pushed into
959 /// `events` or `Err` when an error has been encountered with the system
960 /// selector. The value returned is deprecated and will be removed in 0.7.0.
961 /// Accessing the events by index is also deprecated. Events can be
962 /// inserted by other events triggering, thus making sequential access
963 /// problematic. Use the iterator API instead. See [`iter`].
964 ///
965 /// See the [struct] level documentation for a higher level discussion of
966 /// polling.
967 ///
968 /// [`readable`]: struct.Ready.html#method.readable
969 /// [`writable`]: struct.Ready.html#method.writable
970 /// [struct]: #
971 /// [`iter`]: struct.Events.html#method.iter
972 ///
973 // # Examples
974 //
975 // A basic example -- establishing a `TcpStream` connection.
976 //
977 // ```
978 // # use std::error::Error;
979 // # fn try_main() -> Result<(), Box<dyn Error>> {
980 // use corcovado::{Events, Poll, Ready, PollOpt, Token};
981 //
982 // use std::net::{TcpStream, TcpListener, SocketAddr};
983 // use std::thread;
984 //
985 // // Bind a server socket to connect to.
986 // let addr: SocketAddr = "127.0.0.1:0".parse()?;
987 // let server = TcpListener::bind(&addr)?;
988 // let addr = server.local_addr()?.clone();
989 //
990 // // Spawn a thread to accept the socket
991 // thread::spawn(move || {
992 // let _ = server.accept();
993 // });
994 //
995 // // Construct a new `Poll` handle as well as the `Events` we'll store into
996 // let poll = Poll::new()?;
997 // let mut events = Events::with_capacity(1024);
998 //
999 // // Connect the stream
1000 // let stream = TcpStream::connect(&addr)?;
1001 //
1002 // // Register the stream with `Poll`
1003 // poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
1004 //
1005 // // Wait for the socket to become ready. This has to happens in a loop to
1006 // // handle spurious wakeups.
1007 // loop {
1008 // poll.poll(&mut events, None)?;
1009 //
1010 // for event in &events {
1011 // if event.token() == Token(0) && event.readiness().is_writable() {
1012 // // The socket connected (probably, it could still be a spurious
1013 // // wakeup)
1014 // return Ok(());
1015 // }
1016 // }
1017 // }
1018 // # Ok(())
1019 // # }
1020 // #
1021 // # fn main() {
1022 // # try_main().unwrap();
1023 // # }
1024 // ```
1025 ///
1026 /// [struct]: #
1027 pub fn poll(
1028 &self,
1029 events: &mut Events,
1030 timeout: Option<Duration>,
1031 ) -> io::Result<usize> {
1032 self.poll1(events, timeout, false)
1033 }
1034
1035 /// Like `poll`, but may be interrupted by a signal
1036 ///
    /// If `poll` is interrupted while blocking, it will transparently retry the syscall. If you
1038 /// want to handle signals yourself, however, use `poll_interruptible`.
1039 pub fn poll_interruptible(
1040 &self,
1041 events: &mut Events,
1042 timeout: Option<Duration>,
1043 ) -> io::Result<usize> {
1044 self.poll1(events, timeout, true)
1045 }
1046
    /// Serialize concurrent callers of `poll`, then run `poll2` inside the
    /// critical section.
    ///
    /// Only one thread may execute `poll2` at a time. If `timeout` is zero
    /// and the lock is held by another thread, returns `Ok(0)` without
    /// blocking. Time spent waiting for the lock is subtracted from
    /// `timeout` before the actual poll.
    fn poll1(
        &self,
        events: &mut Events,
        mut timeout: Option<Duration>,
        interruptible: bool,
    ) -> io::Result<usize> {
        let zero = Some(Duration::from_millis(0));

        // At a high level, the synchronization strategy is to acquire access to
        // the critical section by transitioning the atomic from unlocked ->
        // locked. If the attempt fails, the thread will wait on the condition
        // variable.
        //
        // # Some more detail
        //
        // The `lock_state` atomic usize combines:
        //
        // - locked flag, stored in the least significant bit
        // - number of waiting threads, stored in the rest of the bits.
        //
        // When a thread transitions the locked flag from 0 -> 1, it has
        // obtained access to the critical section.
        //
        // When entering `poll`, a compare-and-swap from 0 -> 1 is attempted.
        // This is a fast path for the case when there are no concurrent calls
        // to poll, which is very common.
        //
        // On failure, the mutex is locked, and the thread attempts to increment
        // the number of waiting threads component of `lock_state`. If this is
        // successfully done while the locked flag is set, then the thread can
        // wait on the condition variable.
        //
        // When a thread exits the critical section, it unsets the locked flag.
        // If there are any waiters, which is atomically determined while
        // unsetting the locked flag, then the condvar is notified.

        // Fast path: uncontended acquire (0 -> 1).
        #[allow(deprecated)]
        let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);

        if 0 != curr {
            // Enter slower path
            let mut lock = self.lock.lock().unwrap();
            let mut inc = false;

            loop {
                if curr & 1 == 0 {
                    // The lock is currently free, attempt to grab it
                    let mut next = curr | 1;

                    if inc {
                        // The waiter count has previously been incremented, so
                        // decrement it here
                        next -= 2;
                    }

                    #[allow(deprecated)]
                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);

                    if actual != curr {
                        curr = actual;
                        continue;
                    }

                    // Lock acquired, break from the loop
                    break;
                }

                // Non-blocking request but lock is held: give up immediately,
                // undoing any waiter-count increment made earlier.
                if timeout == zero {
                    if inc {
                        self.lock_state.fetch_sub(2, SeqCst);
                    }

                    return Ok(0);
                }

                // The lock is currently held, so wait for it to become
                // free. If the waiter count hasn't been incremented yet, do
                // so now
                if !inc {
                    let next = curr.checked_add(2).expect("overflow");
                    #[allow(deprecated)]
                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);

                    if actual != curr {
                        curr = actual;
                        continue;
                    }

                    // Track that the waiter count has been incremented for
                    // this thread and fall through to the condvar waiting
                    inc = true;
                }

                lock = match timeout {
                    Some(to) => {
                        let now = Instant::now();

                        // Wait to be notified
                        let (l, _) = self.condvar.wait_timeout(lock, to).unwrap();

                        // See how much time was elapsed in the wait
                        let elapsed = now.elapsed();

                        // Update `timeout` to reflect how much time is left to
                        // wait.
                        if elapsed >= to {
                            timeout = zero;
                        } else {
                            // Update the timeout
                            timeout = Some(to - elapsed);
                        }

                        l
                    }
                    None => self.condvar.wait(lock).unwrap(),
                };

                // Reload the state
                curr = self.lock_state.load(SeqCst);

                // Try to lock again...
            }
        }

        let ret = self.poll2(events, timeout, interruptible);

        // Release the lock. `fetch_and(!1)` clears the locked bit and
        // returns the prior state; any bits above the lowest mean at least
        // one thread is waiting.
        if 1 != self.lock_state.fetch_and(!1, Release) {
            // Acquire the mutex
            let _lock = self.lock.lock().unwrap();

            // There is at least one waiting thread, so notify one
            self.condvar.notify_one();
        }

        ret
    }
1184
    /// Perform one round of polling: the system selector first, then the
    /// user-space readiness queue. Returns the number of events gathered
    /// into `events`.
    #[inline]
    #[allow(clippy::if_same_then_else)]
    fn poll2(
        &self,
        events: &mut Events,
        mut timeout: Option<Duration>,
        interruptible: bool,
    ) -> io::Result<usize> {
        // Compute the timeout value passed to the system selector. If the
        // readiness queue has pending nodes, we still want to poll the system
        // selector for new events, but we don't want to block the thread to
        // wait for new events.
        if timeout == Some(Duration::from_millis(0)) {
            // If blocking is not requested, then there is no need to prepare
            // the queue for sleep
            //
            // The sleep_marker should be removed by readiness_queue.poll().
        } else if self.readiness_queue.prepare_for_sleep() {
            // The readiness queue is empty. The call to `prepare_for_sleep`
            // inserts `sleep_marker` into the queue. This signals to any
            // threads setting readiness that the `Poll::poll` is going to
            // sleep, so the awakener should be used.
        } else {
            // The readiness queue is not empty, so do not block the thread.
            timeout = Some(Duration::from_millis(0));
        }

        loop {
            let now = Instant::now();
            // First get selector events
            let res = self.selector.select(&mut events.inner, AWAKEN, timeout);
            match res {
                Ok(true) => {
                    // The selector reported that the awakener was triggered.
                    // Some awakeners require reading from a FD.
                    self.readiness_queue.inner.awakener.cleanup();
                    break;
                }
                Ok(false) => break,
                Err(ref e)
                    if e.kind() == io::ErrorKind::Interrupted && !interruptible =>
                {
                    // Interrupted by a signal; update timeout if necessary and retry
                    if let Some(to) = timeout {
                        let elapsed = now.elapsed();
                        if elapsed >= to {
                            break;
                        } else {
                            timeout = Some(to - elapsed);
                        }
                    }
                }
                Err(e) => return Err(e),
            }
        }

        // Poll custom event queue
        self.readiness_queue.poll(&mut events.inner);

        // Return number of polled events
        Ok(events.inner.len())
    }
1246}
1247
1248fn validate_args(token: Token) -> io::Result<()> {
1249 if token == AWAKEN {
1250 return Err(io::Error::other("invalid token"));
1251 }
1252
1253 Ok(())
1254}
1255
1256impl fmt::Debug for Poll {
1257 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1258 fmt.debug_struct("Poll").finish()
1259 }
1260}
1261
#[cfg(all(unix, not(target_os = "fuchsia")))]
impl AsRawFd for Poll {
    // Expose the raw file descriptor of the underlying system selector so
    // that the `Poll` instance itself can be registered elsewhere. Not
    // available on Fuchsia.
    fn as_raw_fd(&self) -> RawFd {
        self.selector.as_raw_fd()
    }
}
1268
1269/// A collection of readiness events.
1270///
1271/// `Events` is passed as an argument to [`Poll::poll`] and will be used to
1272/// receive any new readiness events received since the last poll. Usually, a
1273/// single `Events` instance is created at the same time as a [`Poll`] and
1274/// reused on each call to [`Poll::poll`].
1275///
1276/// See [`Poll`] for more documentation on polling.
1277///
1278/// # Examples
1279///
1280/// ```
1281/// # use std::error::Error;
1282/// # fn try_main() -> Result<(), Box<dyn Error>> {
1283/// use corcovado::{Events, Poll};
1284/// use std::time::Duration;
1285///
1286/// let mut events = Events::with_capacity(1024);
1287/// let poll = Poll::new()?;
1288///
1289/// assert_eq!(0, events.len());
1290///
1291/// // Register `Evented` handles with `poll`
1292///
1293/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1294///
1295/// for event in &events {
1296/// println!("event={:?}", event);
1297/// }
1298/// # Ok(())
1299/// # }
1300/// #
1301/// # fn main() {
1302/// # try_main().unwrap();
1303/// # }
1304/// ```
1305///
1306/// [`Poll::poll`]: struct.Poll.html#method.poll
1307/// [`Poll`]: struct.Poll.html
pub struct Events {
    // Backing event storage provided by the platform-specific `sys` layer.
    inner: sys::Events,
}
1311
1312/// [`Events`] iterator.
1313///
1314/// This struct is created by the [`iter`] method on [`Events`].
1315///
1316/// # Examples
1317///
1318/// ```
1319/// # use std::error::Error;
1320/// # fn try_main() -> Result<(), Box<dyn Error>> {
1321/// use corcovado::{Events, Poll};
1322/// use std::time::Duration;
1323///
1324/// let mut events = Events::with_capacity(1024);
1325/// let poll = Poll::new()?;
1326///
1327/// // Register handles with `poll`
1328///
1329/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1330///
1331/// for event in events.iter() {
1332/// println!("event={:?}", event);
1333/// }
1334/// # Ok(())
1335/// # }
1336/// #
1337/// # fn main() {
1338/// # try_main().unwrap();
1339/// # }
1340/// ```
1341///
1342/// [`Events`]: struct.Events.html
1343/// [`iter`]: struct.Events.html#method.iter
#[derive(Debug, Clone)]
pub struct Iter<'a> {
    // The `Events` collection being iterated.
    inner: &'a Events,
    // Index of the next event to yield.
    pos: usize,
}
1349
1350/// Owned [`Events`] iterator.
1351///
1352/// This struct is created by the `into_iter` method on [`Events`].
1353///
1354/// # Examples
1355///
1356/// ```
1357/// # use std::error::Error;
1358/// # fn try_main() -> Result<(), Box<dyn Error>> {
1359/// use corcovado::{Events, Poll};
1360/// use std::time::Duration;
1361///
1362/// let mut events = Events::with_capacity(1024);
1363/// let poll = Poll::new()?;
1364///
1365/// // Register handles with `poll`
1366///
1367/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1368///
1369/// for event in events {
1370/// println!("event={:?}", event);
1371/// }
1372/// # Ok(())
1373/// # }
1374/// #
1375/// # fn main() {
1376/// # try_main().unwrap();
1377/// # }
1378/// ```
1379/// [`Events`]: struct.Events.html
#[derive(Debug)]
pub struct IntoIter {
    // The owned `Events` collection being consumed.
    inner: Events,
    // Index of the next event to yield.
    pos: usize,
}
1385
impl Events {
    /// Return a new `Events` capable of holding up to `capacity` events.
    ///
    /// # Examples
    ///
    /// ```
    /// use corcovado::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert_eq!(1024, events.capacity());
    /// ```
    pub fn with_capacity(capacity: usize) -> Events {
        Events {
            inner: sys::Events::with_capacity(capacity),
        }
    }

    /// Returns the `Event` at the given index, or `None` if `idx` is out
    /// of bounds.
    ///
    /// Note: index-based access is deprecated (see [`Poll::poll`]); prefer
    /// the [`iter`] API, since events can be inserted while other events
    /// are being processed.
    ///
    /// [`Poll::poll`]: struct.Poll.html#method.poll
    /// [`iter`]: #method.iter
    pub fn get(&self, idx: usize) -> Option<Event> {
        self.inner.get(idx)
    }

    /// Returns the number of `Event` values currently held in `self`.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns the number of `Event` values that `self` can hold.
    ///
    /// ```
    /// use corcovado::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert_eq!(1024, events.capacity());
    /// ```
    pub fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    /// Returns `true` if `self` contains no `Event` values.
    ///
    /// # Examples
    ///
    /// ```
    /// use corcovado::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert!(events.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns an iterator over the `Event` values.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<dyn Error>> {
    /// use corcovado::{Events, Poll};
    /// use std::time::Duration;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// let poll = Poll::new()?;
    ///
    /// // Register handles with `poll`
    ///
    /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
    ///
    /// for event in events.iter() {
    ///     println!("event={:?}", event);
    /// }
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn iter(&self) -> Iter<'_> {
        Iter {
            inner: self,
            pos: 0,
        }
    }

    /// Clearing all `Event` values from container explicitly.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<dyn Error>> {
    /// use corcovado::{Events, Poll};
    /// use std::time::Duration;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// let poll = Poll::new()?;
    ///
    /// // Register handles with `poll`
    /// for _ in 0..2 {
    ///     events.clear();
    ///     poll.poll(&mut events, Some(Duration::from_millis(100)))?;
    ///
    ///     for event in events.iter() {
    ///         println!("event={:?}", event);
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn clear(&mut self) {
        self.inner.clear();
    }
}
1507
1508impl<'a> IntoIterator for &'a Events {
1509 type Item = Event;
1510 type IntoIter = Iter<'a>;
1511
1512 fn into_iter(self) -> Self::IntoIter {
1513 self.iter()
1514 }
1515}
1516
1517impl Iterator for Iter<'_> {
1518 type Item = Event;
1519
1520 fn next(&mut self) -> Option<Event> {
1521 let ret = self.inner.inner.get(self.pos);
1522 self.pos += 1;
1523 ret
1524 }
1525}
1526
1527impl IntoIterator for Events {
1528 type Item = Event;
1529 type IntoIter = IntoIter;
1530
1531 fn into_iter(self) -> Self::IntoIter {
1532 IntoIter {
1533 inner: self,
1534 pos: 0,
1535 }
1536 }
1537}
1538
1539impl Iterator for IntoIter {
1540 type Item = Event;
1541
1542 fn next(&mut self) -> Option<Event> {
1543 let ret = self.inner.inner.get(self.pos);
1544 self.pos += 1;
1545 ret
1546 }
1547}
1548
1549impl fmt::Debug for Events {
1550 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1551 f.debug_struct("Events")
1552 .field("capacity", &self.capacity())
1553 .finish()
1554 }
1555}
1556
1557// ===== Accessors for internal usage =====
1558
// Internal accessor: returns a reference to the `Poll` instance's system
// selector (see the "Accessors for internal usage" section header above).
pub fn selector(poll: &Poll) -> &sys::Selector {
    &poll.selector
}
1562
1563/*
1564 *
1565 * ===== Registration =====
1566 *
1567 */
1568
// TODO: get rid of this, windows depends on it for now
// Free-function wrapper around `Registration::new_priv`, kept only because
// the windows backend calls it (hence the TODO above).
#[allow(dead_code)]
pub fn new_registration(
    poll: &Poll,
    token: Token,
    ready: Ready,
    opt: PollOpt,
) -> (Registration, SetReadiness) {
    Registration::new_priv(poll, token, ready, opt)
}
1579
impl Registration {
    /// Create and return a new `Registration` and the associated
    /// `SetReadiness`.
    ///
    /// See [struct] documentation for more detail and [`Poll`]
    /// for high level documentation on polling.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<dyn Error>> {
    /// use corcovado::{Events, Ready, Registration, Poll, PollOpt, Token};
    /// use std::thread;
    ///
    /// let (registration, set_readiness) = Registration::new2();
    ///
    /// thread::spawn(move || {
    ///     use std::time::Duration;
    ///     thread::sleep(Duration::from_millis(500));
    ///
    ///     set_readiness.set_readiness(Ready::readable());
    /// });
    ///
    /// let poll = Poll::new()?;
    /// poll.register(&registration, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
    ///
    /// let mut events = Events::with_capacity(256);
    ///
    /// loop {
    ///     poll.poll(&mut events, None);
    ///
    ///     for event in &events {
    ///         if event.token() == Token(0) && event.readiness().is_readable() {
    ///             return Ok(());
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    /// [struct]: #
    /// [`Poll`]: struct.Poll.html
    pub fn new2() -> (Registration, SetReadiness) {
        // Allocate the registration node. The new node will have `ref_count`
        // set to 2: one SetReadiness, one Registration.
        //
        // The readiness-queue pointer starts out null; it is bound to a
        // `Poll` instance's queue on first registration (see the null check
        // in `RegistrationInner::update`).
        let node = Box::into_raw(Box::new(ReadinessNode::new(
            ptr::null_mut(),
            Token(0),
            Ready::empty(),
            PollOpt::empty(),
            2,
        )));

        // Both handles share the same raw node pointer; the node's
        // `ref_count` of 2 accounts for them.
        let registration = Registration {
            inner: RegistrationInner { node },
        };

        let set_readiness = SetReadiness {
            inner: RegistrationInner { node },
        };

        (registration, set_readiness)
    }

    /// Create a `Registration` / `SetReadiness` pair already bound to
    /// `poll`'s readiness queue with the given `token`, `interest`, and
    /// `opt`.
    pub fn new(
        poll: &Poll,
        token: Token,
        interest: Ready,
        opt: PollOpt,
    ) -> (Registration, SetReadiness) {
        Registration::new_priv(poll, token, interest, opt)
    }

    // TODO: Get rid of this (windows depends on it for now)
    fn new_priv(
        poll: &Poll,
        token: Token,
        interest: Ready,
        opt: PollOpt,
    ) -> (Registration, SetReadiness) {
        // Static assertions that both handle types are Send + Sync (the
        // `is_send`/`is_sync` helpers are defined elsewhere in the crate).
        is_send::<Registration>();
        is_sync::<Registration>();
        is_send::<SetReadiness>();
        is_sync::<SetReadiness>();

        // Clone handle to the readiness queue, this bumps the ref count
        let queue = poll.readiness_queue.inner.clone();

        // Convert to a *mut () pointer
        //
        // SAFETY-relevant: this transmutes the `Arc` into a raw pointer
        // without dropping it, deliberately leaking one queue reference
        // that the node now owns (presumably released during node
        // teardown — confirm against the node destruction path).
        let queue: *mut () = unsafe { mem::transmute(queue) };

        // Allocate the registration node. The new node will have `ref_count`
        // set to 3: one SetReadiness, one Registration, and one Poll handle.
        let node =
            Box::into_raw(Box::new(ReadinessNode::new(queue, token, interest, opt, 3)));

        let registration = Registration {
            inner: RegistrationInner { node },
        };

        let set_readiness = SetReadiness {
            inner: RegistrationInner { node },
        };

        (registration, set_readiness)
    }

    /// Update the registration's `token`, `interest`, and `opts`,
    /// fully replacing the previous values.
    pub fn update(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        self.inner.update(poll, token, interest, opts)
    }

    /// Remove interest in readiness events; implemented as an update to
    /// empty interest/options.
    pub fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner
            .update(poll, Token(0), Ready::empty(), PollOpt::empty())
    }
}
1707
impl Evented for Registration {
    // All three `Evented` operations delegate to
    // `RegistrationInner::update`: the registration state lives in the
    // shared readiness node rather than in the system selector.
    fn register(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        self.inner.update(poll, token, interest, opts)
    }

    fn reregister(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        self.inner.update(poll, token, interest, opts)
    }

    // Deregistration is expressed as an update to empty interest/options.
    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner
            .update(poll, Token(0), Ready::empty(), PollOpt::empty())
    }
}
1734
impl Drop for Registration {
    fn drop(&mut self) {
        // `flag_as_dropped` toggles the `dropped` flag and notifies
        // `Poll::poll` to release its handle (which is just decrementing
        // the ref count).
        if self.inner.state.flag_as_dropped() {
            // The toggle succeeded, so this thread queues the node and
            // wakes the poller so it observes the drop.
            // Can't do anything if the queuing fails
            let _ = self.inner.enqueue_with_wakeup();
        }
    }
}
1746
1747impl fmt::Debug for Registration {
1748 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1749 fmt.debug_struct("Registration").finish()
1750 }
1751}
1752
impl SetReadiness {
    /// Returns the registration's current readiness.
    ///
    /// # Note
    ///
    /// There is no guarantee that `readiness` establishes any sort of memory
    /// ordering. Any concurrent data access must be synchronized using another
    /// strategy.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<dyn Error>> {
    /// use corcovado::{Registration, Ready};
    ///
    /// let (registration, set_readiness) = Registration::new2();
    ///
    /// assert!(set_readiness.readiness().is_empty());
    ///
    /// set_readiness.set_readiness(Ready::readable())?;
    /// assert!(set_readiness.readiness().is_readable());
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn readiness(&self) -> Ready {
        self.inner.readiness()
    }

    /// Set the registration's readiness
    ///
    /// If the associated `Registration` is registered with a [`Poll`] instance
    /// and has requested readiness events that include `ready`, then a future
    /// call to [`Poll::poll`] will receive a readiness event representing the
    /// readiness state change.
    ///
    /// # Note
    ///
    /// There is no guarantee that `readiness` establishes any sort of memory
    /// ordering. Any concurrent data access must be synchronized using another
    /// strategy.
    ///
    /// There is also no guarantee as to when the readiness event will be
    /// delivered to poll. A best attempt will be made to make the delivery in a
    /// "timely" fashion. For example, the following is **not** guaranteed to
    /// work:
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<dyn Error>> {
    /// use corcovado::{Events, Registration, Ready, Poll, PollOpt, Token};
    ///
    /// let poll = Poll::new()?;
    /// let (registration, set_readiness) = Registration::new2();
    ///
    /// poll.register(&registration,
    ///               Token(0),
    ///               Ready::readable(),
    ///               PollOpt::edge())?;
    ///
    /// // Set the readiness, then immediately poll to try to get the readiness
    /// // event
    /// set_readiness.set_readiness(Ready::readable())?;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// poll.poll(&mut events, None)?;
    ///
    /// // There is NO guarantee that the following will work. It is possible
    /// // that the readiness event will be delivered at a later time.
    /// let event = events.get(0).unwrap();
    /// assert_eq!(event.token(), Token(0));
    /// assert!(event.readiness().is_readable());
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// # Examples
    ///
    /// A simple example, for a more elaborate example, see the [`Evented`]
    /// documentation.
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<dyn Error>> {
    /// use corcovado::{Registration, Ready};
    ///
    /// let (registration, set_readiness) = Registration::new2();
    ///
    /// assert!(set_readiness.readiness().is_empty());
    ///
    /// set_readiness.set_readiness(Ready::readable())?;
    /// assert!(set_readiness.readiness().is_readable());
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// [`Registration`]: struct.Registration.html
    /// [`Evented`]: event/trait.Evented.html#examples
    /// [`Poll`]: struct.Poll.html
    /// [`Poll::poll`]: struct.Poll.html#method.poll
    pub fn set_readiness(&self, ready: Ready) -> io::Result<()> {
        self.inner.set_readiness(ready)
    }
}
1869
impl fmt::Debug for SetReadiness {
    // Opaque representation: the readiness node internals are not exposed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SetReadiness").finish()
    }
}
1875
1876impl RegistrationInner {
    /// Get the registration's readiness.
    ///
    /// `Relaxed` suffices here: per the `# Note` on
    /// `SetReadiness::readiness`, this read establishes no memory ordering
    /// for other data.
    fn readiness(&self) -> Ready {
        self.state.load(Relaxed).readiness()
    }
1881
    /// Set the registration's readiness.
    ///
    /// This function can be called concurrently by an arbitrary number of
    /// SetReadiness handles.
    ///
    /// Implemented as a CAS loop on the packed node state; at most one
    /// caller wins the queued-flag toggle and becomes responsible for
    /// pushing the node into the readiness queue.
    fn set_readiness(&self, ready: Ready) -> io::Result<()> {
        // Load the current atomic state.
        let mut state = self.state.load(Acquire);
        let mut next;

        loop {
            next = state;

            if state.is_dropped() {
                // Node is dropped, no more notifications
                return Ok(());
            }

            // Update the readiness
            next.set_readiness(ready);

            // If the readiness is not blank, try to obtain permission to
            // push the node into the readiness queue.
            if !next.effective_readiness().is_empty() {
                next.set_queued();
            }

            let actual = self.state.compare_and_swap(state, next, AcqRel);

            if state == actual {
                break;
            }

            // Another thread mutated the state concurrently; retry with
            // the freshly observed value.
            state = actual;
        }

        if !state.is_queued() && next.is_queued() {
            // We toggled the queued flag, making us responsible for queuing the
            // node in the MPSC readiness queue.
            self.enqueue_with_wakeup()?;
        }

        Ok(())
    }
1925
1926 /// Update the registration details associated with the node
1927 fn update(
1928 &self,
1929 poll: &Poll,
1930 token: Token,
1931 interest: Ready,
1932 opt: PollOpt,
1933 ) -> io::Result<()> {
1934 // First, ensure poll instances match
1935 //
1936 // Load the queue pointer, `Relaxed` is sufficient here as only the
1937 // pointer is being operated on. The actual memory is guaranteed to be
1938 // visible the `poll: &Poll` ref passed as an argument to the function.
1939 let mut queue = self.readiness_queue.load(Relaxed);
1940 let other: &*mut () =
1941 unsafe { &*(&poll.readiness_queue.inner as *const _ as *const *mut ()) };
1942 let other = *other;
1943
1944 debug_assert!(
1945 mem::size_of::<Arc<ReadinessQueueInner>>() == mem::size_of::<*mut ()>()
1946 );
1947
1948 if queue.is_null() {
1949 // Attempt to set the queue pointer. `Release` ordering synchronizes
1950 // with `Acquire` in `ensure_with_wakeup`.
1951 #[allow(deprecated)]
1952 let actual = self.readiness_queue.compare_and_swap(queue, other, Release);
1953
1954 if actual.is_null() {
1955 // The CAS succeeded, this means that the node's ref count
1956 // should be incremented to reflect that the `poll` function
1957 // effectively owns the node as well.
1958 //
1959 // `Relaxed` ordering used for the same reason as in
1960 // RegistrationInner::clone
1961 self.ref_count.fetch_add(1, Relaxed);
1962
1963 // Note that the `queue` reference stored in our
1964 // `readiness_queue` field is intended to be a strong reference,
1965 // so now that we've successfully claimed the reference we bump
1966 // the refcount here.
1967 //
1968 // Down below in `release_node` when we deallocate this
1969 // `RegistrationInner` is where we'll transmute this back to an
1970 // arc and decrement the reference count.
1971 mem::forget(poll.readiness_queue.clone());
1972 } else {
1973 // The CAS failed, another thread set the queue pointer, so ensure
1974 // that the pointer and `other` match
1975 if actual != other {
1976 return Err(io::Error::other(
1977 "registration handle associated with another `Poll` instance",
1978 ));
1979 }
1980 }
1981
1982 queue = other;
1983 } else if queue != other {
1984 return Err(io::Error::other(
1985 "registration handle associated with another `Poll` instance",
1986 ));
1987 }
1988
1989 unsafe {
1990 let actual = &poll.readiness_queue.inner as *const _ as *const usize;
1991 debug_assert_eq!(queue as usize, *actual);
1992 }
1993
1994 // The `update_lock` atomic is used as a flag ensuring only a single
1995 // thread concurrently enters the `update` critical section. Any
1996 // concurrent calls to update are discarded. If coordinated updates are
1997 // required, the Mio user is responsible for handling that.
1998 //
1999 // Acquire / Release ordering is used on `update_lock` to ensure that
2000 // data access to the `token_*` variables are scoped to the critical
2001 // section.
2002
2003 // Acquire the update lock.
2004 #[allow(deprecated)]
2005 if self.update_lock.compare_and_swap(false, true, Acquire) {
2006 // The lock is already held. Discard the update
2007 return Ok(());
2008 }
2009
2010 // Relaxed ordering is acceptable here as the only memory that needs to
2011 // be visible as part of the update are the `token_*` variables, and
2012 // ordering has already been handled by the `update_lock` access.
2013 let mut state = self.state.load(Relaxed);
2014 let mut next;
2015
2016 // Read the current token, again this memory has been ordered by the
2017 // acquire on `update_lock`.
2018 let curr_token_pos = state.token_write_pos();
2019 let curr_token = unsafe { self::token(self, curr_token_pos) };
2020
2021 let mut next_token_pos = curr_token_pos;
2022
2023 // If the `update` call is changing the token, then compute the next
2024 // available token slot and write the token there.
2025 //
2026 // Note that this computation is happening *outside* of the
2027 // compare-and-swap loop. The update lock ensures that only a single
2028 // thread could be mutating the write_token_position, so the
2029 // `next_token_pos` will never need to be recomputed even if
2030 // `token_read_pos` concurrently changes. This is because
2031 // `token_read_pos` can ONLY concurrently change to the current value of
2032 // `token_write_pos`, so `next_token_pos` will always remain valid.
2033 if token != curr_token {
2034 next_token_pos = state.next_token_pos();
2035
2036 // Update the token
2037 match next_token_pos {
2038 0 => unsafe { *self.token_0.get() = token },
2039 1 => unsafe { *self.token_1.get() = token },
2040 2 => unsafe { *self.token_2.get() = token },
2041 _ => unreachable!(),
2042 }
2043 }
2044
2045 // Now enter the compare-and-swap loop
2046 loop {
2047 next = state;
2048
2049 // The node is only dropped once all `Registration` handles are
2050 // dropped. Only `Registration` can call `update`.
2051 debug_assert!(!state.is_dropped());
2052
2053 // Update the write token position, this will also release the token
2054 // to Poll::poll.
2055 next.set_token_write_pos(next_token_pos);
2056
2057 // Update readiness and poll opts
2058 next.set_interest(interest);
2059 next.set_poll_opt(opt);
2060
2061 // If there is effective readiness, the node will need to be queued
2062 // for processing. This exact behavior is still TBD, so we are
2063 // conservative for now and always fire.
2064 //
2065 // See https://github.com/carllerche/mio/issues/535.
2066 if !next.effective_readiness().is_empty() {
2067 next.set_queued();
2068 }
2069
2070 // compare-and-swap the state values. Only `Release` is needed here.
2071 // The `Release` ensures that `Poll::poll` will see the token
2072 // update and the update function doesn't care about any other
2073 // memory visibility.
2074 let actual = self.state.compare_and_swap(state, next, Release);
2075
2076 if actual == state {
2077 break;
2078 }
2079
2080 // CAS failed, but `curr_token_pos` should not have changed given
2081 // that we still hold the update lock.
2082 debug_assert_eq!(curr_token_pos, actual.token_write_pos());
2083
2084 state = actual;
2085 }
2086
2087 // Release the lock
2088 self.update_lock.store(false, Release);
2089
2090 if !state.is_queued() && next.is_queued() {
2091 // We are responsible for enqueuing the node.
2092 enqueue_with_wakeup(queue, self)?;
2093 }
2094
2095 Ok(())
2096 }
2097}
2098
impl ops::Deref for RegistrationInner {
    type Target = ReadinessNode;

    fn deref(&self) -> &ReadinessNode {
        // SAFETY: `node` stays valid for the lifetime of this handle. The
        // node is only freed by `release_node`, which runs no earlier than
        // this handle's own `Drop` (every handle holds one ref count).
        unsafe { &*self.node }
    }
}
2106
impl Clone for RegistrationInner {
    // Clone the handle by bumping the shared node's ref count, mirroring
    // `Arc::clone` (including its overflow-abort guard).
    fn clone(&self) -> RegistrationInner {
        // Using a relaxed ordering is alright here, as knowledge of the
        // original reference prevents other threads from erroneously deleting
        // the object.
        //
        // As explained in the [Boost documentation][1], Increasing the
        // reference counter can always be done with memory_order_relaxed: New
        // references to an object can only be formed from an existing
        // reference, and passing an existing reference from one thread to
        // another must already provide any required synchronization.
        //
        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
        let old_size = self.ref_count.fetch_add(1, Relaxed);

        // However we need to guard against massive refcounts in case someone
        // is `mem::forget`ing Arcs. If we don't do this the count can overflow
        // and users will use-after free. We racily saturate to `isize::MAX` on
        // the assumption that there aren't ~2 billion threads incrementing
        // the reference count at once. This branch will never be taken in
        // any realistic program.
        //
        // We abort because such a program is incredibly degenerate, and we
        // don't care to support it.
        //
        // `old_size` is the pre-increment count; `MAX_REFCOUNT` is declared
        // earlier in this file.
        if old_size & !MAX_REFCOUNT != 0 {
            process::abort();
        }

        RegistrationInner { node: self.node }
    }
}
2138
impl Drop for RegistrationInner {
    fn drop(&mut self) {
        // Only handles releasing from `Registration` and `SetReadiness`
        // handles. Poll has to call this itself.
        //
        // `release_node` decrements the ref count and frees the node once the
        // count hits zero.
        release_node(self.node);
    }
}
2146
2147/*
2148 *
2149 * ===== ReadinessQueue =====
2150 *
2151 */
2152
impl ReadinessQueue {
    /// Create a new `ReadinessQueue`.
    fn new() -> io::Result<ReadinessQueue> {
        // Assert (via trait bounds, checked at compile time) that the handle
        // is Send + Sync.
        is_send::<Self>();
        is_sync::<Self>();

        let end_marker = Box::new(ReadinessNode::marker());
        let sleep_marker = Box::new(ReadinessNode::marker());
        let closed_marker = Box::new(ReadinessNode::marker());

        // The queue starts out "empty": both head and tail point at the end
        // marker, which serves as the MPSC queue's stub node.
        let ptr = &*end_marker as *const _ as *mut _;

        Ok(ReadinessQueue {
            inner: Rc::new(ReadinessQueueInner {
                awakener: sys::Awakener::new()?,
                head_readiness: AtomicPtr::new(ptr),
                tail_readiness: UnsafeCell::new(ptr),
                end_marker,
                sleep_marker,
                closed_marker,
            }),
        })
    }

    /// Poll the queue for new events
    ///
    /// Drains queued readiness nodes into `dst` until `dst` reaches capacity
    /// or the queue is exhausted.
    fn poll(&self, dst: &mut sys::Events) {
        // `until` is set with the first node that gets re-enqueued due to being
        // set to have level-triggered notifications. This prevents an infinite
        // loop where `Poll::poll` will keep dequeuing nodes it enqueues.
        let mut until = ptr::null_mut();

        if dst.len() == dst.capacity() {
            // If `dst` is already full, the readiness queue won't be drained.
            // This might result in `sleep_marker` staying in the queue and
            // unnecessary pipe writes occurring.
            self.inner.clear_sleep_marker();
        }

        'outer: while dst.len() < dst.capacity() {
            // Dequeue a node. If the queue is in an inconsistent state, then
            // stop polling. `Poll::poll` will be called again shortly and enter
            // a syscall, which should be enough to enable the other thread to
            // finish the queuing process.
            let ptr = match unsafe { self.inner.dequeue_node(until) } {
                Dequeue::Empty | Dequeue::Inconsistent => break,
                Dequeue::Data(ptr) => ptr,
            };

            let node = unsafe { &*ptr };

            // Read the node state with Acquire ordering. This allows reading
            // the token variables.
            let mut state = node.state.load(Acquire);
            let mut next;
            let mut readiness;
            let mut opt;

            loop {
                // Build up any changes to the readiness node's state and
                // attempt the CAS at the end
                next = state;

                // Given that the node was just read from the queue, the
                // `queued` flag should still be set.
                debug_assert!(state.is_queued());

                // The dropped flag means we need to release the node and
                // perform no further processing on it.
                if state.is_dropped() {
                    // Release the node and continue
                    release_node(ptr);
                    continue 'outer;
                }

                // Process the node
                readiness = state.effective_readiness();
                opt = state.poll_opt();

                if opt.is_edge() {
                    // Mark the node as dequeued
                    next.set_dequeued();

                    if opt.is_oneshot() && !readiness.is_empty() {
                        // Oneshot fired: clear the interest so no further
                        // events are produced until re-registered.
                        next.disarm();
                    }
                } else if readiness.is_empty() {
                    next.set_dequeued();
                }

                // Ensure `token_read_pos` is set to `token_write_pos` so that
                // we read the most up to date token value.
                next.update_token_read_pos();

                if state == next {
                    break;
                }

                let actual = node.state.compare_and_swap(state, next, AcqRel);

                if actual == state {
                    break;
                }

                state = actual;
            }

            // If the queued flag is still set, then the node must be requeued.
            // This typically happens when using level-triggered notifications.
            if next.is_queued() {
                if until.is_null() {
                    // We never want to see the node again
                    until = ptr;
                }

                // Requeue the node
                self.inner.enqueue_node(node);
            }

            if !readiness.is_empty() {
                // Get the token
                let token = unsafe { token(node, next.token_read_pos()) };

                // Push the event
                dst.push_event(Event::new(readiness, token));
            }
        }
    }

    /// Prepare the queue for the `Poll::poll` thread to block in the system
    /// selector. This involves changing `head_readiness` to `sleep_marker`.
    /// Returns true if successful and `poll` can block.
    fn prepare_for_sleep(&self) -> bool {
        let end_marker = self.inner.end_marker();
        let sleep_marker = self.inner.sleep_marker();

        let tail = unsafe { *self.inner.tail_readiness.get() };

        // If the tail is currently set to the sleep_marker, then check if the
        // head is as well. If it is, then the queue is currently ready to
        // sleep. If it is not, then the queue is not empty and there should be
        // no sleeping.
        if tail == sleep_marker {
            return self.inner.head_readiness.load(Acquire) == sleep_marker;
        }

        // If the tail is not currently set to `end_marker`, then the queue is
        // not empty.
        if tail != end_marker {
            return false;
        }

        // The sleep marker is *not* currently in the readiness queue.
        //
        // The sleep marker is only inserted in this function. It is also only
        // inserted in the tail position. This is guaranteed by first checking
        // that the end marker is in the tail position, pushing the sleep marker
        // after the end marker, then removing the end marker.
        //
        // Before inserting a node into the queue, the next pointer has to be
        // set to null. Again, this is only safe to do when the node is not
        // currently in the queue, but we already have ensured this.
        self.inner
            .sleep_marker
            .next_readiness
            .store(ptr::null_mut(), Relaxed);

        #[allow(deprecated)]
        let actual =
            self.inner
                .head_readiness
                .compare_and_swap(end_marker, sleep_marker, AcqRel);

        debug_assert!(actual != sleep_marker);

        if actual != end_marker {
            // The readiness queue is not empty
            return false;
        }

        // The current tail should be pointing to `end_marker`
        debug_assert!(unsafe { *self.inner.tail_readiness.get() == end_marker });
        // The `end_marker` next pointer should be null
        debug_assert!(self.inner.end_marker.next_readiness.load(Relaxed).is_null());

        // Update tail pointer.
        unsafe {
            *self.inner.tail_readiness.get() = sleep_marker;
        }
        true
    }
}
2344
impl Drop for ReadinessQueue {
    fn drop(&mut self) {
        // Close the queue by enqueuing the closed node. Once the closed
        // marker reaches the head, `enqueue_node` rejects all further pushes
        // (see the closed-marker check there).
        self.inner.enqueue_node(&self.inner.closed_marker);

        loop {
            // Free any nodes that happen to be left in the readiness queue
            let ptr = match unsafe { self.inner.dequeue_node(ptr::null_mut()) } {
                Dequeue::Empty => break,
                Dequeue::Inconsistent => {
                    // This really shouldn't be possible as all other handles to
                    // `ReadinessQueueInner` are dropped, but handle this by
                    // spinning I guess?
                    continue;
                }
                Dequeue::Data(ptr) => ptr,
            };

            let node = unsafe { &*ptr };

            let state = node.state.load(Acquire);

            debug_assert!(state.is_queued());

            // Drop the queue's strong reference to the node.
            release_node(ptr);
        }
    }
}
2373
impl ReadinessQueueInner {
    // Wake up the thread blocked in the system selector.
    fn wakeup(&self) -> io::Result<()> {
        self.awakener.wakeup()
    }

    /// Push the given node onto the readiness queue, waking up the
    /// `Poll::poll` thread if it was sleeping.
    fn enqueue_node_with_wakeup(&self, node: &ReadinessNode) -> io::Result<()> {
        if self.enqueue_node(node) {
            self.wakeup()?;
        }

        Ok(())
    }

    /// Push the node into the readiness queue
    ///
    /// Returns `true` if the consumer was sleeping and needs to be woken up.
    fn enqueue_node(&self, node: &ReadinessNode) -> bool {
        // This is the 1024cores.net intrusive MPSC queue [1] "push" function.
        let node_ptr = node as *const _ as *mut _;

        // Relaxed used as the ordering is "released" when swapping
        // `head_readiness`
        node.next_readiness.store(ptr::null_mut(), Relaxed);

        unsafe {
            let mut prev = self.head_readiness.load(Acquire);

            loop {
                if prev == self.closed_marker() {
                    debug_assert!(node_ptr != self.closed_marker());
                    // `end_marker` is legitimately re-enqueued as the stub
                    // node from `dequeue_node`, so it is not asserted against
                    // here.
                    // debug_assert!(node_ptr != self.end_marker());
                    debug_assert!(node_ptr != self.sleep_marker());

                    if node_ptr != self.end_marker() {
                        // The readiness queue is shutdown, but the enqueue flag was
                        // set. This means that we are responsible for decrementing
                        // the ready queue's ref count
                        debug_assert!(node.ref_count.load(Relaxed) >= 2);
                        release_node(node_ptr);
                    }

                    return false;
                }

                #[allow(deprecated)]
                let act = self.head_readiness.compare_and_swap(prev, node_ptr, AcqRel);

                if prev == act {
                    break;
                }

                prev = act;
            }

            debug_assert!((*prev).next_readiness.load(Relaxed).is_null());

            (*prev).next_readiness.store(node_ptr, Release);

            // If the previous head was the sleep marker, the consumer is
            // blocked in the selector and must be woken.
            prev == self.sleep_marker()
        }
    }

    // Take the sleep marker back out of the queue (restoring `end_marker` as
    // the stub), if the sleep marker is currently at the tail.
    fn clear_sleep_marker(&self) {
        let end_marker = self.end_marker();
        let sleep_marker = self.sleep_marker();

        unsafe {
            let tail = *self.tail_readiness.get();

            if tail != self.sleep_marker() {
                return;
            }

            // The end marker is *not* currently in the readiness queue
            // (since the sleep marker is).
            self.end_marker
                .next_readiness
                .store(ptr::null_mut(), Relaxed);

            #[allow(deprecated)]
            let actual =
                self.head_readiness
                    .compare_and_swap(sleep_marker, end_marker, AcqRel);

            debug_assert!(actual != end_marker);

            if actual != sleep_marker {
                // The readiness queue is not empty, we cannot remove the sleep
                // marker
                return;
            }

            // Update the tail pointer.
            *self.tail_readiness.get() = end_marker;
        }
    }

    /// Must only be called in `poll` or `drop`
    ///
    /// # Safety
    ///
    /// This is the single-consumer side of the MPSC queue: the caller must be
    /// the only thread dequeuing, since `tail_readiness` is mutated through
    /// an `UnsafeCell` without synchronization.
    unsafe fn dequeue_node(&self, until: *mut ReadinessNode) -> Dequeue {
        // This is the 1024cores.net intrusive MPSC queue [1] "pop" function
        // with the modifications mentioned at the top of the file.
        let mut tail = *self.tail_readiness.get();
        let mut next = (*tail).next_readiness.load(Acquire);

        if tail == self.end_marker()
            || tail == self.sleep_marker()
            || tail == self.closed_marker()
        {
            if next.is_null() {
                // Make sure the sleep marker is removed (as we are no longer
                // sleeping).
                self.clear_sleep_marker();

                return Dequeue::Empty;
            }

            // Skip over the marker node.
            *self.tail_readiness.get() = next;
            tail = next;
            next = (*next).next_readiness.load(Acquire);
        }

        // Only need to check `until` at this point. `until` is either null,
        // which will never match tail OR it is a node that was pushed by
        // the current thread. This means that either:
        //
        // 1) The queue is inconsistent, which is handled explicitly
        // 2) We encounter `until` at this point in dequeue
        // 3) we will pop a different node
        if tail == until {
            return Dequeue::Empty;
        }

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        if self.head_readiness.load(Acquire) != tail {
            // A producer has swapped the head but has not yet linked the new
            // node; the queue is momentarily inconsistent.
            return Dequeue::Inconsistent;
        }

        // Push the stub node
        self.enqueue_node(&self.end_marker);

        next = (*tail).next_readiness.load(Acquire);

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        Dequeue::Inconsistent
    }

    // Raw pointer to the "queue empty" stub node.
    fn end_marker(&self) -> *mut ReadinessNode {
        &*self.end_marker as *const ReadinessNode as *mut ReadinessNode
    }

    // Raw pointer to the "consumer sleeping" marker node.
    fn sleep_marker(&self) -> *mut ReadinessNode {
        &*self.sleep_marker as *const ReadinessNode as *mut ReadinessNode
    }

    // Raw pointer to the "queue shut down" marker node.
    fn closed_marker(&self) -> *mut ReadinessNode {
        &*self.closed_marker as *const ReadinessNode as *mut ReadinessNode
    }
}
2540
impl ReadinessNode {
    /// Return a new `ReadinessNode`, initialized with the provided
    /// `ref_count`.
    fn new(
        queue: *mut (),
        token: Token,
        interest: Ready,
        opt: PollOpt,
        ref_count: usize,
    ) -> ReadinessNode {
        ReadinessNode {
            state: AtomicState::new(interest, opt),
            // Only the first token is set, the others are initialized to 0
            token_0: UnsafeCell::new(token),
            token_1: UnsafeCell::new(Token(0)),
            token_2: UnsafeCell::new(Token(0)),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            update_lock: AtomicBool::new(false),
            readiness_queue: AtomicPtr::new(queue),
            ref_count: AtomicUsize::new(ref_count),
        }
    }

    /// Return a marker node: no interest, no queue association, and a zero
    /// ref count. Used only for the queue's end / sleep / closed sentinels.
    fn marker() -> ReadinessNode {
        ReadinessNode {
            state: AtomicState::new(Ready::empty(), PollOpt::empty()),
            token_0: UnsafeCell::new(Token(0)),
            token_1: UnsafeCell::new(Token(0)),
            token_2: UnsafeCell::new(Token(0)),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            update_lock: AtomicBool::new(false),
            readiness_queue: AtomicPtr::new(ptr::null_mut()),
            ref_count: AtomicUsize::new(0),
        }
    }

    /// Queue this node for processing by `Poll::poll`, waking the poll thread
    /// if needed. A node not yet associated with a queue is a no-op.
    fn enqueue_with_wakeup(&self) -> io::Result<()> {
        let queue = self.readiness_queue.load(Acquire);

        if queue.is_null() {
            // Not associated with a queue, nothing to do
            return Ok(());
        }

        enqueue_with_wakeup(queue, self)
    }
}
2587
// Enqueue `node` on the readiness queue identified by the raw `queue`
// pointer, waking the `Poll::poll` thread if it is sleeping.
fn enqueue_with_wakeup(queue: *mut (), node: &ReadinessNode) -> io::Result<()> {
    debug_assert!(!queue.is_null());
    // This is ugly... but we don't want to bump the ref count.
    //
    // SAFETY: `queue` was produced by reinterpreting an
    // `Arc<ReadinessQueueInner>` as a pointer-sized value (see
    // `RegistrationInner::update`), so a `&*mut ()` pointing at it can be
    // reinterpreted as `&Arc<ReadinessQueueInner>` for the duration of this
    // call without touching the ref count.
    let queue: &Arc<ReadinessQueueInner> =
        unsafe { &*(&queue as *const *mut () as *const Arc<ReadinessQueueInner>) };
    queue.enqueue_node_with_wakeup(node)
}
2595
/// Read the token stored in slot `pos` (0, 1, or 2) of the node's token
/// triple-buffer.
///
/// # Safety
///
/// The caller must ensure no other thread can concurrently write the same
/// slot; this is coordinated via the `token_read_pos` / `token_write_pos`
/// fields of the node state.
unsafe fn token(node: &ReadinessNode, pos: usize) -> Token {
    match pos {
        0 => *node.token_0.get(),
        1 => *node.token_1.get(),
        2 => *node.token_2.get(),
        _ => unreachable!(),
    }
}
2604
// Decrement the node's ref count, freeing the node (and releasing its strong
// reference to the readiness queue) when the count reaches zero.
fn release_node(ptr: *mut ReadinessNode) {
    unsafe {
        // `AcqRel` synchronizes with other `release_node` functions and ensures
        // that the drop happens after any reads / writes on other threads.
        if (*ptr).ref_count.fetch_sub(1, AcqRel) != 1 {
            return;
        }

        // Last reference: reconstruct the Box so the node's memory is freed
        // when it goes out of scope.
        let node = Box::from_raw(ptr);

        // Decrement the readiness_queue Arc
        let queue = node.readiness_queue.load(Acquire);

        if queue.is_null() {
            // The node was never associated with a queue.
            return;
        }

        // The pointer was created by `mem::forget`-ing a cloned Arc in
        // `RegistrationInner::update`; transmuting it back and dropping it
        // releases that strong reference.
        let _: Arc<ReadinessQueueInner> = mem::transmute(queue);
    }
}
2625
impl AtomicState {
    // Create an `AtomicState` with the given interest and poll options; all
    // other bits (readiness, flags, token positions) start at zero.
    fn new(interest: Ready, opt: PollOpt) -> AtomicState {
        let state = ReadinessState::new(interest, opt);

        AtomicState {
            inner: AtomicUsize::new(state.into()),
        }
    }

    /// Loads the current `ReadinessState`
    fn load(&self, order: Ordering) -> ReadinessState {
        self.inner.load(order).into()
    }

    /// Stores a state if the current state is the same as `current`.
    ///
    /// Returns the previously stored state; the swap succeeded iff the
    /// returned value equals `current`.
    fn compare_and_swap(
        &self,
        current: ReadinessState,
        new: ReadinessState,
        order: Ordering,
    ) -> ReadinessState {
        #[allow(deprecated)]
        self.inner
            .compare_and_swap(current.into(), new.into(), order)
            .into()
    }

    // Returns `true` if the node should be queued
    //
    // Sets the dropped and queued flags in a single atomic `fetch_or` so a
    // concurrent enqueuer cannot race with the drop.
    fn flag_as_dropped(&self) -> bool {
        let prev: ReadinessState = self
            .inner
            .fetch_or(DROPPED_MASK | QUEUED_MASK, Release)
            .into();
        // The flag should not have been previously set
        debug_assert!(!prev.is_dropped());

        // If we set the queued flag, the caller owns the enqueue.
        !prev.is_queued()
    }
}
2665
impl ReadinessState {
    // Create a `ReadinessState` initialized with the provided arguments
    #[inline]
    fn new(interest: Ready, opt: PollOpt) -> ReadinessState {
        let interest = event::ready_as_usize(interest);
        let opt = event::opt_as_usize(opt);

        debug_assert!(interest <= MASK_4);
        debug_assert!(opt <= MASK_4);

        let mut val = interest << INTEREST_SHIFT;
        val |= opt << POLL_OPT_SHIFT;

        ReadinessState(val)
    }

    // Extract the bitfield located at (`shift`, `mask`).
    #[inline]
    fn get(self, mask: usize, shift: usize) -> usize {
        (self.0 >> shift) & mask
    }

    // Store `val` into the bitfield at (`shift`, `mask`), leaving all other
    // bits untouched.
    #[inline]
    fn set(&mut self, val: usize, mask: usize, shift: usize) {
        self.0 = (self.0 & !(mask << shift)) | (val << shift)
    }

    /// Get the readiness
    #[inline]
    fn readiness(self) -> Ready {
        let v = self.get(MASK_4, READINESS_SHIFT);
        event::ready_from_usize(v)
    }

    // The readiness the registration actually cares about: only this subset
    // produces events from `Poll::poll`.
    #[inline]
    fn effective_readiness(self) -> Ready {
        self.readiness() & self.interest()
    }

    /// Set the readiness
    #[inline]
    fn set_readiness(&mut self, v: Ready) {
        self.set(event::ready_as_usize(v), MASK_4, READINESS_SHIFT);
    }

    /// Get the interest
    #[inline]
    fn interest(self) -> Ready {
        let v = self.get(MASK_4, INTEREST_SHIFT);
        event::ready_from_usize(v)
    }

    /// Set the interest
    #[inline]
    fn set_interest(&mut self, v: Ready) {
        self.set(event::ready_as_usize(v), MASK_4, INTEREST_SHIFT);
    }

    // Clear the interest; used when a oneshot registration fires.
    #[inline]
    fn disarm(&mut self) {
        self.set_interest(Ready::empty());
    }

    /// Get the poll options
    #[inline]
    fn poll_opt(self) -> PollOpt {
        let v = self.get(MASK_4, POLL_OPT_SHIFT);
        event::opt_from_usize(v)
    }

    /// Set the poll options
    #[inline]
    fn set_poll_opt(&mut self, v: PollOpt) {
        self.set(event::opt_as_usize(v), MASK_4, POLL_OPT_SHIFT);
    }

    // `true` if the node is (or is about to be) in the readiness queue.
    #[inline]
    fn is_queued(self) -> bool {
        self.0 & QUEUED_MASK == QUEUED_MASK
    }

    /// Set the queued flag
    #[inline]
    fn set_queued(&mut self) {
        // Dropped nodes should never be queued
        debug_assert!(!self.is_dropped());
        self.0 |= QUEUED_MASK;
    }

    // Clear the queued flag; only valid while the flag is set.
    #[inline]
    fn set_dequeued(&mut self) {
        debug_assert!(self.is_queued());
        self.0 &= !QUEUED_MASK
    }

    #[inline]
    fn is_dropped(self) -> bool {
        self.0 & DROPPED_MASK == DROPPED_MASK
    }

    // Slot (0..=2) that `Poll::poll` reads the token from.
    #[inline]
    fn token_read_pos(self) -> usize {
        self.get(MASK_2, TOKEN_RD_SHIFT)
    }

    // Slot (0..=2) that `update` last wrote a token to.
    #[inline]
    fn token_write_pos(self) -> usize {
        self.get(MASK_2, TOKEN_WR_SHIFT)
    }

    // Pick the next token slot to write into: always distinct from the
    // current write position, and distinct from the read position whenever
    // read != write, so a writer never clobbers the slot a concurrent reader
    // may be using.
    #[inline]
    fn next_token_pos(self) -> usize {
        let rd = self.token_read_pos();
        let wr = self.token_write_pos();

        match wr {
            0 => match rd {
                1 => 2,
                2 => 1,
                0 => 1,
                _ => unreachable!(),
            },
            1 => match rd {
                0 => 2,
                2 => 0,
                1 => 2,
                _ => unreachable!(),
            },
            2 => match rd {
                0 => 1,
                1 => 0,
                2 => 0,
                _ => unreachable!(),
            },
            _ => unreachable!(),
        }
    }

    #[inline]
    fn set_token_write_pos(&mut self, val: usize) {
        self.set(val, MASK_2, TOKEN_WR_SHIFT);
    }

    // Publish the write position as the read position so the most recently
    // written token becomes the one readers observe.
    #[inline]
    fn update_token_read_pos(&mut self) {
        let val = self.token_write_pos();
        self.set(val, MASK_2, TOKEN_RD_SHIFT);
    }
}
2814
2815impl From<ReadinessState> for usize {
2816 fn from(src: ReadinessState) -> usize {
2817 src.0
2818 }
2819}
2820
2821impl From<usize> for ReadinessState {
2822 fn from(src: usize) -> ReadinessState {
2823 ReadinessState(src)
2824 }
2825}
2826
// Compile-time (trait-bound) assertions, instantiated in
// `ReadinessQueue::new`, that the queue handle is `Send` / `Sync`.
fn is_send<T: Send>() {}
fn is_sync<T: Sync>() {}
2829
2830impl SelectorId {
2831 #[allow(unused)]
2832 pub fn new() -> SelectorId {
2833 SelectorId {
2834 id: AtomicUsize::new(0),
2835 }
2836 }
2837
2838 #[allow(unused)]
2839 pub fn associate_selector(&self, poll: &Poll) -> io::Result<()> {
2840 let selector_id = self.id.load(Ordering::SeqCst);
2841
2842 if selector_id != 0 && selector_id != poll.selector.id() {
2843 Err(io::Error::other("socket already registered"))
2844 } else {
2845 self.id.store(poll.selector.id(), Ordering::SeqCst);
2846 Ok(())
2847 }
2848 }
2849}
2850
2851impl Clone for SelectorId {
2852 fn clone(&self) -> SelectorId {
2853 SelectorId {
2854 id: AtomicUsize::new(self.id.load(Ordering::SeqCst)),
2855 }
2856 }
2857}
2858
#[test]
#[cfg(all(unix, not(target_os = "fuchsia")))]
pub fn as_raw_fd() {
    // Sanity check: a freshly created `Poll` exposes a valid, non-stdio
    // (> 0) file descriptor through `AsRawFd`.
    let poll = Poll::new().unwrap();
    assert!(poll.as_raw_fd() > 0);
}