corcovado/
poll.rs

1use event_imp::{self as event, Event, Evented, PollOpt, Ready};
2use std::cell::UnsafeCell;
3#[cfg(all(unix, not(target_os = "fuchsia")))]
4use std::os::unix::io::AsRawFd;
5#[cfg(all(unix, not(target_os = "fuchsia")))]
6use std::os::unix::io::RawFd;
7use std::process;
8use std::rc::Rc;
9use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release, SeqCst};
10use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
11use std::sync::{Arc, Condvar, Mutex};
12use std::time::{Duration, Instant};
13use std::{fmt, io, ptr};
14use std::{mem, ops};
15use {sys, Token};
16
17// Poll is backed by two readiness queues. The first is a system readiness queue
18// represented by `sys::Selector`. The system readiness queue handles events
19// provided by the system, such as TCP and UDP. The second readiness queue is
20// implemented in user space by `ReadinessQueue`. It provides a way to implement
21// purely user space `Evented` types.
22//
23// `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked
24// list nodes. This significantly reduces the number of required allocations.
25// Each `Registration` / `SetReadiness` pair allocates a single readiness node
26// that is used for the lifetime of the registration.
27//
28// The readiness node also includes a single atomic variable, `state` that
29// tracks most of the state associated with the registration. This includes the
30// current readiness, interest, poll options, and internal state. When the node
31// state is mutated, it is queued in the MPSC channel. A call to
32// `ReadinessQueue::poll` will dequeue and process nodes. The node state can
33// still be mutated while it is queued in the channel for processing.
34// Intermediate state values do not matter as long as the final state is
35// included in the call to `poll`. This is the eventually consistent nature of
36// the readiness queue.
37//
38// The readiness node is ref counted using the `ref_count` field. On creation,
39// the ref_count is initialized to 3: one `Registration` handle, one
40// `SetReadiness` handle, and one for the readiness queue. Since the readiness queue
41// doesn't *always* hold a handle to the node, we don't use the Arc type for
42// managing ref counts (this is to avoid constantly incrementing and
43// decrementing the ref count when pushing & popping from the queue). When the
44// `Registration` handle is dropped, the `dropped` flag is set on the node, then
// the node is pushed into the readiness queue. When Poll::poll pops the
// node, it sees the drop flag is set, and decrements its ref count.
47//
48// The MPSC queue is a modified version of the intrusive MPSC node based queue
49// described by 1024cores [1].
50//
51// The first modification is that two markers are used instead of a single
52// `stub`. The second marker is a `sleep_marker` which is used to signal to
53// producers that the consumer is going to sleep. This sleep_marker is only used
54// when the queue is empty, implying that the only node in the queue is
55// `end_marker`.
56//
57// The second modification is an `until` argument passed to the dequeue
58// function. When `poll` encounters a level-triggered node, the node will be
59// immediately pushed back into the queue. In order to avoid an infinite loop,
// before `poll` pushes the node back, its pointer is saved off and then passed
61// again as the `until` argument. If the next node to pop is `until`, then
62// `Dequeue::Empty` is returned.
63//
64// [1] http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
65
66/// Polls for readiness events on all registered values.
67///
68/// `Poll` allows a program to monitor a large number of `Evented` types,
69/// waiting until one or more become "ready" for some class of operations; e.g.
70/// reading and writing. An `Evented` type is considered ready if it is possible
71/// to immediately perform a corresponding operation; e.g. [`read`] or
72/// [`write`].
73///
74/// To use `Poll`, an `Evented` type must first be registered with the `Poll`
75/// instance using the [`register`] method, supplying readiness interest. The
76/// readiness interest tells `Poll` which specific operations on the handle to
77/// monitor for readiness. A `Token` is also passed to the [`register`]
78/// function. When `Poll` returns a readiness event, it will include this token.
79/// This associates the event with the `Evented` handle that generated the
80/// event.
81///
82/// [`read`]: tcp/struct.TcpStream.html#method.read
83/// [`write`]: tcp/struct.TcpStream.html#method.write
84/// [`register`]: #method.register
85///
86// # Examples
87//
88// A basic example -- establishing a `TcpStream` connection.
89//
90// ```
91// # use std::error::Error;
92// # fn try_main() -> Result<(), Box<dyn Error>> {
93// use corcovado::{Events, Poll, Ready, PollOpt, Token};
94//
95// use std::net::{TcpStream, TcpListener, SocketAddr};
96//
97// // Bind a server socket to connect to.
98// let addr: SocketAddr = "127.0.0.1:0".parse()?;
99// let server = TcpListener::bind(&addr)?;
100//
101// // Construct a new `Poll` handle as well as the `Events` we'll store into
102// let poll = Poll::new()?;
103// let mut events = Events::with_capacity(1024);
104//
105// // Connect the stream
106// let stream = TcpStream::connect(&server.local_addr()?)?;
107//
108// // Register the stream with `Poll`
109// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
110//
// // Wait for the socket to become ready. This has to happen in a loop to
112// // handle spurious wakeups.
113// loop {
114//     poll.poll(&mut events, None)?;
115//
116//     for event in &events {
117//         if event.token() == Token(0) && event.readiness().is_writable() {
118//             // The socket connected (probably, it could still be a spurious
119//             // wakeup)
120//             return Ok(());
121//         }
122//     }
123// }
124// #     Ok(())
125// # }
126// #
127// # fn main() {
128// #     try_main().unwrap();
129// # }
130// ```
131///
132/// # Edge-triggered and level-triggered
133///
134/// An [`Evented`] registration may request edge-triggered events or
135/// level-triggered events. This is done by setting `register`'s
136/// [`PollOpt`] argument to either [`edge`] or [`level`].
137///
/// The difference between the two can be described as follows. Suppose that
139/// this scenario happens:
140///
141/// 1. A [`TcpStream`] is registered with `Poll`.
142/// 2. The socket receives 2kb of data.
143/// 3. A call to [`Poll::poll`] returns the token associated with the socket
144///    indicating readable readiness.
145/// 4. 1kb is read from the socket.
146/// 5. Another call to [`Poll::poll`] is made.
147///
148/// If when the socket was registered with `Poll`, edge triggered events were
149/// requested, then the call to [`Poll::poll`] done in step **5** will
150/// (probably) hang despite there being another 1kb still present in the socket
151/// read buffer. The reason for this is that edge-triggered mode delivers events
152/// only when changes occur on the monitored [`Evented`]. So, in step *5* the
153/// caller might end up waiting for some data that is already present inside the
154/// socket buffer.
155///
156/// With edge-triggered events, operations **must** be performed on the
157/// `Evented` type until [`WouldBlock`] is returned. In other words, after
158/// receiving an event indicating readiness for a certain operation, one should
159/// assume that [`Poll::poll`] may never return another event for the same token
160/// and readiness until the operation returns [`WouldBlock`].
161///
/// By contrast, when level-triggered notifications were requested, each call to
163/// [`Poll::poll`] will return an event for the socket as long as data remains
164/// in the socket buffer. Generally, level-triggered events should be avoided if
165/// high performance is a concern.
166///
167/// Since even with edge-triggered events, multiple events can be generated upon
168/// receipt of multiple chunks of data, the caller has the option to set the
169/// [`oneshot`] flag. This tells `Poll` to disable the associated [`Evented`]
170/// after the event is returned from [`Poll::poll`]. The subsequent calls to
171/// [`Poll::poll`] will no longer include events for [`Evented`] handles that
172/// are disabled even if the readiness state changes. The handle can be
173/// re-enabled by calling [`reregister`]. When handles are disabled, internal
174/// resources used to monitor the handle are maintained until the handle is
175/// dropped or deregistered. This makes re-registering the handle a fast
176/// operation.
177///
178/// For example, in the following scenario:
179///
180/// 1. A [`TcpStream`] is registered with `Poll`.
181/// 2. The socket receives 2kb of data.
182/// 3. A call to [`Poll::poll`] returns the token associated with the socket
183///    indicating readable readiness.
184/// 4. 2kb is read from the socket.
185/// 5. Another call to read is issued and [`WouldBlock`] is returned
186/// 6. The socket receives another 2kb of data.
187/// 7. Another call to [`Poll::poll`] is made.
188///
189/// Assuming the socket was registered with `Poll` with the [`edge`] and
190/// [`oneshot`] options, then the call to [`Poll::poll`] in step 7 would block. This
191/// is because, [`oneshot`] tells `Poll` to disable events for the socket after
192/// returning an event.
193///
194/// In order to receive the event for the data received in step 6, the socket
195/// would need to be reregistered using [`reregister`].
196///
197/// [`PollOpt`]: struct.PollOpt.html
198/// [`edge`]: struct.PollOpt.html#method.edge
199/// [`level`]: struct.PollOpt.html#method.level
200/// [`Poll::poll`]: struct.Poll.html#method.poll
201/// [`WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.WouldBlock
202/// [`Evented`]: event/trait.Evented.html
203/// [`TcpStream`]: tcp/struct.TcpStream.html
204/// [`reregister`]: #method.reregister
205/// [`oneshot`]: struct.PollOpt.html#method.oneshot
206///
207/// # Portability
208///
209/// Using `Poll` provides a portable interface across supported platforms as
210/// long as the caller takes the following into consideration:
211///
212/// ### Spurious events
213///
214/// [`Poll::poll`] may return readiness events even if the associated
215/// [`Evented`] handle is not actually ready. Given the same code, this may
216/// happen more on some platforms than others. It is important to never assume
217/// that, just because a readiness notification was received, that the
218/// associated operation will succeed as well.
219///
/// If the operation fails with [`WouldBlock`], then the caller should not treat
221/// this as an error, but instead should wait until another readiness event is
222/// received.
223///
224/// ### Draining readiness
225///
226/// When using edge-triggered mode, once a readiness event is received, the
227/// corresponding operation must be performed repeatedly until it returns
228/// [`WouldBlock`]. Unless this is done, there is no guarantee that another
229/// readiness event will be delivered, even if further data is received for the
230/// [`Evented`] handle.
231///
232/// For example, in the first scenario described above, after step 5, even if
233/// the socket receives more data there is no guarantee that another readiness
234/// event will be delivered.
235///
236/// ### Readiness operations
237///
238/// The only readiness operations that are guaranteed to be present on all
239/// supported platforms are [`readable`] and [`writable`]. All other readiness
240/// operations may have false negatives and as such should be considered
241/// **hints**. This means that if a socket is registered with [`readable`],
242/// [`error`], and [`hup`] interest, and either an error or hup is received, a
243/// readiness event will be generated for the socket, but it **may** only
244/// include `readable` readiness. Also note that, given the potential for
245/// spurious events, receiving a readiness event with `hup` or `error` doesn't
246/// actually mean that a `read` on the socket will return a result matching the
247/// readiness event.
248///
249/// In other words, portable programs that explicitly check for [`hup`] or
250/// [`error`] readiness should be doing so as an **optimization** and always be
251/// able to handle an error or HUP situation when performing the actual read
252/// operation.
253///
254/// [`readable`]: struct.Ready.html#method.readable
255/// [`writable`]: struct.Ready.html#method.writable
256/// [`error`]: unix/struct.UnixReady.html#method.error
257/// [`hup`]: unix/struct.UnixReady.html#method.hup
258///
259/// ### Registering handles
260///
261/// Unless otherwise noted, it should be assumed that types implementing
262/// [`Evented`] will never become ready unless they are registered with `Poll`.
263///
264/// For example:
265///
266// ```
267// # use std::error::Error;
268// # fn try_main() -> Result<(), Box<dyn Error>> {
269// use corcovado::{Poll, Ready, PollOpt, Token};
270// use std::net::TcpStream;
271// use std::time::Duration;
272// use std::thread;
273//
274// let sock = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
275//
276// thread::sleep(Duration::from_secs(1));
277//
278// let poll = Poll::new()?;
279//
280// // The connect is not guaranteed to have started until it is registered at
281// // this point
282// poll.register(&sock, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
283// #     Ok(())
284// # }
285// #
286// # fn main() {
287// #     try_main().unwrap();
288// # }
289// ```
290///
291/// # Implementation notes
292///
293/// `Poll` is backed by the selector provided by the operating system.
294///
295/// |      OS    |  Selector |
296/// |------------|-----------|
297/// | Linux      | [epoll]   |
298/// | OS X, iOS  | [kqueue]  |
299/// | Windows    | [IOCP]    |
300/// | FreeBSD    | [kqueue]  |
301/// | Android    | [epoll]   |
302///
303/// On all supported platforms, socket operations are handled by using the
304/// system selector. Platform specific extensions (e.g. [`EventedFd`]) allow
305/// accessing other features provided by individual system selectors. For
306/// example, Linux's [`signalfd`] feature can be used by registering the FD with
307/// `Poll` via [`EventedFd`].
308///
309/// On all platforms except windows, a call to [`Poll::poll`] is mostly just a
310/// direct call to the system selector. However, [IOCP] uses a completion model
311/// instead of a readiness model. In this case, `Poll` must adapt the completion
/// model to Mio's API. While non-trivial, the bridge layer is still quite
313/// efficient. The most expensive part being calls to `read` and `write` require
314/// data to be copied into an intermediate buffer before it is passed to the
315/// kernel.
316///
317/// Notifications generated by [`SetReadiness`] are handled by an internal
318/// readiness queue. A single call to [`Poll::poll`] will collect events from
/// both the system selector and the internal readiness queue.
320///
321/// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
322/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
323/// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
324/// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html
325/// [`EventedFd`]: unix/struct.EventedFd.html
326/// [`SetReadiness`]: struct.SetReadiness.html
327/// [`Poll::poll`]: struct.Poll.html#method.poll
328pub struct Poll {
329    // Platform specific IO selector
330    selector: sys::Selector,
331
332    // Custom readiness queue
333    readiness_queue: ReadinessQueue,
334
335    // Use an atomic to first check if a full lock will be required. This is a
336    // fast-path check for single threaded cases avoiding the extra syscall
337    lock_state: AtomicUsize,
338
339    // Sequences concurrent calls to `Poll::poll`
340    lock: Mutex<()>,
341
342    // Wakeup the next waiter
343    condvar: Condvar,
344}
345
346/// Handle to a user space `Poll` registration.
347///
348/// `Registration` allows implementing [`Evented`] for types that cannot work
349/// with the [system selector]. A `Registration` is always paired with a
350/// `SetReadiness`, which allows updating the registration's readiness state.
351/// When [`set_readiness`] is called and the `Registration` is associated with a
352/// [`Poll`] instance, a readiness event will be created and eventually returned
353/// by [`poll`].
354///
355/// A `Registration` / `SetReadiness` pair is created by calling
356/// [`Registration::new2`]. At this point, the registration is not being
357/// monitored by a [`Poll`] instance, so calls to `set_readiness` will not
358/// result in any readiness notifications.
359///
360/// `Registration` implements [`Evented`], so it can be used with [`Poll`] using
361/// the same [`register`], [`reregister`], and [`deregister`] functions used
362/// with TCP, UDP, etc... types. Once registered with [`Poll`], readiness state
363/// changes result in readiness events being dispatched to the [`Poll`] instance
364/// with which `Registration` is registered.
365///
366/// **Note**, before using `Registration` be sure to read the
367/// [`set_readiness`] documentation and the [portability] notes. The
368/// guarantees offered by `Registration` may be weaker than expected.
369///
370/// For high level documentation, see [`Poll`].
371///
372/// # Examples
373///
374/// ```
375/// use corcovado::{Ready, Registration, Poll, PollOpt, Token};
376/// use corcovado::event::Evented;
377///
378/// use std::io;
379/// use std::time::Instant;
380/// use std::thread;
381///
382/// pub struct Deadline {
383///     when: Instant,
384///     registration: Registration,
385/// }
386///
387/// impl Deadline {
388///     pub fn new(when: Instant) -> Deadline {
389///         let (registration, set_readiness) = Registration::new2();
390///
391///         thread::spawn(move || {
392///             let now = Instant::now();
393///
394///             if now < when {
395///                 thread::sleep(when - now);
396///             }
397///
398///             set_readiness.set_readiness(Ready::readable());
399///         });
400///
401///         Deadline {
402///             when: when,
403///             registration: registration,
404///         }
405///     }
406///
407///     pub fn is_elapsed(&self) -> bool {
408///         Instant::now() >= self.when
409///     }
410/// }
411///
412/// impl Evented for Deadline {
413///     fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
414///         -> io::Result<()>
415///     {
416///         self.registration.register(poll, token, interest, opts)
417///     }
418///
419///     fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
420///         -> io::Result<()>
421///     {
422///         self.registration.reregister(poll, token, interest, opts)
423///     }
424///
425///     fn deregister(&self, poll: &Poll) -> io::Result<()> {
426///         poll.deregister(&self.registration)
427///     }
428/// }
429/// ```
430///
431/// [system selector]: struct.Poll.html#implementation-notes
432/// [`Poll`]: struct.Poll.html
433/// [`Registration::new2`]: struct.Registration.html#method.new2
434/// [`Evented`]: event/trait.Evented.html
435/// [`set_readiness`]: struct.SetReadiness.html#method.set_readiness
436/// [`register`]: struct.Poll.html#method.register
437/// [`reregister`]: struct.Poll.html#method.reregister
438/// [`deregister`]: struct.Poll.html#method.deregister
439/// [portability]: struct.Poll.html#portability
440pub struct Registration {
441    inner: RegistrationInner,
442}
443
444unsafe impl Send for Registration {}
445unsafe impl Sync for Registration {}
446
447/// Updates the readiness state of the associated `Registration`.
448///
449/// See [`Registration`] for more documentation on using `SetReadiness` and
450/// [`Poll`] for high level polling documentation.
451///
452/// [`Poll`]: struct.Poll.html
453/// [`Registration`]: struct.Registration.html
454#[derive(Clone)]
455pub struct SetReadiness {
456    inner: RegistrationInner,
457}
458
459unsafe impl Send for SetReadiness {}
460unsafe impl Sync for SetReadiness {}
461
/// Associates an IO type with a `Selector`.
///
/// The association is kept as one atomic integer.
#[derive(Debug)]
pub struct SelectorId {
    // NOTE(review): presumably the id of the selector this IO type is bound
    // to -- confirm against the `SelectorId` methods.
    id: AtomicUsize,
}
467
468struct RegistrationInner {
469    // Unsafe pointer to the registration's node. The node is ref counted. This
470    // cannot "simply" be tracked by an Arc because `Poll::poll` has an implicit
471    // handle though it isn't stored anywhere. In other words, `Poll::poll`
472    // needs to decrement the ref count before the node is freed.
473    node: *mut ReadinessNode,
474}
475
476#[derive(Clone)]
477struct ReadinessQueue {
478    inner: Rc<ReadinessQueueInner>,
479}
480
481unsafe impl Send for ReadinessQueue {}
482unsafe impl Sync for ReadinessQueue {}
483
484struct ReadinessQueueInner {
485    // Used to wake up `Poll` when readiness is set in another thread.
486    awakener: sys::Awakener,
487
488    // Head of the MPSC queue used to signal readiness to `Poll::poll`.
489    head_readiness: AtomicPtr<ReadinessNode>,
490
491    // Tail of the readiness queue.
492    //
493    // Only accessed by Poll::poll. Coordination will be handled by the poll fn
494    tail_readiness: UnsafeCell<*mut ReadinessNode>,
495
496    // Fake readiness node used to punctuate the end of the readiness queue.
497    // Before attempting to read from the queue, this node is inserted in order
498    // to partition the queue between nodes that are "owned" by the dequeue end
499    // and nodes that will be pushed on by producers.
500    end_marker: Box<ReadinessNode>,
501
502    // Similar to `end_marker`, but this node signals to producers that `Poll`
503    // has gone to sleep and must be woken up.
504    sleep_marker: Box<ReadinessNode>,
505
506    // Similar to `end_marker`, but the node signals that the queue is closed.
507    // This happens when `ReadyQueue` is dropped and signals to producers that
508    // the nodes should no longer be pushed into the queue.
509    closed_marker: Box<ReadinessNode>,
510}
511
512/// Node shared by a `Registration` / `SetReadiness` pair as well as the node
513/// queued into the MPSC channel.
514struct ReadinessNode {
515    // Node state, see struct docs for `ReadinessState`
516    //
517    // This variable is the primary point of coordination between all the
518    // various threads concurrently accessing the node.
519    state: AtomicState,
520
521    // The registration token cannot fit into the `state` variable, so it is
522    // broken out here. In order to atomically update both the state and token
523    // we have to jump through a few hoops.
524    //
525    // First, `state` includes `token_read_pos` and `token_write_pos`. These can
526    // either be 0, 1, or 2 which represent a token slot. `token_write_pos` is
527    // the token slot that contains the most up to date registration token.
528    // `token_read_pos` is the token slot that `poll` is currently reading from.
529    //
530    // When a call to `update` includes a different token than the one currently
531    // associated with the registration (token_write_pos), first an unused token
532    // slot is found. The unused slot is the one not represented by
533    // `token_read_pos` OR `token_write_pos`. The new token is written to this
534    // slot, then `state` is updated with the new `token_write_pos` value. This
535    // requires that there is only a *single* concurrent call to `update`.
536    //
537    // When `poll` reads a node state, it checks that `token_read_pos` matches
538    // `token_write_pos`. If they do not match, then it atomically updates
539    // `state` such that `token_read_pos` is set to `token_write_pos`. It will
540    // then read the token at the newly updated `token_read_pos`.
541    token_0: UnsafeCell<Token>,
542    token_1: UnsafeCell<Token>,
543    token_2: UnsafeCell<Token>,
544
545    // Used when the node is queued in the readiness linked list. Accessing
546    // this field requires winning the "queue" lock
547    next_readiness: AtomicPtr<ReadinessNode>,
548
549    // Ensures that there is only one concurrent call to `update`.
550    //
551    // Each call to `update` will attempt to swap `update_lock` from `false` to
552    // `true`. If the CAS succeeds, the thread has obtained the update lock. If
553    // the CAS fails, then the `update` call returns immediately and the update
554    // is discarded.
555    update_lock: AtomicBool,
556
557    // Pointer to Arc<ReadinessQueueInner>
558    readiness_queue: AtomicPtr<()>,
559
560    // Tracks the number of `ReadyRef` pointers
561    ref_count: AtomicUsize,
562}
563
/// Atomic cell holding the packed state of a `ReadinessNode`.
///
/// It wraps an `AtomicUsize` and is the place where `ReadinessState` values
/// are encoded and decoded.
struct AtomicState {
    inner: AtomicUsize,
}
569
// Bit offset of each field inside the packed node state, lowest field first.
const READINESS_SHIFT: usize = 0;
const INTEREST_SHIFT: usize = 4;
const POLL_OPT_SHIFT: usize = 8;
const TOKEN_RD_SHIFT: usize = 12;
const TOKEN_WR_SHIFT: usize = 14;
const QUEUED_SHIFT: usize = 16;
const DROPPED_SHIFT: usize = 17;

// Unshifted masks covering two and four bits respectively.
const MASK_2: usize = 0b11;
const MASK_4: usize = 0b1111;

// Single-bit flags, already shifted into position.
const QUEUED_MASK: usize = 1 << QUEUED_SHIFT;
const DROPPED_MASK: usize = 1 << DROPPED_SHIFT;
582
/// Complete state of one `ReadinessNode`, packed into a single `usize`.
///
/// Fields, starting at the least significant bit:
///
/// * 4 bits: current readiness of the registration
/// * 4 bits: interest of the registration
/// * 4 bits: poll options
/// * 2 bits: token position `poll` is currently reading from
/// * 2 bits: token position `update` last wrote to
/// * 1 bit: queued flag, set when the node is being pushed into the MPSC queue
/// * 1 bit: dropped flag, set when all `Registration` handles have been dropped
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct ReadinessState(usize);
595
596/// Returned by `dequeue_node`. Represents the different states as described by
597/// the queue documentation on 1024cores.net.
598enum Dequeue {
599    Data(*mut ReadinessNode),
600    Empty,
601    Inconsistent,
602}
603
604const AWAKEN: Token = Token(usize::MAX);
// NOTE(review): presumably the ceiling enforced when a node's `ref_count` is
// incremented -- confirm at the use site.
const MAX_REFCOUNT: usize = isize::MAX as usize;
606
607/*
608 *
609 * ===== Poll =====
610 *
611 */
612
613impl Poll {
614    /// Return a new `Poll` handle.
615    ///
616    /// This function will make a syscall to the operating system to create the
617    /// system selector. If this syscall fails, `Poll::new` will return with the
618    /// error.
619    ///
620    /// See [struct] level docs for more details.
621    ///
622    /// [struct]: struct.Poll.html
623    ///
624    /// # Examples
625    ///
626    /// ```
627    /// # use std::error::Error;
628    /// # fn try_main() -> Result<(), Box<dyn Error>> {
629    /// use corcovado::{Poll, Events};
630    /// use std::time::Duration;
631    ///
632    /// let poll = match Poll::new() {
633    ///     Ok(poll) => poll,
634    ///     Err(e) => panic!("failed to create Poll instance; err={:?}", e),
635    /// };
636    ///
637    /// // Create a structure to receive polled events
638    /// let mut events = Events::with_capacity(1024);
639    ///
640    /// // Wait for events, but none will be received because no `Evented`
641    /// // handles have been registered with this `Poll` instance.
642    /// let n = poll.poll(&mut events, Some(Duration::from_millis(500)))?;
643    /// assert_eq!(n, 0);
644    /// #     Ok(())
645    /// # }
646    /// #
647    /// # fn main() {
648    /// #     try_main().unwrap();
649    /// # }
650    /// ```
651    pub fn new() -> io::Result<Poll> {
652        is_send::<Poll>();
653        is_sync::<Poll>();
654
655        let poll = Poll {
656            selector: sys::Selector::new()?,
657            readiness_queue: ReadinessQueue::new()?,
658            lock_state: AtomicUsize::new(0),
659            lock: Mutex::new(()),
660            condvar: Condvar::new(),
661        };
662
663        // Register the notification wakeup FD with the IO poller
664        poll.readiness_queue.inner.awakener.register(
665            &poll,
666            AWAKEN,
667            Ready::readable(),
668            PollOpt::edge(),
669        )?;
670
671        Ok(poll)
672    }
673
674    /// Register an `Evented` handle with the `Poll` instance.
675    ///
676    /// Once registered, the `Poll` instance will monitor the `Evented` handle
677    /// for readiness state changes. When it notices a state change, it will
678    /// return a readiness event for the handle the next time [`poll`] is
679    /// called.
680    ///
681    /// See the [`struct`] docs for a high level overview.
682    ///
683    /// # Arguments
684    ///
685    /// `handle: &E: Evented`: This is the handle that the `Poll` instance
686    /// should monitor for readiness state changes.
687    ///
688    /// `token: Token`: The caller picks a token to associate with the socket.
689    /// When [`poll`] returns an event for the handle, this token is included.
690    /// This allows the caller to map the event to its handle. The token
691    /// associated with the `Evented` handle can be changed at any time by
692    /// calling [`reregister`].
693    ///
694    /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
695    /// usage.
696    ///
697    /// See documentation on [`Token`] for an example showing how to pick
698    /// [`Token`] values.
699    ///
700    /// `interest: Ready`: Specifies which operations `Poll` should monitor for
701    /// readiness. `Poll` will only return readiness events for operations
702    /// specified by this argument.
703    ///
704    /// If a socket is registered with readable interest and the socket becomes
705    /// writable, no event will be returned from [`poll`].
706    ///
707    /// The readiness interest for an `Evented` handle can be changed at any
708    /// time by calling [`reregister`].
709    ///
710    /// `opts: PollOpt`: Specifies the registration options. The most common
711    /// options being [`level`] for level-triggered events, [`edge`] for
712    /// edge-triggered events, and [`oneshot`].
713    ///
714    /// The registration options for an `Evented` handle can be changed at any
715    /// time by calling [`reregister`].
716    ///
717    /// # Notes
718    ///
719    /// Unless otherwise specified, the caller should assume that once an
720    /// `Evented` handle is registered with a `Poll` instance, it is bound to
721    /// that `Poll` instance for the lifetime of the `Evented` handle. This
722    /// remains true even if the `Evented` handle is deregistered from the poll
723    /// instance using [`deregister`].
724    ///
725    /// This function is **thread safe**. It can be called concurrently from
726    /// multiple threads.
727    ///
728    /// [`struct`]: #
729    /// [`reregister`]: #method.reregister
730    /// [`deregister`]: #method.deregister
731    /// [`poll`]: #method.poll
732    /// [`level`]: struct.PollOpt.html#method.level
733    /// [`edge`]: struct.PollOpt.html#method.edge
734    /// [`oneshot`]: struct.PollOpt.html#method.oneshot
735    /// [`Token`]: struct.Token.html
736    ///
737    // # Examples
738    //
739    // ```
740    // # use std::error::Error;
741    // # fn try_main() -> Result<(), Box<dyn Error>> {
742    // use corcovado::{Events, Poll, Ready, PollOpt, Token};
743    // use std::net::TcpStream;
744    // use std::time::{Duration, Instant};
745    //
746    // let poll = Poll::new()?;
747    // let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
748    //
749    // // Register the socket with `poll`
750    // poll.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
751    //
752    // let mut events = Events::with_capacity(1024);
753    // let start = Instant::now();
754    // let timeout = Duration::from_millis(500);
755    //
756    // loop {
757    //     let elapsed = start.elapsed();
758    //
759    //     if elapsed >= timeout {
760    //         // Connection timed out
761    //         return Ok(());
762    //     }
763    //
764    //     let remaining = timeout - elapsed;
765    //     poll.poll(&mut events, Some(remaining))?;
766    //
767    //     for event in &events {
768    //         if event.token() == Token(0) {
769    //             // Something (probably) happened on the socket.
770    //             return Ok(());
771    //         }
772    //     }
773    // }
774    // #     Ok(())
775    // # }
776    // #
777    // # fn main() {
778    // #     try_main().unwrap();
779    // # }
780    // ```
781    pub fn register<E>(
782        &self,
783        handle: &E,
784        token: Token,
785        interest: Ready,
786        opts: PollOpt,
787    ) -> io::Result<()>
788    where
789        E: Evented + ?Sized,
790    {
791        validate_args(token)?;
792
793        /*
794         * Undefined behavior:
795         * - Reusing a token with a different `Evented` without deregistering
796         * (or closing) the original `Evented`.
797         */
798        trace!("registering with poller");
799
800        // Register interests for this socket
801        handle.register(self, token, interest, opts)?;
802
803        Ok(())
804    }
805
806    /// Re-register an `Evented` handle with the `Poll` instance.
807    ///
808    /// Re-registering an `Evented` handle allows changing the details of the
809    /// registration. Specifically, it allows updating the associated `token`,
810    /// `interest`, and `opts` specified in previous `register` and `reregister`
811    /// calls.
812    ///
813    /// The `reregister` arguments fully override the previous values. In other
814    /// words, if a socket is registered with [`readable`] interest and the call
815    /// to `reregister` specifies [`writable`], then read interest is no longer
816    /// requested for the handle.
817    ///
818    /// The `Evented` handle must have previously been registered with this
819    /// instance of `Poll` otherwise the call to `reregister` will return with
820    /// an error.
821    ///
822    /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
823    /// usage.
824    ///
825    /// See the [`register`] documentation for details about the function
826    /// arguments and see the [`struct`] docs for a high level overview of
827    /// polling.
828    ///
829    // # Examples
830    //
831    // ```
832    // # use std::error::Error;
833    // # fn try_main() -> Result<(), Box<dyn Error>> {
834    // use corcovado::{Poll, Ready, PollOpt, Token};
835    // use std::net::TcpStream;
836    //
837    // let poll = Poll::new()?;
838    // let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
839    //
840    // // Register the socket with `poll`, requesting readable
841    // poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
842    //
843    // // Reregister the socket specifying a different token and write interest
844    // // instead. `PollOpt::edge()` must be specified even though that value
845    // // is not being changed.
846    // poll.reregister(&socket, Token(2), Ready::writable(), PollOpt::edge())?;
847    // #     Ok(())
848    // # }
849    // #
850    // # fn main() {
851    // #     try_main().unwrap();
852    // # }
853    // ```
854    //
    /// [`struct`]: #
    /// [`register`]: #method.register
    /// [`readable`]: struct.Ready.html#method.readable
    /// [`writable`]: struct.Ready.html#method.writable
859    pub fn reregister<E>(
860        &self,
861        handle: &E,
862        token: Token,
863        interest: Ready,
864        opts: PollOpt,
865    ) -> io::Result<()>
866    where
867        E: Evented + ?Sized,
868    {
869        validate_args(token)?;
870
871        trace!("registering with poller");
872
873        // Register interests for this socket
874        handle.reregister(self, token, interest, opts)?;
875
876        Ok(())
877    }
878
879    /// Deregister an `Evented` handle with the `Poll` instance.
880    ///
881    /// When an `Evented` handle is deregistered, the `Poll` instance will
882    /// no longer monitor it for readiness state changes. Unlike disabling
883    /// handles with oneshot, deregistering clears up any internal resources
884    /// needed to track the handle.
885    ///
886    /// A handle can be passed back to `register` after it has been
887    /// deregistered; however, it must be passed back to the **same** `Poll`
888    /// instance.
889    ///
890    /// `Evented` handles are automatically deregistered when they are dropped.
891    /// It is common to never need to explicitly call `deregister`.
892    ///
893    // # Examples
894    //
895    // ```
896    // # use std::error::Error;
897    // # fn try_main() -> Result<(), Box<dyn Error>> {
898    // use corcovado::{Events, Poll, Ready, PollOpt, Token};
899    // use std::net::TcpStream;
900    // use std::time::Duration;
901    //
902    // let poll = Poll::new()?;
903    // let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
904    //
905    // // Register the socket with `poll`
906    // poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
907    //
908    // poll.deregister(&socket)?;
909    //
910    // let mut events = Events::with_capacity(1024);
911    //
912    // // Set a timeout because this poll should never receive any events.
913    // let n = poll.poll(&mut events, Some(Duration::from_secs(1)))?;
914    // assert_eq!(0, n);
915    // #     Ok(())
916    // # }
917    // #
918    // # fn main() {
919    // #     try_main().unwrap();
920    // # }
921    // ```
922    pub fn deregister<E>(&self, handle: &E) -> io::Result<()>
923    where
924        E: Evented + ?Sized,
925    {
926        trace!("deregistering handle with poller");
927
928        // Deregister interests for this socket
929        handle.deregister(self)?;
930
931        Ok(())
932    }
933
934    /// Wait for readiness events
935    ///
936    /// Blocks the current thread and waits for readiness events for any of the
937    /// `Evented` handles that have been registered with this `Poll` instance.
938    /// The function will block until either at least one readiness event has
939    /// been received or `timeout` has elapsed. A `timeout` of `None` means that
940    /// `poll` will block until a readiness event has been received.
941    ///
942    /// The supplied `events` will be cleared and newly received readiness events
943    /// will be pushed onto the end. At most `events.capacity()` events will be
944    /// returned. If there are further pending readiness events, they will be
945    /// returned on the next call to `poll`.
946    ///
947    /// A single call to `poll` may result in multiple readiness events being
948    /// returned for a single `Evented` handle. For example, if a TCP socket
949    /// becomes both readable and writable, it may be possible for a single
950    /// readiness event to be returned with both [`readable`] and [`writable`]
951    /// readiness **OR** two separate events may be returned, one with
952    /// [`readable`] set and one with [`writable`] set.
953    ///
954    /// Note that the `timeout` will be rounded up to the system clock
955    /// granularity (usually 1ms), and kernel scheduling delays mean that
956    /// the blocking interval may be overrun by a small amount.
957    ///
958    /// `poll` returns the number of readiness events that have been pushed into
959    /// `events` or `Err` when an error has been encountered with the system
960    /// selector.  The value returned is deprecated and will be removed in 0.7.0.
961    /// Accessing the events by index is also deprecated.  Events can be
962    /// inserted by other events triggering, thus making sequential access
963    /// problematic.  Use the iterator API instead.  See [`iter`].
964    ///
965    /// See the [struct] level documentation for a higher level discussion of
966    /// polling.
967    ///
968    /// [`readable`]: struct.Ready.html#method.readable
969    /// [`writable`]: struct.Ready.html#method.writable
970    /// [struct]: #
971    /// [`iter`]: struct.Events.html#method.iter
972    ///
973    // # Examples
974    //
975    // A basic example -- establishing a `TcpStream` connection.
976    //
977    // ```
978    // # use std::error::Error;
979    // # fn try_main() -> Result<(), Box<dyn Error>> {
980    // use corcovado::{Events, Poll, Ready, PollOpt, Token};
981    //
982    // use std::net::{TcpStream, TcpListener, SocketAddr};
983    // use std::thread;
984    //
985    // // Bind a server socket to connect to.
986    // let addr: SocketAddr = "127.0.0.1:0".parse()?;
987    // let server = TcpListener::bind(&addr)?;
988    // let addr = server.local_addr()?.clone();
989    //
990    // // Spawn a thread to accept the socket
991    // thread::spawn(move || {
992    //     let _ = server.accept();
993    // });
994    //
995    // // Construct a new `Poll` handle as well as the `Events` we'll store into
996    // let poll = Poll::new()?;
997    // let mut events = Events::with_capacity(1024);
998    //
999    // // Connect the stream
1000    // let stream = TcpStream::connect(&addr)?;
1001    //
1002    // // Register the stream with `Poll`
1003    // poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
1004    //
    // // Wait for the socket to become ready. This has to happen in a loop to
    // // handle spurious wakeups.
1007    // loop {
1008    //     poll.poll(&mut events, None)?;
1009    //
1010    //     for event in &events {
1011    //         if event.token() == Token(0) && event.readiness().is_writable() {
1012    //             // The socket connected (probably, it could still be a spurious
1013    //             // wakeup)
1014    //             return Ok(());
1015    //         }
1016    //     }
1017    // }
1018    // #     Ok(())
1019    // # }
1020    // #
1021    // # fn main() {
1022    // #     try_main().unwrap();
1023    // # }
1024    // ```
1025    ///
1026    /// [struct]: #
1027    pub fn poll(
1028        &self,
1029        events: &mut Events,
1030        timeout: Option<Duration>,
1031    ) -> io::Result<usize> {
1032        self.poll1(events, timeout, false)
1033    }
1034
1035    /// Like `poll`, but may be interrupted by a signal
1036    ///
    /// If `poll` is interrupted while blocking, it will transparently retry the syscall. If you
    /// want to handle signals yourself, however, use `poll_interruptible`.
1039    pub fn poll_interruptible(
1040        &self,
1041        events: &mut Events,
1042        timeout: Option<Duration>,
1043    ) -> io::Result<usize> {
1044        self.poll1(events, timeout, true)
1045    }
1046
1047    fn poll1(
1048        &self,
1049        events: &mut Events,
1050        mut timeout: Option<Duration>,
1051        interruptible: bool,
1052    ) -> io::Result<usize> {
1053        let zero = Some(Duration::from_millis(0));
1054
1055        // At a high level, the synchronization strategy is to acquire access to
1056        // the critical section by transitioning the atomic from unlocked ->
1057        // locked. If the attempt fails, the thread will wait on the condition
1058        // variable.
1059        //
1060        // # Some more detail
1061        //
1062        // The `lock_state` atomic usize combines:
1063        //
1064        // - locked flag, stored in the least significant bit
1065        // - number of waiting threads, stored in the rest of the bits.
1066        //
1067        // When a thread transitions the locked flag from 0 -> 1, it has
1068        // obtained access to the critical section.
1069        //
1070        // When entering `poll`, a compare-and-swap from 0 -> 1 is attempted.
1071        // This is a fast path for the case when there are no concurrent calls
1072        // to poll, which is very common.
1073        //
1074        // On failure, the mutex is locked, and the thread attempts to increment
1075        // the number of waiting threads component of `lock_state`. If this is
1076        // successfully done while the locked flag is set, then the thread can
1077        // wait on the condition variable.
1078        //
1079        // When a thread exits the critical section, it unsets the locked flag.
1080        // If there are any waiters, which is atomically determined while
1081        // unsetting the locked flag, then the condvar is notified.
1082
1083        #[allow(deprecated)]
1084        let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);
1085
1086        if 0 != curr {
1087            // Enter slower path
1088            let mut lock = self.lock.lock().unwrap();
1089            let mut inc = false;
1090
1091            loop {
1092                if curr & 1 == 0 {
1093                    // The lock is currently free, attempt to grab it
1094                    let mut next = curr | 1;
1095
1096                    if inc {
1097                        // The waiter count has previously been incremented, so
1098                        // decrement it here
1099                        next -= 2;
1100                    }
1101
1102                    #[allow(deprecated)]
1103                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
1104
1105                    if actual != curr {
1106                        curr = actual;
1107                        continue;
1108                    }
1109
1110                    // Lock acquired, break from the loop
1111                    break;
1112                }
1113
1114                if timeout == zero {
1115                    if inc {
1116                        self.lock_state.fetch_sub(2, SeqCst);
1117                    }
1118
1119                    return Ok(0);
1120                }
1121
1122                // The lock is currently held, so wait for it to become
1123                // free. If the waiter count hasn't been incremented yet, do
1124                // so now
1125                if !inc {
1126                    let next = curr.checked_add(2).expect("overflow");
1127                    #[allow(deprecated)]
1128                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
1129
1130                    if actual != curr {
1131                        curr = actual;
1132                        continue;
1133                    }
1134
1135                    // Track that the waiter count has been incremented for
1136                    // this thread and fall through to the condvar waiting
1137                    inc = true;
1138                }
1139
1140                lock = match timeout {
1141                    Some(to) => {
1142                        let now = Instant::now();
1143
1144                        // Wait to be notified
1145                        let (l, _) = self.condvar.wait_timeout(lock, to).unwrap();
1146
1147                        // See how much time was elapsed in the wait
1148                        let elapsed = now.elapsed();
1149
1150                        // Update `timeout` to reflect how much time is left to
1151                        // wait.
1152                        if elapsed >= to {
1153                            timeout = zero;
1154                        } else {
1155                            // Update the timeout
1156                            timeout = Some(to - elapsed);
1157                        }
1158
1159                        l
1160                    }
1161                    None => self.condvar.wait(lock).unwrap(),
1162                };
1163
1164                // Reload the state
1165                curr = self.lock_state.load(SeqCst);
1166
1167                // Try to lock again...
1168            }
1169        }
1170
1171        let ret = self.poll2(events, timeout, interruptible);
1172
1173        // Release the lock
1174        if 1 != self.lock_state.fetch_and(!1, Release) {
1175            // Acquire the mutex
1176            let _lock = self.lock.lock().unwrap();
1177
1178            // There is at least one waiting thread, so notify one
1179            self.condvar.notify_one();
1180        }
1181
1182        ret
1183    }
1184
1185    #[inline]
1186    #[allow(clippy::if_same_then_else)]
1187    fn poll2(
1188        &self,
1189        events: &mut Events,
1190        mut timeout: Option<Duration>,
1191        interruptible: bool,
1192    ) -> io::Result<usize> {
1193        // Compute the timeout value passed to the system selector. If the
1194        // readiness queue has pending nodes, we still want to poll the system
1195        // selector for new events, but we don't want to block the thread to
1196        // wait for new events.
1197        if timeout == Some(Duration::from_millis(0)) {
1198            // If blocking is not requested, then there is no need to prepare
1199            // the queue for sleep
1200            //
1201            // The sleep_marker should be removed by readiness_queue.poll().
1202        } else if self.readiness_queue.prepare_for_sleep() {
1203            // The readiness queue is empty. The call to `prepare_for_sleep`
1204            // inserts `sleep_marker` into the queue. This signals to any
1205            // threads setting readiness that the `Poll::poll` is going to
1206            // sleep, so the awakener should be used.
1207        } else {
1208            // The readiness queue is not empty, so do not block the thread.
1209            timeout = Some(Duration::from_millis(0));
1210        }
1211
1212        loop {
1213            let now = Instant::now();
1214            // First get selector events
1215            let res = self.selector.select(&mut events.inner, AWAKEN, timeout);
1216            match res {
1217                Ok(true) => {
1218                    // Some awakeners require reading from a FD.
1219                    self.readiness_queue.inner.awakener.cleanup();
1220                    break;
1221                }
1222                Ok(false) => break,
1223                Err(ref e)
1224                    if e.kind() == io::ErrorKind::Interrupted && !interruptible =>
1225                {
1226                    // Interrupted by a signal; update timeout if necessary and retry
1227                    if let Some(to) = timeout {
1228                        let elapsed = now.elapsed();
1229                        if elapsed >= to {
1230                            break;
1231                        } else {
1232                            timeout = Some(to - elapsed);
1233                        }
1234                    }
1235                }
1236                Err(e) => return Err(e),
1237            }
1238        }
1239
1240        // Poll custom event queue
1241        self.readiness_queue.poll(&mut events.inner);
1242
1243        // Return number of polled events
1244        Ok(events.inner.len())
1245    }
1246}
1247
1248fn validate_args(token: Token) -> io::Result<()> {
1249    if token == AWAKEN {
1250        return Err(io::Error::other("invalid token"));
1251    }
1252
1253    Ok(())
1254}
1255
1256impl fmt::Debug for Poll {
1257    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1258        fmt.debug_struct("Poll").finish()
1259    }
1260}
1261
1262#[cfg(all(unix, not(target_os = "fuchsia")))]
1263impl AsRawFd for Poll {
1264    fn as_raw_fd(&self) -> RawFd {
1265        self.selector.as_raw_fd()
1266    }
1267}
1268
1269/// A collection of readiness events.
1270///
1271/// `Events` is passed as an argument to [`Poll::poll`] and will be used to
1272/// receive any new readiness events received since the last poll. Usually, a
1273/// single `Events` instance is created at the same time as a [`Poll`] and
1274/// reused on each call to [`Poll::poll`].
1275///
1276/// See [`Poll`] for more documentation on polling.
1277///
1278/// # Examples
1279///
1280/// ```
1281/// # use std::error::Error;
1282/// # fn try_main() -> Result<(), Box<dyn Error>> {
1283/// use corcovado::{Events, Poll};
1284/// use std::time::Duration;
1285///
1286/// let mut events = Events::with_capacity(1024);
1287/// let poll = Poll::new()?;
1288///
1289/// assert_eq!(0, events.len());
1290///
1291/// // Register `Evented` handles with `poll`
1292///
1293/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1294///
1295/// for event in &events {
1296///     println!("event={:?}", event);
1297/// }
1298/// #     Ok(())
1299/// # }
1300/// #
1301/// # fn main() {
1302/// #     try_main().unwrap();
1303/// # }
1304/// ```
1305///
1306/// [`Poll::poll`]: struct.Poll.html#method.poll
1307/// [`Poll`]: struct.Poll.html
pub struct Events {
    // Backing `sys::Events` storage; every `Events` method forwards to it.
    inner: sys::Events,
}
1311
1312/// [`Events`] iterator.
1313///
1314/// This struct is created by the [`iter`] method on [`Events`].
1315///
1316/// # Examples
1317///
1318/// ```
1319/// # use std::error::Error;
1320/// # fn try_main() -> Result<(), Box<dyn Error>> {
1321/// use corcovado::{Events, Poll};
1322/// use std::time::Duration;
1323///
1324/// let mut events = Events::with_capacity(1024);
1325/// let poll = Poll::new()?;
1326///
1327/// // Register handles with `poll`
1328///
1329/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1330///
1331/// for event in events.iter() {
1332///     println!("event={:?}", event);
1333/// }
1334/// #     Ok(())
1335/// # }
1336/// #
1337/// # fn main() {
1338/// #     try_main().unwrap();
1339/// # }
1340/// ```
1341///
1342/// [`Events`]: struct.Events.html
1343/// [`iter`]: struct.Events.html#method.iter
#[derive(Debug, Clone)]
pub struct Iter<'a> {
    // The `Events` collection being iterated over.
    inner: &'a Events,
    // Index of the next event to fetch; starts at 0 and grows by one per
    // call to `next`.
    pos: usize,
}
1349
1350/// Owned [`Events`] iterator.
1351///
1352/// This struct is created by the `into_iter` method on [`Events`].
1353///
1354/// # Examples
1355///
1356/// ```
1357/// # use std::error::Error;
1358/// # fn try_main() -> Result<(), Box<dyn Error>> {
1359/// use corcovado::{Events, Poll};
1360/// use std::time::Duration;
1361///
1362/// let mut events = Events::with_capacity(1024);
1363/// let poll = Poll::new()?;
1364///
1365/// // Register handles with `poll`
1366///
1367/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1368///
1369/// for event in events {
1370///     println!("event={:?}", event);
1371/// }
1372/// #     Ok(())
1373/// # }
1374/// #
1375/// # fn main() {
1376/// #     try_main().unwrap();
1377/// # }
1378/// ```
1379/// [`Events`]: struct.Events.html
#[derive(Debug)]
pub struct IntoIter {
    // The `Events` collection owned by this iterator.
    inner: Events,
    // Index of the next event to fetch; starts at 0 and grows by one per
    // call to `next`.
    pos: usize,
}
1385
1386impl Events {
1387    /// Return a new `Events` capable of holding up to `capacity` events.
1388    ///
1389    /// # Examples
1390    ///
1391    /// ```
1392    /// use corcovado::Events;
1393    ///
1394    /// let events = Events::with_capacity(1024);
1395    ///
1396    /// assert_eq!(1024, events.capacity());
1397    /// ```
1398    pub fn with_capacity(capacity: usize) -> Events {
1399        Events {
1400            inner: sys::Events::with_capacity(capacity),
1401        }
1402    }
1403
1404    pub fn get(&self, idx: usize) -> Option<Event> {
1405        self.inner.get(idx)
1406    }
1407
1408    pub fn len(&self) -> usize {
1409        self.inner.len()
1410    }
1411
1412    /// Returns the number of `Event` values that `self` can hold.
1413    ///
1414    /// ```
1415    /// use corcovado::Events;
1416    ///
1417    /// let events = Events::with_capacity(1024);
1418    ///
1419    /// assert_eq!(1024, events.capacity());
1420    /// ```
1421    pub fn capacity(&self) -> usize {
1422        self.inner.capacity()
1423    }
1424
1425    /// Returns `true` if `self` contains no `Event` values.
1426    ///
1427    /// # Examples
1428    ///
1429    /// ```
1430    /// use corcovado::Events;
1431    ///
1432    /// let events = Events::with_capacity(1024);
1433    ///
1434    /// assert!(events.is_empty());
1435    /// ```
1436    pub fn is_empty(&self) -> bool {
1437        self.inner.is_empty()
1438    }
1439
1440    /// Returns an iterator over the `Event` values.
1441    ///
1442    /// # Examples
1443    ///
1444    /// ```
1445    /// # use std::error::Error;
1446    /// # fn try_main() -> Result<(), Box<dyn Error>> {
1447    /// use corcovado::{Events, Poll};
1448    /// use std::time::Duration;
1449    ///
1450    /// let mut events = Events::with_capacity(1024);
1451    /// let poll = Poll::new()?;
1452    ///
1453    /// // Register handles with `poll`
1454    ///
1455    /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1456    ///
1457    /// for event in events.iter() {
1458    ///     println!("event={:?}", event);
1459    /// }
1460    /// #     Ok(())
1461    /// # }
1462    /// #
1463    /// # fn main() {
1464    /// #     try_main().unwrap();
1465    /// # }
1466    /// ```
1467    pub fn iter(&self) -> Iter<'_> {
1468        Iter {
1469            inner: self,
1470            pos: 0,
1471        }
1472    }
1473
1474    /// Clearing all `Event` values from container explicitly.
1475    ///
1476    /// # Examples
1477    ///
1478    /// ```
1479    /// # use std::error::Error;
1480    /// # fn try_main() -> Result<(), Box<dyn Error>> {
1481    /// use corcovado::{Events, Poll};
1482    /// use std::time::Duration;
1483    ///
1484    /// let mut events = Events::with_capacity(1024);
1485    /// let poll = Poll::new()?;
1486    ///
1487    /// // Register handles with `poll`
1488    /// for _ in 0..2 {
1489    ///     events.clear();
1490    ///     poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1491    ///
1492    ///     for event in events.iter() {
1493    ///         println!("event={:?}", event);
1494    ///     }
1495    /// }
1496    /// #     Ok(())
1497    /// # }
1498    /// #
1499    /// # fn main() {
1500    /// #     try_main().unwrap();
1501    /// # }
1502    /// ```
1503    pub fn clear(&mut self) {
1504        self.inner.clear();
1505    }
1506}
1507
1508impl<'a> IntoIterator for &'a Events {
1509    type Item = Event;
1510    type IntoIter = Iter<'a>;
1511
1512    fn into_iter(self) -> Self::IntoIter {
1513        self.iter()
1514    }
1515}
1516
1517impl Iterator for Iter<'_> {
1518    type Item = Event;
1519
1520    fn next(&mut self) -> Option<Event> {
1521        let ret = self.inner.inner.get(self.pos);
1522        self.pos += 1;
1523        ret
1524    }
1525}
1526
1527impl IntoIterator for Events {
1528    type Item = Event;
1529    type IntoIter = IntoIter;
1530
1531    fn into_iter(self) -> Self::IntoIter {
1532        IntoIter {
1533            inner: self,
1534            pos: 0,
1535        }
1536    }
1537}
1538
1539impl Iterator for IntoIter {
1540    type Item = Event;
1541
1542    fn next(&mut self) -> Option<Event> {
1543        let ret = self.inner.inner.get(self.pos);
1544        self.pos += 1;
1545        ret
1546    }
1547}
1548
1549impl fmt::Debug for Events {
1550    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1551        f.debug_struct("Events")
1552            .field("capacity", &self.capacity())
1553            .finish()
1554    }
1555}
1556
1557// ===== Accessors for internal usage =====
1558
1559pub fn selector(poll: &Poll) -> &sys::Selector {
1560    &poll.selector
1561}
1562
1563/*
1564 *
1565 * ===== Registration =====
1566 *
1567 */
1568
1569// TODO: get rid of this, windows depends on it for now
1570#[allow(dead_code)]
1571pub fn new_registration(
1572    poll: &Poll,
1573    token: Token,
1574    ready: Ready,
1575    opt: PollOpt,
1576) -> (Registration, SetReadiness) {
1577    Registration::new_priv(poll, token, ready, opt)
1578}
1579
1580impl Registration {
1581    /// Create and return a new `Registration` and the associated
1582    /// `SetReadiness`.
1583    ///
1584    /// See [struct] documentation for more detail and [`Poll`]
1585    /// for high level documentation on polling.
1586    ///
1587    /// # Examples
1588    ///
1589    /// ```
1590    /// # use std::error::Error;
1591    /// # fn try_main() -> Result<(), Box<dyn Error>> {
1592    /// use corcovado::{Events, Ready, Registration, Poll, PollOpt, Token};
1593    /// use std::thread;
1594    ///
1595    /// let (registration, set_readiness) = Registration::new2();
1596    ///
1597    /// thread::spawn(move || {
1598    ///     use std::time::Duration;
1599    ///     thread::sleep(Duration::from_millis(500));
1600    ///
1601    ///     set_readiness.set_readiness(Ready::readable());
1602    /// });
1603    ///
1604    /// let poll = Poll::new()?;
1605    /// poll.register(&registration, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
1606    ///
1607    /// let mut events = Events::with_capacity(256);
1608    ///
1609    /// loop {
1610    ///     poll.poll(&mut events, None);
1611    ///
1612    ///     for event in &events {
1613    ///         if event.token() == Token(0) && event.readiness().is_readable() {
1614    ///             return Ok(());
1615    ///         }
1616    ///     }
1617    /// }
1618    /// #     Ok(())
1619    /// # }
1620    /// #
1621    /// # fn main() {
1622    /// #     try_main().unwrap();
1623    /// # }
1624    /// ```
1625    /// [struct]: #
1626    /// [`Poll`]: struct.Poll.html
1627    pub fn new2() -> (Registration, SetReadiness) {
1628        // Allocate the registration node. The new node will have `ref_count`
1629        // set to 2: one SetReadiness, one Registration.
1630        let node = Box::into_raw(Box::new(ReadinessNode::new(
1631            ptr::null_mut(),
1632            Token(0),
1633            Ready::empty(),
1634            PollOpt::empty(),
1635            2,
1636        )));
1637
1638        let registration = Registration {
1639            inner: RegistrationInner { node },
1640        };
1641
1642        let set_readiness = SetReadiness {
1643            inner: RegistrationInner { node },
1644        };
1645
1646        (registration, set_readiness)
1647    }
1648
1649    pub fn new(
1650        poll: &Poll,
1651        token: Token,
1652        interest: Ready,
1653        opt: PollOpt,
1654    ) -> (Registration, SetReadiness) {
1655        Registration::new_priv(poll, token, interest, opt)
1656    }
1657
1658    // TODO: Get rid of this (windows depends on it for now)
1659    fn new_priv(
1660        poll: &Poll,
1661        token: Token,
1662        interest: Ready,
1663        opt: PollOpt,
1664    ) -> (Registration, SetReadiness) {
1665        is_send::<Registration>();
1666        is_sync::<Registration>();
1667        is_send::<SetReadiness>();
1668        is_sync::<SetReadiness>();
1669
1670        // Clone handle to the readiness queue, this bumps the ref count
1671        let queue = poll.readiness_queue.inner.clone();
1672
1673        // Convert to a *mut () pointer
1674        let queue: *mut () = unsafe { mem::transmute(queue) };
1675
1676        // Allocate the registration node. The new node will have `ref_count`
1677        // set to 3: one SetReadiness, one Registration, and one Poll handle.
1678        let node =
1679            Box::into_raw(Box::new(ReadinessNode::new(queue, token, interest, opt, 3)));
1680
1681        let registration = Registration {
1682            inner: RegistrationInner { node },
1683        };
1684
1685        let set_readiness = SetReadiness {
1686            inner: RegistrationInner { node },
1687        };
1688
1689        (registration, set_readiness)
1690    }
1691
1692    pub fn update(
1693        &self,
1694        poll: &Poll,
1695        token: Token,
1696        interest: Ready,
1697        opts: PollOpt,
1698    ) -> io::Result<()> {
1699        self.inner.update(poll, token, interest, opts)
1700    }
1701
1702    pub fn deregister(&self, poll: &Poll) -> io::Result<()> {
1703        self.inner
1704            .update(poll, Token(0), Ready::empty(), PollOpt::empty())
1705    }
1706}
1707
1708impl Evented for Registration {
1709    fn register(
1710        &self,
1711        poll: &Poll,
1712        token: Token,
1713        interest: Ready,
1714        opts: PollOpt,
1715    ) -> io::Result<()> {
1716        self.inner.update(poll, token, interest, opts)
1717    }
1718
1719    fn reregister(
1720        &self,
1721        poll: &Poll,
1722        token: Token,
1723        interest: Ready,
1724        opts: PollOpt,
1725    ) -> io::Result<()> {
1726        self.inner.update(poll, token, interest, opts)
1727    }
1728
1729    fn deregister(&self, poll: &Poll) -> io::Result<()> {
1730        self.inner
1731            .update(poll, Token(0), Ready::empty(), PollOpt::empty())
1732    }
1733}
1734
1735impl Drop for Registration {
1736    fn drop(&mut self) {
1737        // `flag_as_dropped` toggles the `dropped` flag and notifies
1738        // `Poll::poll` to release its handle (which is just decrementing
1739        // the ref count).
1740        if self.inner.state.flag_as_dropped() {
1741            // Can't do anything if the queuing fails
1742            let _ = self.inner.enqueue_with_wakeup();
1743        }
1744    }
1745}
1746
1747impl fmt::Debug for Registration {
1748    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1749        fmt.debug_struct("Registration").finish()
1750    }
1751}
1752
1753impl SetReadiness {
1754    /// Returns the registration's current readiness.
1755    ///
1756    /// # Note
1757    ///
1758    /// There is no guarantee that `readiness` establishes any sort of memory
1759    /// ordering. Any concurrent data access must be synchronized using another
1760    /// strategy.
1761    ///
1762    /// # Examples
1763    ///
1764    /// ```
1765    /// # use std::error::Error;
1766    /// # fn try_main() -> Result<(), Box<dyn Error>> {
1767    /// use corcovado::{Registration, Ready};
1768    ///
1769    /// let (registration, set_readiness) = Registration::new2();
1770    ///
1771    /// assert!(set_readiness.readiness().is_empty());
1772    ///
1773    /// set_readiness.set_readiness(Ready::readable())?;
1774    /// assert!(set_readiness.readiness().is_readable());
1775    /// #     Ok(())
1776    /// # }
1777    /// #
1778    /// # fn main() {
1779    /// #     try_main().unwrap();
1780    /// # }
1781    /// ```
1782    pub fn readiness(&self) -> Ready {
1783        self.inner.readiness()
1784    }
1785
1786    /// Set the registration's readiness
1787    ///
1788    /// If the associated `Registration` is registered with a [`Poll`] instance
1789    /// and has requested readiness events that include `ready`, then a future
1790    /// call to [`Poll::poll`] will receive a readiness event representing the
1791    /// readiness state change.
1792    ///
1793    /// # Note
1794    ///
1795    /// There is no guarantee that `readiness` establishes any sort of memory
1796    /// ordering. Any concurrent data access must be synchronized using another
1797    /// strategy.
1798    ///
1799    /// There is also no guarantee as to when the readiness event will be
1800    /// delivered to poll. A best attempt will be made to make the delivery in a
1801    /// "timely" fashion. For example, the following is **not** guaranteed to
1802    /// work:
1803    ///
1804    /// ```
1805    /// # use std::error::Error;
1806    /// # fn try_main() -> Result<(), Box<dyn Error>> {
1807    /// use corcovado::{Events, Registration, Ready, Poll, PollOpt, Token};
1808    ///
1809    /// let poll = Poll::new()?;
1810    /// let (registration, set_readiness) = Registration::new2();
1811    ///
1812    /// poll.register(&registration,
1813    ///               Token(0),
1814    ///               Ready::readable(),
1815    ///               PollOpt::edge())?;
1816    ///
1817    /// // Set the readiness, then immediately poll to try to get the readiness
1818    /// // event
1819    /// set_readiness.set_readiness(Ready::readable())?;
1820    ///
1821    /// let mut events = Events::with_capacity(1024);
1822    /// poll.poll(&mut events, None)?;
1823    ///
1824    /// // There is NO guarantee that the following will work. It is possible
1825    /// // that the readiness event will be delivered at a later time.
1826    /// let event = events.get(0).unwrap();
1827    /// assert_eq!(event.token(), Token(0));
1828    /// assert!(event.readiness().is_readable());
1829    /// #     Ok(())
1830    /// # }
1831    /// #
1832    /// # fn main() {
1833    /// #     try_main().unwrap();
1834    /// # }
1835    /// ```
1836    ///
1837    /// # Examples
1838    ///
1839    /// A simple example, for a more elaborate example, see the [`Evented`]
1840    /// documentation.
1841    ///
1842    /// ```
1843    /// # use std::error::Error;
1844    /// # fn try_main() -> Result<(), Box<dyn Error>> {
1845    /// use corcovado::{Registration, Ready};
1846    ///
1847    /// let (registration, set_readiness) = Registration::new2();
1848    ///
1849    /// assert!(set_readiness.readiness().is_empty());
1850    ///
1851    /// set_readiness.set_readiness(Ready::readable())?;
1852    /// assert!(set_readiness.readiness().is_readable());
1853    /// #     Ok(())
1854    /// # }
1855    /// #
1856    /// # fn main() {
1857    /// #     try_main().unwrap();
1858    /// # }
1859    /// ```
1860    ///
1861    /// [`Registration`]: struct.Registration.html
1862    /// [`Evented`]: event/trait.Evented.html#examples
1863    /// [`Poll`]: struct.Poll.html
1864    /// [`Poll::poll`]: struct.Poll.html#method.poll
1865    pub fn set_readiness(&self, ready: Ready) -> io::Result<()> {
1866        self.inner.set_readiness(ready)
1867    }
1868}
1869
1870impl fmt::Debug for SetReadiness {
1871    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1872        f.debug_struct("SetReadiness").finish()
1873    }
1874}
1875
impl RegistrationInner {
    /// Get the registration's readiness.
    ///
    /// Loads the node state with `Relaxed` ordering and returns the readiness
    /// stored in it.
    fn readiness(&self) -> Ready {
        self.state.load(Relaxed).readiness()
    }

    /// Set the registration's readiness.
    ///
    /// This function can be called concurrently by an arbitrary number of
    /// SetReadiness handles.
    ///
    /// Returns `Ok(())` without changing anything once the node state is
    /// flagged as dropped. An error is only returned when enqueuing the node
    /// (with wakeup) fails.
    fn set_readiness(&self, ready: Ready) -> io::Result<()> {
        // Load the current atomic state.
        let mut state = self.state.load(Acquire);
        let mut next;

        // Compare-and-swap loop: recompute `next` from the latest observed
        // `state` until the swap succeeds.
        loop {
            next = state;

            if state.is_dropped() {
                // Node is dropped, no more notifications
                return Ok(());
            }

            // Update the readiness
            next.set_readiness(ready);

            // If the readiness is not blank, try to obtain permission to
            // push the node into the readiness queue.
            if !next.effective_readiness().is_empty() {
                next.set_queued();
            }

            let actual = self.state.compare_and_swap(state, next, AcqRel);

            if state == actual {
                break;
            }

            // The swap lost a race; retry from the value that was observed.
            state = actual;
        }

        if !state.is_queued() && next.is_queued() {
            // We toggled the queued flag, making us responsible for queuing the
            // node in the MPSC readiness queue.
            self.enqueue_with_wakeup()?;
        }

        Ok(())
    }

    /// Update the registration details associated with the node
    ///
    /// Stores `token`, `interest` and `opt` in the node and, if the node has
    /// effective readiness afterwards and was not already queued, enqueues it
    /// on `poll`'s readiness queue.
    ///
    /// # Errors
    ///
    /// Fails when the node is already associated with the readiness queue of
    /// a different `Poll` instance, or when enqueuing the node fails.
    ///
    /// # Note
    ///
    /// If another thread currently holds the node's `update_lock`, this call
    /// is discarded and `Ok(())` is returned.
    fn update(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opt: PollOpt,
    ) -> io::Result<()> {
        // First, ensure poll instances match
        //
        // Load the queue pointer, `Relaxed` is sufficient here as only the
        // pointer is being operated on. The actual memory is guaranteed to be
        // visible via the `poll: &Poll` ref passed as an argument to the
        // function.
        let mut queue = self.readiness_queue.load(Relaxed);
        // Reinterpret the `inner` field of `poll`'s readiness queue as a raw
        // pointer value so that it can be compared with the stored pointer.
        let other: &*mut () =
            unsafe { &*(&poll.readiness_queue.inner as *const _ as *const *mut ()) };
        let other = *other;

        // The reinterpretation above relies on the handle being exactly
        // pointer sized.
        debug_assert!(
            mem::size_of::<Arc<ReadinessQueueInner>>() == mem::size_of::<*mut ()>()
        );

        if queue.is_null() {
            // Attempt to set the queue pointer. `Release` ordering synchronizes
            // with `Acquire` in `ensure_with_wakeup`.
            //
            // NOTE(review): no `ensure_with_wakeup` is visible in this part of
            // the file; presumably `enqueue_with_wakeup` is meant — confirm.
            #[allow(deprecated)]
            let actual = self.readiness_queue.compare_and_swap(queue, other, Release);

            if actual.is_null() {
                // The CAS succeeded, this means that the node's ref count
                // should be incremented to reflect that the `poll` function
                // effectively owns the node as well.
                //
                // `Relaxed` ordering used for the same reason as in
                // RegistrationInner::clone
                self.ref_count.fetch_add(1, Relaxed);

                // Note that the `queue` reference stored in our
                // `readiness_queue` field is intended to be a strong reference,
                // so now that we've successfully claimed the reference we bump
                // the refcount here.
                //
                // Down below in `release_node` when we deallocate this
                // `RegistrationInner` is where we'll transmute this back to an
                // arc and decrement the reference count.
                mem::forget(poll.readiness_queue.clone());
            } else {
                // The CAS failed, another thread set the queue pointer, so ensure
                // that the pointer and `other` match
                if actual != other {
                    return Err(io::Error::other(
                        "registration handle associated with another `Poll` instance",
                    ));
                }
            }

            queue = other;
        } else if queue != other {
            return Err(io::Error::other(
                "registration handle associated with another `Poll` instance",
            ));
        }

        // Sanity check (debug builds only): `queue` now equals the pointer
        // value held by `poll`'s readiness queue.
        unsafe {
            let actual = &poll.readiness_queue.inner as *const _ as *const usize;
            debug_assert_eq!(queue as usize, *actual);
        }

        // The `update_lock` atomic is used as a flag ensuring only a single
        // thread concurrently enters the `update` critical section. Any
        // concurrent calls to update are discarded. If coordinated updates are
        // required, the Mio user is responsible for handling that.
        //
        // Acquire / Release ordering is used on `update_lock` to ensure that
        // data access to the `token_*` variables are scoped to the critical
        // section.

        // Acquire the update lock.
        #[allow(deprecated)]
        if self.update_lock.compare_and_swap(false, true, Acquire) {
            // The lock is already held. Discard the update
            return Ok(());
        }

        // Relaxed ordering is acceptable here as the only memory that needs to
        // be visible as part of the update are the `token_*` variables, and
        // ordering has already been handled by the `update_lock` access.
        let mut state = self.state.load(Relaxed);
        let mut next;

        // Read the current token, again this memory has been ordered by the
        // acquire on `update_lock`.
        let curr_token_pos = state.token_write_pos();
        let curr_token = unsafe { self::token(self, curr_token_pos) };

        let mut next_token_pos = curr_token_pos;

        // If the `update` call is changing the token, then compute the next
        // available token slot and write the token there.
        //
        // Note that this computation is happening *outside* of the
        // compare-and-swap loop. The update lock ensures that only a single
        // thread could be mutating the write_token_position, so the
        // `next_token_pos` will never need to be recomputed even if
        // `token_read_pos` concurrently changes. This is because
        // `token_read_pos` can ONLY concurrently change to the current value of
        // `token_write_pos`, so `next_token_pos` will always remain valid.
        if token != curr_token {
            next_token_pos = state.next_token_pos();

            // Update the token
            match next_token_pos {
                0 => unsafe { *self.token_0.get() = token },
                1 => unsafe { *self.token_1.get() = token },
                2 => unsafe { *self.token_2.get() = token },
                _ => unreachable!(),
            }
        }

        // Now enter the compare-and-swap loop
        loop {
            next = state;

            // The node is only dropped once all `Registration` handles are
            // dropped. Only `Registration` can call `update`.
            debug_assert!(!state.is_dropped());

            // Update the write token position, this will also release the token
            // to Poll::poll.
            next.set_token_write_pos(next_token_pos);

            // Update readiness and poll opts
            next.set_interest(interest);
            next.set_poll_opt(opt);

            // If there is effective readiness, the node will need to be queued
            // for processing. This exact behavior is still TBD, so we are
            // conservative for now and always fire.
            //
            // See https://github.com/carllerche/mio/issues/535.
            if !next.effective_readiness().is_empty() {
                next.set_queued();
            }

            // compare-and-swap the state values. Only `Release` is needed here.
            // The `Release` ensures that `Poll::poll` will see the token
            // update and the update function doesn't care about any other
            // memory visibility.
            let actual = self.state.compare_and_swap(state, next, Release);

            if actual == state {
                break;
            }

            // CAS failed, but `curr_token_pos` should not have changed given
            // that we still hold the update lock.
            debug_assert_eq!(curr_token_pos, actual.token_write_pos());

            state = actual;
        }

        // Release the lock
        self.update_lock.store(false, Release);

        if !state.is_queued() && next.is_queued() {
            // We are responsible for enqueuing the node.
            enqueue_with_wakeup(queue, self)?;
        }

        Ok(())
    }
}
2098
2099impl ops::Deref for RegistrationInner {
2100    type Target = ReadinessNode;
2101
2102    fn deref(&self) -> &ReadinessNode {
2103        unsafe { &*self.node }
2104    }
2105}
2106
2107impl Clone for RegistrationInner {
2108    fn clone(&self) -> RegistrationInner {
2109        // Using a relaxed ordering is alright here, as knowledge of the
2110        // original reference prevents other threads from erroneously deleting
2111        // the object.
2112        //
2113        // As explained in the [Boost documentation][1], Increasing the
2114        // reference counter can always be done with memory_order_relaxed: New
2115        // references to an object can only be formed from an existing
2116        // reference, and passing an existing reference from one thread to
2117        // another must already provide any required synchronization.
2118        //
2119        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
2120        let old_size = self.ref_count.fetch_add(1, Relaxed);
2121
2122        // However we need to guard against massive refcounts in case someone
2123        // is `mem::forget`ing Arcs. If we don't do this the count can overflow
2124        // and users will use-after free. We racily saturate to `isize::MAX` on
2125        // the assumption that there aren't ~2 billion threads incrementing
2126        // the reference count at once. This branch will never be taken in
2127        // any realistic program.
2128        //
2129        // We abort because such a program is incredibly degenerate, and we
2130        // don't care to support it.
2131        if old_size & !MAX_REFCOUNT != 0 {
2132            process::abort();
2133        }
2134
2135        RegistrationInner { node: self.node }
2136    }
2137}
2138
2139impl Drop for RegistrationInner {
2140    fn drop(&mut self) {
2141        // Only handles releasing from `Registration` and `SetReadiness`
2142        // handles. Poll has to call this itself.
2143        release_node(self.node);
2144    }
2145}
2146
2147/*
2148 *
2149 * ===== ReadinessQueue =====
2150 *
2151 */
2152
impl ReadinessQueue {
    /// Create a new `ReadinessQueue`.
    ///
    /// The queue starts out with `end_marker` as both its head and its tail.
    ///
    /// # Errors
    ///
    /// Fails when `sys::Awakener::new` fails.
    fn new() -> io::Result<ReadinessQueue> {
        // Compile-time checks: the queue handle must be `Send` and `Sync`.
        is_send::<Self>();
        is_sync::<Self>();

        let end_marker = Box::new(ReadinessNode::marker());
        let sleep_marker = Box::new(ReadinessNode::marker());
        let closed_marker = Box::new(ReadinessNode::marker());

        // Raw pointer to the boxed end marker; the box itself is moved into
        // the queue below.
        let ptr = &*end_marker as *const _ as *mut _;

        Ok(ReadinessQueue {
            // NOTE(review): the shared state is wrapped in `Rc`, whose
            // reference count is not atomic, while this function asserts
            // that `ReadinessQueue` is `Send + Sync` and other code in this
            // file sizes the handle as `Arc<ReadinessQueueInner>` — confirm
            // that `Rc` is intended and that handles are never cloned or
            // dropped concurrently from different threads.
            inner: Rc::new(ReadinessQueueInner {
                awakener: sys::Awakener::new()?,
                head_readiness: AtomicPtr::new(ptr),
                tail_readiness: UnsafeCell::new(ptr),
                end_marker,
                sleep_marker,
                closed_marker,
            }),
        })
    }

    /// Poll the queue for new events
    ///
    /// Dequeues nodes and pushes an event into `dst` for every node that has
    /// non-empty effective readiness, until `dst` is full, the queue is
    /// empty, or the queue is observed in an inconsistent state.
    fn poll(&self, dst: &mut sys::Events) {
        // `until` is set with the first node that gets re-enqueued due to being
        // set to have level-triggered notifications. This prevents an infinite
        // loop where `Poll::poll` will keep dequeuing nodes it enqueues.
        let mut until = ptr::null_mut();

        if dst.len() == dst.capacity() {
            // If `dst` is already full, the readiness queue won't be drained.
            // This might result in `sleep_marker` staying in the queue and
            // unnecessary pipe writes occurring.
            self.inner.clear_sleep_marker();
        }

        'outer: while dst.len() < dst.capacity() {
            // Dequeue a node. If the queue is in an inconsistent state, then
            // stop polling. `Poll::poll` will be called again shortly and enter
            // a syscall, which should be enough to enable the other thread to
            // finish the queuing process.
            let ptr = match unsafe { self.inner.dequeue_node(until) } {
                Dequeue::Empty | Dequeue::Inconsistent => break,
                Dequeue::Data(ptr) => ptr,
            };

            let node = unsafe { &*ptr };

            // Read the node state with Acquire ordering. This allows reading
            // the token variables.
            let mut state = node.state.load(Acquire);
            let mut next;
            let mut readiness;
            let mut opt;

            loop {
                // Build up any changes to the readiness node's state and
                // attempt the CAS at the end
                next = state;

                // Given that the node was just read from the queue, the
                // `queued` flag should still be set.
                debug_assert!(state.is_queued());

                // The dropped flag means we need to release the node and
                // perform no further processing on it.
                if state.is_dropped() {
                    // Release the node and continue
                    release_node(ptr);
                    continue 'outer;
                }

                // Process the node
                readiness = state.effective_readiness();
                opt = state.poll_opt();

                if opt.is_edge() {
                    // Mark the node as dequeued
                    next.set_dequeued();

                    if opt.is_oneshot() && !readiness.is_empty() {
                        next.disarm();
                    }
                } else if readiness.is_empty() {
                    // Not edge-triggered and nothing to report: the node does
                    // not need to stay queued.
                    next.set_dequeued();
                }

                // Ensure `token_read_pos` is set to `token_write_pos` so that
                // we read the most up to date token value.
                next.update_token_read_pos();

                // Nothing changed, so there is no need for a CAS.
                if state == next {
                    break;
                }

                let actual = node.state.compare_and_swap(state, next, AcqRel);

                if actual == state {
                    break;
                }

                // The swap lost a race; retry from the value that was
                // observed.
                state = actual;
            }

            // If the queued flag is still set, then the node must be requeued.
            // This typically happens when using level-triggered notifications.
            if next.is_queued() {
                if until.is_null() {
                    // We never want to see the node again
                    until = ptr;
                }

                // Requeue the node
                self.inner.enqueue_node(node);
            }

            if !readiness.is_empty() {
                // Get the token
                let token = unsafe { token(node, next.token_read_pos()) };

                // Push the event
                dst.push_event(Event::new(readiness, token));
            }
        }
    }

    /// Prepare the queue for the `Poll::poll` thread to block in the system
    /// selector. This involves changing `head_readiness` to `sleep_marker`.
    /// Returns true if successful and `poll` can block.
    fn prepare_for_sleep(&self) -> bool {
        let end_marker = self.inner.end_marker();
        let sleep_marker = self.inner.sleep_marker();

        let tail = unsafe { *self.inner.tail_readiness.get() };

        // If the tail is currently set to the sleep_marker, then check if the
        // head is as well. If it is, then the queue is currently ready to
        // sleep. If it is not, then the queue is not empty and there should be
        // no sleeping.
        if tail == sleep_marker {
            return self.inner.head_readiness.load(Acquire) == sleep_marker;
        }

        // If the tail is not currently set to `end_marker`, then the queue is
        // not empty.
        if tail != end_marker {
            return false;
        }

        // The sleep marker is *not* currently in the readiness queue.
        //
        // The sleep marker is only inserted in this function. It is also only
        // inserted in the tail position. This is guaranteed by first checking
        // that the end marker is in the tail position, pushing the sleep marker
        // after the end marker, then removing the end marker.
        //
        // Before inserting a node into the queue, the next pointer has to be
        // set to null. Again, this is only safe to do when the node is not
        // currently in the queue, but we already have ensured this.
        self.inner
            .sleep_marker
            .next_readiness
            .store(ptr::null_mut(), Relaxed);

        // Swap the head from `end_marker` to `sleep_marker`; this only
        // succeeds while the end marker is still the head.
        #[allow(deprecated)]
        let actual =
            self.inner
                .head_readiness
                .compare_and_swap(end_marker, sleep_marker, AcqRel);

        debug_assert!(actual != sleep_marker);

        if actual != end_marker {
            // The readiness queue is not empty
            return false;
        }

        // The current tail should be pointing to `end_marker`
        debug_assert!(unsafe { *self.inner.tail_readiness.get() == end_marker });
        // The `end_marker` next pointer should be null
        debug_assert!(self.inner.end_marker.next_readiness.load(Relaxed).is_null());

        // Update tail pointer.
        unsafe {
            *self.inner.tail_readiness.get() = sleep_marker;
        }
        true
    }
}
2344
2345impl Drop for ReadinessQueue {
2346    fn drop(&mut self) {
2347        // Close the queue by enqueuing the closed node
2348        self.inner.enqueue_node(&self.inner.closed_marker);
2349
2350        loop {
2351            // Free any nodes that happen to be left in the readiness queue
2352            let ptr = match unsafe { self.inner.dequeue_node(ptr::null_mut()) } {
2353                Dequeue::Empty => break,
2354                Dequeue::Inconsistent => {
2355                    // This really shouldn't be possible as all other handles to
2356                    // `ReadinessQueueInner` are dropped, but handle this by
2357                    // spinning I guess?
2358                    continue;
2359                }
2360                Dequeue::Data(ptr) => ptr,
2361            };
2362
2363            let node = unsafe { &*ptr };
2364
2365            let state = node.state.load(Acquire);
2366
2367            debug_assert!(state.is_queued());
2368
2369            release_node(ptr);
2370        }
2371    }
2372}
2373
impl ReadinessQueueInner {
    /// Wake up the poller by calling `wakeup` on the queue's awakener.
    fn wakeup(&self) -> io::Result<()> {
        self.awakener.wakeup()
    }

    /// Push the given node into the readiness queue and, when `enqueue_node`
    /// reports that a wakeup is needed, call `wakeup`.
    ///
    /// Returns an error only if the wakeup fails.
    fn enqueue_node_with_wakeup(&self, node: &ReadinessNode) -> io::Result<()> {
        if self.enqueue_node(node) {
            self.wakeup()?;
        }

        Ok(())
    }

    /// Push the node into the readiness queue
    ///
    /// Returns `true` when the previous head of the queue was the sleep
    /// marker, and `false` otherwise — including when the queue is closed, in
    /// which case the node is not enqueued.
    fn enqueue_node(&self, node: &ReadinessNode) -> bool {
        // This is the 1024cores.net intrusive MPSC queue [1] "push" function.
        let node_ptr = node as *const _ as *mut _;

        // Relaxed used as the ordering is "released" when swapping
        // `head_readiness`
        node.next_readiness.store(ptr::null_mut(), Relaxed);

        unsafe {
            let mut prev = self.head_readiness.load(Acquire);

            // Swap `node_ptr` in as the new head, retrying from the observed
            // head whenever another thread wins the race.
            loop {
                if prev == self.closed_marker() {
                    debug_assert!(node_ptr != self.closed_marker());
                    // debug_assert!(node_ptr != self.end_marker());
                    debug_assert!(node_ptr != self.sleep_marker());

                    if node_ptr != self.end_marker() {
                        // The readiness queue is shutdown, but the enqueue flag was
                        // set. This means that we are responsible for decrementing
                        // the ready queue's ref count
                        debug_assert!(node.ref_count.load(Relaxed) >= 2);
                        release_node(node_ptr);
                    }

                    return false;
                }

                #[allow(deprecated)]
                let act = self.head_readiness.compare_and_swap(prev, node_ptr, AcqRel);

                if prev == act {
                    break;
                }

                prev = act;
            }

            debug_assert!((*prev).next_readiness.load(Relaxed).is_null());

            // Link the previous head to the new node.
            (*prev).next_readiness.store(node_ptr, Release);

            prev == self.sleep_marker()
        }
    }

    /// Replace the sleep marker with the end marker, provided the sleep
    /// marker is both the tail and the head of the queue. Does nothing
    /// otherwise.
    fn clear_sleep_marker(&self) {
        let end_marker = self.end_marker();
        let sleep_marker = self.sleep_marker();

        unsafe {
            let tail = *self.tail_readiness.get();

            if tail != self.sleep_marker() {
                return;
            }

            // The end marker is *not* currently in the readiness queue
            // (since the sleep marker is).
            self.end_marker
                .next_readiness
                .store(ptr::null_mut(), Relaxed);

            #[allow(deprecated)]
            let actual =
                self.head_readiness
                    .compare_and_swap(sleep_marker, end_marker, AcqRel);

            debug_assert!(actual != end_marker);

            if actual != sleep_marker {
                // The readiness queue is not empty, we cannot remove the sleep
                // marker
                return;
            }

            // Update the tail pointer.
            *self.tail_readiness.get() = end_marker;
        }
    }

    /// Pop the node at the tail of the queue, stopping at `until`.
    ///
    /// Returns `Dequeue::Data` with the popped node, `Dequeue::Empty` when
    /// there is nothing to pop or `until` has been reached, and
    /// `Dequeue::Inconsistent` when the queue is observed mid-push.
    ///
    /// Must only be called in `poll` or `drop`
    unsafe fn dequeue_node(&self, until: *mut ReadinessNode) -> Dequeue {
        // This is the 1024cores.net intrusive MPSC queue [1] "pop" function
        // with the modifications mentioned at the top of the file.
        let mut tail = *self.tail_readiness.get();
        let mut next = (*tail).next_readiness.load(Acquire);

        // Marker nodes are never handed out; step over a marker sitting at
        // the tail.
        if tail == self.end_marker()
            || tail == self.sleep_marker()
            || tail == self.closed_marker()
        {
            if next.is_null() {
                // Make sure the sleep marker is removed (as we are no longer
                // sleeping)
                self.clear_sleep_marker();

                return Dequeue::Empty;
            }

            *self.tail_readiness.get() = next;
            tail = next;
            next = (*next).next_readiness.load(Acquire);
        }

        // Only need to check `until` at this point. `until` is either null,
        // which will never match tail OR it is a node that was pushed by
        // the current thread. This means that either:
        //
        // 1) The queue is inconsistent, which is handled explicitly
        // 2) We encounter `until` at this point in dequeue
        // 3) we will pop a different node
        if tail == until {
            return Dequeue::Empty;
        }

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        // `tail` has no successor. If it is not the head either, a push is
        // still in progress.
        if self.head_readiness.load(Acquire) != tail {
            return Dequeue::Inconsistent;
        }

        // Push the stub node
        self.enqueue_node(&self.end_marker);

        next = (*tail).next_readiness.load(Acquire);

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        Dequeue::Inconsistent
    }

    /// Raw pointer to the boxed end marker node.
    fn end_marker(&self) -> *mut ReadinessNode {
        &*self.end_marker as *const ReadinessNode as *mut ReadinessNode
    }

    /// Raw pointer to the boxed sleep marker node.
    fn sleep_marker(&self) -> *mut ReadinessNode {
        &*self.sleep_marker as *const ReadinessNode as *mut ReadinessNode
    }

    /// Raw pointer to the boxed closed marker node.
    fn closed_marker(&self) -> *mut ReadinessNode {
        &*self.closed_marker as *const ReadinessNode as *mut ReadinessNode
    }
}
2540
2541impl ReadinessNode {
2542    /// Return a new `ReadinessNode`, initialized with a ref_count of 3.
2543    fn new(
2544        queue: *mut (),
2545        token: Token,
2546        interest: Ready,
2547        opt: PollOpt,
2548        ref_count: usize,
2549    ) -> ReadinessNode {
2550        ReadinessNode {
2551            state: AtomicState::new(interest, opt),
2552            // Only the first token is set, the others are initialized to 0
2553            token_0: UnsafeCell::new(token),
2554            token_1: UnsafeCell::new(Token(0)),
2555            token_2: UnsafeCell::new(Token(0)),
2556            next_readiness: AtomicPtr::new(ptr::null_mut()),
2557            update_lock: AtomicBool::new(false),
2558            readiness_queue: AtomicPtr::new(queue),
2559            ref_count: AtomicUsize::new(ref_count),
2560        }
2561    }
2562
2563    fn marker() -> ReadinessNode {
2564        ReadinessNode {
2565            state: AtomicState::new(Ready::empty(), PollOpt::empty()),
2566            token_0: UnsafeCell::new(Token(0)),
2567            token_1: UnsafeCell::new(Token(0)),
2568            token_2: UnsafeCell::new(Token(0)),
2569            next_readiness: AtomicPtr::new(ptr::null_mut()),
2570            update_lock: AtomicBool::new(false),
2571            readiness_queue: AtomicPtr::new(ptr::null_mut()),
2572            ref_count: AtomicUsize::new(0),
2573        }
2574    }
2575
2576    fn enqueue_with_wakeup(&self) -> io::Result<()> {
2577        let queue = self.readiness_queue.load(Acquire);
2578
2579        if queue.is_null() {
2580            // Not associated with a queue, nothing to do
2581            return Ok(());
2582        }
2583
2584        enqueue_with_wakeup(queue, self)
2585    }
2586}
2587
2588fn enqueue_with_wakeup(queue: *mut (), node: &ReadinessNode) -> io::Result<()> {
2589    debug_assert!(!queue.is_null());
2590    // This is ugly... but we don't want to bump the ref count.
2591    let queue: &Arc<ReadinessQueueInner> =
2592        unsafe { &*(&queue as *const *mut () as *const Arc<ReadinessQueueInner>) };
2593    queue.enqueue_node_with_wakeup(node)
2594}
2595
2596unsafe fn token(node: &ReadinessNode, pos: usize) -> Token {
2597    match pos {
2598        0 => *node.token_0.get(),
2599        1 => *node.token_1.get(),
2600        2 => *node.token_2.get(),
2601        _ => unreachable!(),
2602    }
2603}
2604
2605fn release_node(ptr: *mut ReadinessNode) {
2606    unsafe {
2607        // `AcqRel` synchronizes with other `release_node` functions and ensures
2608        // that the drop happens after any reads / writes on other threads.
2609        if (*ptr).ref_count.fetch_sub(1, AcqRel) != 1 {
2610            return;
2611        }
2612
2613        let node = Box::from_raw(ptr);
2614
2615        // Decrement the readiness_queue Arc
2616        let queue = node.readiness_queue.load(Acquire);
2617
2618        if queue.is_null() {
2619            return;
2620        }
2621
2622        let _: Arc<ReadinessQueueInner> = mem::transmute(queue);
2623    }
2624}
2625
2626impl AtomicState {
2627    fn new(interest: Ready, opt: PollOpt) -> AtomicState {
2628        let state = ReadinessState::new(interest, opt);
2629
2630        AtomicState {
2631            inner: AtomicUsize::new(state.into()),
2632        }
2633    }
2634
2635    /// Loads the current `ReadinessState`
2636    fn load(&self, order: Ordering) -> ReadinessState {
2637        self.inner.load(order).into()
2638    }
2639
2640    /// Stores a state if the current state is the same as `current`.
2641    fn compare_and_swap(
2642        &self,
2643        current: ReadinessState,
2644        new: ReadinessState,
2645        order: Ordering,
2646    ) -> ReadinessState {
2647        #[allow(deprecated)]
2648        self.inner
2649            .compare_and_swap(current.into(), new.into(), order)
2650            .into()
2651    }
2652
2653    // Returns `true` if the node should be queued
2654    fn flag_as_dropped(&self) -> bool {
2655        let prev: ReadinessState = self
2656            .inner
2657            .fetch_or(DROPPED_MASK | QUEUED_MASK, Release)
2658            .into();
2659        // The flag should not have been previously set
2660        debug_assert!(!prev.is_dropped());
2661
2662        !prev.is_queued()
2663    }
2664}
2665
2666impl ReadinessState {
2667    // Create a `ReadinessState` initialized with the provided arguments
2668    #[inline]
2669    fn new(interest: Ready, opt: PollOpt) -> ReadinessState {
2670        let interest = event::ready_as_usize(interest);
2671        let opt = event::opt_as_usize(opt);
2672
2673        debug_assert!(interest <= MASK_4);
2674        debug_assert!(opt <= MASK_4);
2675
2676        let mut val = interest << INTEREST_SHIFT;
2677        val |= opt << POLL_OPT_SHIFT;
2678
2679        ReadinessState(val)
2680    }
2681
2682    #[inline]
2683    fn get(self, mask: usize, shift: usize) -> usize {
2684        (self.0 >> shift) & mask
2685    }
2686
2687    #[inline]
2688    fn set(&mut self, val: usize, mask: usize, shift: usize) {
2689        self.0 = (self.0 & !(mask << shift)) | (val << shift)
2690    }
2691
2692    /// Get the readiness
2693    #[inline]
2694    fn readiness(self) -> Ready {
2695        let v = self.get(MASK_4, READINESS_SHIFT);
2696        event::ready_from_usize(v)
2697    }
2698
2699    #[inline]
2700    fn effective_readiness(self) -> Ready {
2701        self.readiness() & self.interest()
2702    }
2703
2704    /// Set the readiness
2705    #[inline]
2706    fn set_readiness(&mut self, v: Ready) {
2707        self.set(event::ready_as_usize(v), MASK_4, READINESS_SHIFT);
2708    }
2709
2710    /// Get the interest
2711    #[inline]
2712    fn interest(self) -> Ready {
2713        let v = self.get(MASK_4, INTEREST_SHIFT);
2714        event::ready_from_usize(v)
2715    }
2716
2717    /// Set the interest
2718    #[inline]
2719    fn set_interest(&mut self, v: Ready) {
2720        self.set(event::ready_as_usize(v), MASK_4, INTEREST_SHIFT);
2721    }
2722
2723    #[inline]
2724    fn disarm(&mut self) {
2725        self.set_interest(Ready::empty());
2726    }
2727
2728    /// Get the poll options
2729    #[inline]
2730    fn poll_opt(self) -> PollOpt {
2731        let v = self.get(MASK_4, POLL_OPT_SHIFT);
2732        event::opt_from_usize(v)
2733    }
2734
2735    /// Set the poll options
2736    #[inline]
2737    fn set_poll_opt(&mut self, v: PollOpt) {
2738        self.set(event::opt_as_usize(v), MASK_4, POLL_OPT_SHIFT);
2739    }
2740
2741    #[inline]
2742    fn is_queued(self) -> bool {
2743        self.0 & QUEUED_MASK == QUEUED_MASK
2744    }
2745
2746    /// Set the queued flag
2747    #[inline]
2748    fn set_queued(&mut self) {
2749        // Dropped nodes should never be queued
2750        debug_assert!(!self.is_dropped());
2751        self.0 |= QUEUED_MASK;
2752    }
2753
2754    #[inline]
2755    fn set_dequeued(&mut self) {
2756        debug_assert!(self.is_queued());
2757        self.0 &= !QUEUED_MASK
2758    }
2759
2760    #[inline]
2761    fn is_dropped(self) -> bool {
2762        self.0 & DROPPED_MASK == DROPPED_MASK
2763    }
2764
2765    #[inline]
2766    fn token_read_pos(self) -> usize {
2767        self.get(MASK_2, TOKEN_RD_SHIFT)
2768    }
2769
2770    #[inline]
2771    fn token_write_pos(self) -> usize {
2772        self.get(MASK_2, TOKEN_WR_SHIFT)
2773    }
2774
2775    #[inline]
2776    fn next_token_pos(self) -> usize {
2777        let rd = self.token_read_pos();
2778        let wr = self.token_write_pos();
2779
2780        match wr {
2781            0 => match rd {
2782                1 => 2,
2783                2 => 1,
2784                0 => 1,
2785                _ => unreachable!(),
2786            },
2787            1 => match rd {
2788                0 => 2,
2789                2 => 0,
2790                1 => 2,
2791                _ => unreachable!(),
2792            },
2793            2 => match rd {
2794                0 => 1,
2795                1 => 0,
2796                2 => 0,
2797                _ => unreachable!(),
2798            },
2799            _ => unreachable!(),
2800        }
2801    }
2802
2803    #[inline]
2804    fn set_token_write_pos(&mut self, val: usize) {
2805        self.set(val, MASK_2, TOKEN_WR_SHIFT);
2806    }
2807
2808    #[inline]
2809    fn update_token_read_pos(&mut self) {
2810        let val = self.token_write_pos();
2811        self.set(val, MASK_2, TOKEN_RD_SHIFT);
2812    }
2813}
2814
2815impl From<ReadinessState> for usize {
2816    fn from(src: ReadinessState) -> usize {
2817        src.0
2818    }
2819}
2820
2821impl From<usize> for ReadinessState {
2822    fn from(src: usize) -> ReadinessState {
2823        ReadinessState(src)
2824    }
2825}
2826
/// Compile-time check: only instantiable for types that are `Send`.
fn is_send<T>()
where
    T: Send,
{
}
/// Compile-time check: only instantiable for types that are `Sync`.
fn is_sync<T>()
where
    T: Sync,
{
}
2829
2830impl SelectorId {
2831    #[allow(unused)]
2832    pub fn new() -> SelectorId {
2833        SelectorId {
2834            id: AtomicUsize::new(0),
2835        }
2836    }
2837
2838    #[allow(unused)]
2839    pub fn associate_selector(&self, poll: &Poll) -> io::Result<()> {
2840        let selector_id = self.id.load(Ordering::SeqCst);
2841
2842        if selector_id != 0 && selector_id != poll.selector.id() {
2843            Err(io::Error::other("socket already registered"))
2844        } else {
2845            self.id.store(poll.selector.id(), Ordering::SeqCst);
2846            Ok(())
2847        }
2848    }
2849}
2850
2851impl Clone for SelectorId {
2852    fn clone(&self) -> SelectorId {
2853        SelectorId {
2854            id: AtomicUsize::new(self.id.load(Ordering::SeqCst)),
2855        }
2856    }
2857}
2858
/// Checks that a freshly created `Poll` exposes a valid raw file descriptor.
#[test]
#[cfg(all(unix, not(target_os = "fuchsia")))]
pub fn as_raw_fd() {
    let poll = Poll::new().unwrap();
    // Every non-negative value is a valid descriptor. The system hands out the
    // lowest free number, so 0 is a legitimate result when the test process
    // was started with stdin closed.
    assert!(poll.as_raw_fd() >= 0);
}