// retty_io/poll.rs

use event_imp::{self as event, Event, Evented, PollOpt, Ready};
use std::cell::UnsafeCell;
#[cfg(all(unix, not(target_os = "fuchsia")))]
use std::os::unix::io::AsRawFd;
#[cfg(all(unix, not(target_os = "fuchsia")))]
use std::os::unix::io::RawFd;
use std::process;
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use std::{fmt, io, ptr, usize};
use std::{isize, mem, ops};
use {sys, Token};

// Poll is backed by two readiness queues. The first is a system readiness queue
// represented by `sys::Selector`. The system readiness queue handles events
// provided by the system, such as TCP and UDP. The second readiness queue is
// implemented in user space by `ReadinessQueue`. It provides a way to implement
// purely user space `Evented` types.
//
// `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked
// list nodes. This significantly reduces the number of required allocations.
// Each `Registration` / `SetReadiness` pair allocates a single readiness node
// that is used for the lifetime of the registration.
//
// The readiness node also includes a single atomic variable, `state`, that
// tracks most of the state associated with the registration. This includes the
// current readiness, interest, poll options, and internal state. When the node
// state is mutated, it is queued in the MPSC channel. A call to
// `ReadinessQueue::poll` will dequeue and process nodes. The node state can
// still be mutated while it is queued in the channel for processing.
// Intermediate state values do not matter as long as the final state is
// included in the call to `poll`. This is the eventually consistent nature of
// the readiness queue.
//
// The readiness node is ref counted using the `ref_count` field. On creation,
// the ref_count is initialized to 3: one `Registration` handle, one
// `SetReadiness` handle, and one for the readiness queue. Since the readiness
// queue doesn't *always* hold a handle to the node, we don't use the Arc type
// for managing ref counts (this is to avoid constantly incrementing and
// decrementing the ref count when pushing & popping from the queue). When the
// `Registration` handle is dropped, the `dropped` flag is set on the node, then
// the node is pushed into the registration queue. When `Poll::poll` pops the
// node, it sees that the drop flag is set and decrements its ref count.
//
// The MPSC queue is a modified version of the intrusive MPSC node based queue
// described by 1024cores [1].
//
// The first modification is that two markers are used instead of a single
// `stub`. The second marker is a `sleep_marker` which is used to signal to
// producers that the consumer is going to sleep. This sleep_marker is only used
// when the queue is empty, implying that the only node in the queue is
// `end_marker`.
//
// The second modification is an `until` argument passed to the dequeue
// function. When `poll` encounters a level-triggered node, the node will be
// immediately pushed back into the queue. In order to avoid an infinite loop,
// before `poll` pushes the node back, the node's pointer is saved off and then
// passed again as the `until` argument. If the next node to pop is `until`,
// then `Dequeue::Empty` is returned.
//
// [1] http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
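//
// A minimal sketch of the `until` termination check in the dequeue routine
// (illustrative pseudocode only, not the exact implementation):
//
//     fn dequeue(&self, until: *mut ReadinessNode) -> Dequeue {
//         let tail = unsafe { *self.tail_readiness.get() };
//
//         if tail == until {
//             // `poll` has looped back around to the level-triggered node it
//             // re-queued earlier, so treat the queue as drained.
//             return Dequeue::Empty;
//         }
//
//         // ... the usual 1024cores dequeue steps follow ...
//     }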

/// Polls for readiness events on all registered values.
///
/// `Poll` allows a program to monitor a large number of `Evented` types,
/// waiting until one or more become "ready" for some class of operations; e.g.
/// reading and writing. An `Evented` type is considered ready if it is possible
/// to immediately perform a corresponding operation; e.g. [`read`] or
/// [`write`].
///
/// To use `Poll`, an `Evented` type must first be registered with the `Poll`
/// instance using the [`register`] method, supplying readiness interest. The
/// readiness interest tells `Poll` which specific operations on the handle to
/// monitor for readiness. A `Token` is also passed to the [`register`]
/// function. When `Poll` returns a readiness event, it will include this token.
/// This associates the event with the `Evented` handle that generated the
/// event.
///
/// [`read`]: tcp/struct.TcpStream.html#method.read
/// [`write`]: tcp/struct.TcpStream.html#method.write
/// [`register`]: #method.register
///
/// # Examples
///
/// A basic example -- establishing a `TcpStream` connection.
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use retty_io::{Events, Poll, Ready, PollOpt, Token};
/// use retty_io::net::TcpStream;
///
/// use std::net::{TcpListener, SocketAddr};
///
/// // Bind a server socket to connect to.
/// let addr: SocketAddr = "127.0.0.1:0".parse()?;
/// let server = TcpListener::bind(&addr)?;
///
/// // Construct a new `Poll` handle as well as the `Events` we'll store into
/// let poll = Poll::new()?;
/// let mut events = Events::with_capacity(1024);
///
/// // Connect the stream
/// let stream = TcpStream::connect(&server.local_addr()?)?;
///
/// // Register the stream with `Poll`
/// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
///
/// // Wait for the socket to become ready. This has to happen in a loop to
/// // handle spurious wakeups.
/// loop {
///     poll.poll(&mut events, None)?;
///
///     for event in &events {
///         if event.token() == Token(0) && event.readiness().is_writable() {
///             // The socket connected (probably, it could still be a spurious
///             // wakeup)
///             return Ok(());
///         }
///     }
/// }
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// # Edge-triggered and level-triggered
///
/// An [`Evented`] registration may request edge-triggered events or
/// level-triggered events. This is done by setting `register`'s
/// [`PollOpt`] argument to either [`edge`] or [`level`].
///
/// The difference between the two can be described as follows. Suppose that
/// this scenario happens:
///
/// 1. A [`TcpStream`] is registered with `Poll`.
/// 2. The socket receives 2kb of data.
/// 3. A call to [`Poll::poll`] returns the token associated with the socket
///    indicating readable readiness.
/// 4. 1kb is read from the socket.
/// 5. Another call to [`Poll::poll`] is made.
///
/// If, when the socket was registered with `Poll`, edge-triggered events were
/// requested, then the call to [`Poll::poll`] done in step **5** will
/// (probably) hang despite there being another 1kb still present in the socket
/// read buffer. The reason for this is that edge-triggered mode delivers events
/// only when changes occur on the monitored [`Evented`]. So, in step **5** the
/// caller might end up waiting for some data that is already present inside the
/// socket buffer.
///
/// With edge-triggered events, operations **must** be performed on the
/// `Evented` type until [`WouldBlock`] is returned. In other words, after
/// receiving an event indicating readiness for a certain operation, one should
/// assume that [`Poll::poll`] may never return another event for the same token
/// and readiness until the operation returns [`WouldBlock`].
///
/// By contrast, when level-triggered notifications were requested, each call to
/// [`Poll::poll`] will return an event for the socket as long as data remains
/// in the socket buffer. Generally, level-triggered events should be avoided if
/// high performance is a concern.
///
/// Since even with edge-triggered events, multiple events can be generated upon
/// receipt of multiple chunks of data, the caller has the option to set the
/// [`oneshot`] flag. This tells `Poll` to disable the associated [`Evented`]
/// after the event is returned from [`Poll::poll`]. Subsequent calls to
/// [`Poll::poll`] will no longer include events for [`Evented`] handles that
/// are disabled even if the readiness state changes. The handle can be
/// re-enabled by calling [`reregister`]. When handles are disabled, internal
/// resources used to monitor the handle are maintained until the handle is
/// dropped or deregistered. This makes re-registering the handle a fast
/// operation.
///
/// For example, in the following scenario:
///
/// 1. A [`TcpStream`] is registered with `Poll`.
/// 2. The socket receives 2kb of data.
/// 3. A call to [`Poll::poll`] returns the token associated with the socket
///    indicating readable readiness.
/// 4. 2kb is read from the socket.
/// 5. Another call to read is issued and [`WouldBlock`] is returned.
/// 6. The socket receives another 2kb of data.
/// 7. Another call to [`Poll::poll`] is made.
///
/// Assuming the socket was registered with `Poll` with the [`edge`] and
/// [`oneshot`] options, then the call to [`Poll::poll`] in step 7 would block.
/// This is because [`oneshot`] tells `Poll` to disable events for the socket
/// after returning an event.
///
/// In order to receive the event for the data received in step 6, the socket
/// would need to be reregistered using [`reregister`].
///
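/// A sketch of re-arming the socket in that scenario (assuming the `poll`,
/// `socket`, and token from the steps above; fragment only, error handling
/// elided):
///
/// ```ignore
/// // Re-enable the registration after handling the oneshot event so that
/// // the data received in step 6 produces a new readiness event.
/// poll.reregister(&socket, Token(0), Ready::readable(),
///                 PollOpt::edge() | PollOpt::oneshot())?;
/// ```
///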
/// [`PollOpt`]: struct.PollOpt.html
/// [`edge`]: struct.PollOpt.html#method.edge
/// [`level`]: struct.PollOpt.html#method.level
/// [`Poll::poll`]: struct.Poll.html#method.poll
/// [`WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.WouldBlock
/// [`Evented`]: event/trait.Evented.html
/// [`TcpStream`]: tcp/struct.TcpStream.html
/// [`reregister`]: #method.reregister
/// [`oneshot`]: struct.PollOpt.html#method.oneshot
///
/// # Portability
///
/// Using `Poll` provides a portable interface across supported platforms as
/// long as the caller takes the following into consideration:
///
/// ### Spurious events
///
/// [`Poll::poll`] may return readiness events even if the associated
/// [`Evented`] handle is not actually ready. Given the same code, this may
/// happen more on some platforms than others. It is important to never assume
/// that, just because a readiness notification was received, the associated
/// operation will succeed as well.
///
/// If an operation fails with [`WouldBlock`], then the caller should not treat
/// this as an error, but instead should wait until another readiness event is
/// received.
///
/// ### Draining readiness
///
/// When using edge-triggered mode, once a readiness event is received, the
/// corresponding operation must be performed repeatedly until it returns
/// [`WouldBlock`]. Unless this is done, there is no guarantee that another
/// readiness event will be delivered, even if further data is received for the
/// [`Evented`] handle.
///
/// For example, in the first scenario described above, after step 5, even if
/// the socket receives more data there is no guarantee that another readiness
/// event will be delivered.
///
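/// A minimal sketch of such a drain loop, assuming an edge-registered
/// `stream: TcpStream` and a scratch buffer (fragment only):
///
/// ```ignore
/// use std::io::{self, Read};
///
/// let mut buf = [0u8; 4096];
/// loop {
///     match stream.read(&mut buf) {
///         // Keep reading until the kernel buffer is drained.
///         Ok(0) => break, // the peer closed the connection
///         Ok(_) => continue,
///         // Drained: wait for the next readiness event.
///         Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
///         Err(e) => return Err(e.into()),
///     }
/// }
/// ```
///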
/// ### Readiness operations
///
/// The only readiness operations that are guaranteed to be present on all
/// supported platforms are [`readable`] and [`writable`]. All other readiness
/// operations may have false negatives and as such should be considered
/// **hints**. This means that if a socket is registered with [`readable`],
/// [`error`], and [`hup`] interest, and either an error or hup is received, a
/// readiness event will be generated for the socket, but it **may** only
/// include `readable` readiness. Also note that, given the potential for
/// spurious events, receiving a readiness event with `hup` or `error` doesn't
/// actually mean that a `read` on the socket will return a result matching the
/// readiness event.
///
/// In other words, portable programs that explicitly check for [`hup`] or
/// [`error`] readiness should be doing so as an **optimization** and always be
/// able to handle an error or HUP situation when performing the actual read
/// operation.
///
/// [`readable`]: struct.Ready.html#method.readable
/// [`writable`]: struct.Ready.html#method.writable
/// [`error`]: unix/struct.UnixReady.html#method.error
/// [`hup`]: unix/struct.UnixReady.html#method.hup
///
/// ### Registering handles
///
/// Unless otherwise noted, it should be assumed that types implementing
/// [`Evented`] will never become ready unless they are registered with `Poll`.
///
/// For example:
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use retty_io::{Poll, Ready, PollOpt, Token};
/// use retty_io::net::TcpStream;
/// use std::time::Duration;
/// use std::thread;
///
/// let sock = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
///
/// thread::sleep(Duration::from_secs(1));
///
/// let poll = Poll::new()?;
///
/// // The connect is not guaranteed to have started until it is registered at
/// // this point
/// poll.register(&sock, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// # Implementation notes
///
/// `Poll` is backed by the selector provided by the operating system.
///
/// |      OS    |  Selector |
/// |------------|-----------|
/// | Linux      | [epoll]   |
/// | OS X, iOS  | [kqueue]  |
/// | Windows    | [IOCP]    |
/// | FreeBSD    | [kqueue]  |
/// | Android    | [epoll]   |
///
/// On all supported platforms, socket operations are handled by using the
/// system selector. Platform specific extensions (e.g. [`EventedFd`]) allow
/// accessing other features provided by individual system selectors. For
/// example, Linux's [`signalfd`] feature can be used by registering the FD with
/// `Poll` via [`EventedFd`].
///
/// On all platforms except Windows, a call to [`Poll::poll`] is mostly just a
/// direct call to the system selector. However, [IOCP] uses a completion model
/// instead of a readiness model. In this case, `Poll` must adapt the completion
/// model to retty-io's readiness API. While non-trivial, the bridge layer is
/// still quite efficient. The most expensive part is that calls to `read` and
/// `write` require data to be copied into an intermediate buffer before it is
/// passed to the kernel.
///
/// Notifications generated by [`SetReadiness`] are handled by an internal
/// readiness queue. A single call to [`Poll::poll`] will collect events from
/// both the system selector and the internal readiness queue.
///
/// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
/// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
/// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html
/// [`EventedFd`]: unix/struct.EventedFd.html
/// [`SetReadiness`]: struct.SetReadiness.html
/// [`Poll::poll`]: struct.Poll.html#method.poll
pub struct Poll {
    // Platform specific IO selector
    selector: sys::Selector,

    // Custom readiness queue
    readiness_queue: ReadinessQueue,

    // Use an atomic to first check if a full lock will be required. This is a
    // fast-path check for single threaded cases avoiding the extra syscall
    lock_state: AtomicUsize,

    // Sequences concurrent calls to `Poll::poll`
    lock: Mutex<()>,

    // Wakeup the next waiter
    condvar: Condvar,
}

/// Handle to a user space `Poll` registration.
///
/// `Registration` allows implementing [`Evented`] for types that cannot work
/// with the [system selector]. A `Registration` is always paired with a
/// `SetReadiness`, which allows updating the registration's readiness state.
/// When [`set_readiness`] is called and the `Registration` is associated with a
/// [`Poll`] instance, a readiness event will be created and eventually returned
/// by [`poll`].
///
/// A `Registration` / `SetReadiness` pair is created by calling
/// [`Registration::new2`]. At this point, the registration is not being
/// monitored by a [`Poll`] instance, so calls to `set_readiness` will not
/// result in any readiness notifications.
///
/// `Registration` implements [`Evented`], so it can be used with [`Poll`] using
/// the same [`register`], [`reregister`], and [`deregister`] functions used
/// with TCP, UDP, etc... types. Once registered with [`Poll`], readiness state
/// changes result in readiness events being dispatched to the [`Poll`] instance
/// with which `Registration` is registered.
///
/// **Note**, before using `Registration` be sure to read the
/// [`set_readiness`] documentation and the [portability] notes. The
/// guarantees offered by `Registration` may be weaker than expected.
///
/// For high level documentation, see [`Poll`].
///
/// # Examples
///
/// ```
/// use retty_io::{Ready, Registration, Poll, PollOpt, Token};
/// use retty_io::event::Evented;
///
/// use std::io;
/// use std::time::Instant;
/// use std::thread;
///
/// pub struct Deadline {
///     when: Instant,
///     registration: Registration,
/// }
///
/// impl Deadline {
///     pub fn new(when: Instant) -> Deadline {
///         let (registration, set_readiness) = Registration::new2();
///
///         thread::spawn(move || {
///             let now = Instant::now();
///
///             if now < when {
///                 thread::sleep(when - now);
///             }
///
///             set_readiness.set_readiness(Ready::readable());
///         });
///
///         Deadline {
///             when: when,
///             registration: registration,
///         }
///     }
///
///     pub fn is_elapsed(&self) -> bool {
///         Instant::now() >= self.when
///     }
/// }
///
/// impl Evented for Deadline {
///     fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
///         -> io::Result<()>
///     {
///         self.registration.register(poll, token, interest, opts)
///     }
///
///     fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
///         -> io::Result<()>
///     {
///         self.registration.reregister(poll, token, interest, opts)
///     }
///
///     fn deregister(&self, poll: &Poll) -> io::Result<()> {
///         poll.deregister(&self.registration)
///     }
/// }
/// ```
///
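/// A sketch of driving the `Deadline` type above with a `Poll` instance
/// (fragment only; assumes the imports from the example plus `Events` and
/// `Duration`):
///
/// ```ignore
/// let poll = Poll::new()?;
/// let deadline = Deadline::new(Instant::now() + Duration::from_millis(100));
///
/// // `Deadline` implements `Evented`, so it registers like any socket.
/// poll.register(&deadline, Token(0), Ready::readable(), PollOpt::edge())?;
///
/// let mut events = Events::with_capacity(8);
/// poll.poll(&mut events, None)?;
/// // The deadline has (very likely) elapsed; spurious wakeups are possible,
/// // so production code would check `deadline.is_elapsed()` in a loop.
/// ```
///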
/// [system selector]: struct.Poll.html#implementation-notes
/// [`Poll`]: struct.Poll.html
/// [`Registration::new2`]: struct.Registration.html#method.new2
/// [`Evented`]: event/trait.Evented.html
/// [`set_readiness`]: struct.SetReadiness.html#method.set_readiness
/// [`register`]: struct.Poll.html#method.register
/// [`reregister`]: struct.Poll.html#method.reregister
/// [`deregister`]: struct.Poll.html#method.deregister
/// [portability]: struct.Poll.html#portability
pub struct Registration {
    inner: RegistrationInner,
}

unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}

/// Updates the readiness state of the associated `Registration`.
///
/// See [`Registration`] for more documentation on using `SetReadiness` and
/// [`Poll`] for high level polling documentation.
///
/// [`Poll`]: struct.Poll.html
/// [`Registration`]: struct.Registration.html
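///
/// # Examples
///
/// A minimal sketch of updating readiness (hedged: until the paired
/// `Registration` is registered with a [`Poll`] instance, no readiness
/// event will be delivered):
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use retty_io::{Registration, Ready};
///
/// let (registration, set_readiness) = Registration::new2();
///
/// set_readiness.set_readiness(Ready::readable())?;
/// assert!(set_readiness.readiness().is_readable());
/// # drop(registration);
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```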
#[derive(Clone)]
pub struct SetReadiness {
    inner: RegistrationInner,
}

unsafe impl Send for SetReadiness {}
unsafe impl Sync for SetReadiness {}

/// Used to associate an IO type with a Selector
#[derive(Debug)]
pub struct SelectorId {
    id: AtomicUsize,
}

struct RegistrationInner {
    // Unsafe pointer to the registration's node. The node is ref counted. This
    // cannot "simply" be tracked by an Arc because `Poll::poll` has an implicit
    // handle though it isn't stored anywhere. In other words, `Poll::poll`
    // needs to decrement the ref count before the node is freed.
    node: *mut ReadinessNode,
}

#[derive(Clone)]
struct ReadinessQueue {
    inner: Arc<ReadinessQueueInner>,
}

unsafe impl Send for ReadinessQueue {}
unsafe impl Sync for ReadinessQueue {}

struct ReadinessQueueInner {
    // Used to wake up `Poll` when readiness is set in another thread.
    awakener: sys::Awakener,

    // Head of the MPSC queue used to signal readiness to `Poll::poll`.
    head_readiness: AtomicPtr<ReadinessNode>,

    // Tail of the readiness queue.
    //
    // Only accessed by Poll::poll. Coordination will be handled by the poll fn
    tail_readiness: UnsafeCell<*mut ReadinessNode>,

    // Fake readiness node used to punctuate the end of the readiness queue.
    // Before attempting to read from the queue, this node is inserted in order
    // to partition the queue between nodes that are "owned" by the dequeue end
    // and nodes that will be pushed on by producers.
    end_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but this node signals to producers that `Poll`
    // has gone to sleep and must be woken up.
    sleep_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but the node signals that the queue is closed.
    // This happens when `ReadinessQueue` is dropped and signals to producers
    // that nodes should no longer be pushed into the queue.
    closed_marker: Box<ReadinessNode>,
}

/// Node shared by a `Registration` / `SetReadiness` pair as well as the node
/// queued into the MPSC channel.
struct ReadinessNode {
    // Node state, see struct docs for `ReadinessState`
    //
    // This variable is the primary point of coordination between all the
    // various threads concurrently accessing the node.
    state: AtomicState,

    // The registration token cannot fit into the `state` variable, so it is
    // broken out here. In order to atomically update both the state and token
    // we have to jump through a few hoops.
    //
    // First, `state` includes `token_read_pos` and `token_write_pos`. These can
    // either be 0, 1, or 2 which represent a token slot. `token_write_pos` is
    // the token slot that contains the most up to date registration token.
    // `token_read_pos` is the token slot that `poll` is currently reading from.
    //
    // When a call to `update` includes a different token than the one currently
    // associated with the registration (token_write_pos), first an unused token
    // slot is found. The unused slot is the one not represented by
    // `token_read_pos` OR `token_write_pos`. The new token is written to this
    // slot, then `state` is updated with the new `token_write_pos` value. This
    // requires that there is only a *single* concurrent call to `update`.
    //
    // When `poll` reads a node state, it checks that `token_read_pos` matches
    // `token_write_pos`. If they do not match, then it atomically updates
    // `state` such that `token_read_pos` is set to `token_write_pos`. It will
    // then read the token at the newly updated `token_read_pos`.
    token_0: UnsafeCell<Token>,
    token_1: UnsafeCell<Token>,
    token_2: UnsafeCell<Token>,
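
    // A sketch of the slot rotation described above (illustrative pseudocode;
    // `rd`/`wr` stand for `token_read_pos`/`token_write_pos` decoded from
    // `state`):
    //
    //     // Writer side (`update`), single concurrent caller: pick a slot
    //     // that is neither `rd` nor `wr`, write the token, then publish it.
    //     let unused = (0..3).find(|&s| s != rd && s != wr).unwrap();
    //     *slot(unused) = new_token;
    //     state.set_token_write_pos(unused);
    //
    //     // Reader side (`poll`): catch up to the writer, then read.
    //     if rd != wr {
    //         state.set_token_read_pos(wr);
    //     }
    //     let token = *slot(state.token_read_pos());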

    // Used when the node is queued in the readiness linked list. Accessing
    // this field requires winning the "queue" lock
    next_readiness: AtomicPtr<ReadinessNode>,

    // Ensures that there is only one concurrent call to `update`.
    //
    // Each call to `update` will attempt to swap `update_lock` from `false` to
    // `true`. If the CAS succeeds, the thread has obtained the update lock. If
    // the CAS fails, then the `update` call returns immediately and the update
    // is discarded.
    update_lock: AtomicBool,

    // Pointer to Arc<ReadinessQueueInner>
    readiness_queue: AtomicPtr<()>,

    // Tracks the number of `ReadyRef` pointers
    ref_count: AtomicUsize,
}

/// Stores the ReadinessNode state in an AtomicUsize. This wrapper around the
/// atomic variable handles encoding / decoding `ReadinessState` values.
struct AtomicState {
    inner: AtomicUsize,
}

const MASK_2: usize = 4 - 1;
const MASK_4: usize = 16 - 1;
const QUEUED_MASK: usize = 1 << QUEUED_SHIFT;
const DROPPED_MASK: usize = 1 << DROPPED_SHIFT;

const READINESS_SHIFT: usize = 0;
const INTEREST_SHIFT: usize = 4;
const POLL_OPT_SHIFT: usize = 8;
const TOKEN_RD_SHIFT: usize = 12;
const TOKEN_WR_SHIFT: usize = 14;
const QUEUED_SHIFT: usize = 16;
const DROPPED_SHIFT: usize = 17;

/// Tracks all state for a single `ReadinessNode`. The state is packed into a
/// `usize` variable from low to high bit as follows:
///
/// 4 bits: Registration current readiness
/// 4 bits: Registration interest
/// 4 bits: Poll options
/// 2 bits: Token position currently being read from by `poll`
/// 2 bits: Token position last written to by `update`
/// 1 bit:  Queued flag, set when node is being pushed into MPSC queue.
/// 1 bit:  Dropped flag, set when all `Registration` handles have been dropped.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct ReadinessState(usize);
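
// A sketch of how a packed state word decodes, using the shift and mask
// constants above (illustrative only; the real accessors are methods on
// `ReadinessState`):
//
//     let readiness = (state >> READINESS_SHIFT) & MASK_4; // 4 bits
//     let interest  = (state >> INTEREST_SHIFT) & MASK_4;  // 4 bits
//     let poll_opt  = (state >> POLL_OPT_SHIFT) & MASK_4;  // 4 bits
//     let token_rd  = (state >> TOKEN_RD_SHIFT) & MASK_2;  // 2 bits
//     let token_wr  = (state >> TOKEN_WR_SHIFT) & MASK_2;  // 2 bits
//     let queued    = state & QUEUED_MASK != 0;            // 1 bit
//     let dropped   = state & DROPPED_MASK != 0;           // 1 bit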

/// Returned by `dequeue_node`. Represents the different states as described by
/// the queue documentation on 1024cores.net.
enum Dequeue {
    Data(*mut ReadinessNode),
    Empty,
    Inconsistent,
}

const AWAKEN: Token = Token(usize::MAX);
const MAX_REFCOUNT: usize = (isize::MAX) as usize;

/*
 *
 * ===== Poll =====
 *
 */

impl Poll {
    /// Return a new `Poll` handle.
    ///
    /// This function will make a syscall to the operating system to create the
    /// system selector. If this syscall fails, `Poll::new` will return with the
    /// error.
    ///
    /// See [struct] level docs for more details.
    ///
    /// [struct]: struct.Poll.html
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use retty_io::{Poll, Events};
    /// use std::time::Duration;
    ///
    /// let poll = match Poll::new() {
    ///     Ok(poll) => poll,
    ///     Err(e) => panic!("failed to create Poll instance; err={:?}", e),
    /// };
    ///
    /// // Create a structure to receive polled events
    /// let mut events = Events::with_capacity(1024);
    ///
    /// // Wait for events, but none will be received because no `Evented`
    /// // handles have been registered with this `Poll` instance.
    /// let n = poll.poll(&mut events, Some(Duration::from_millis(500)))?;
    /// assert_eq!(n, 0);
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn new() -> io::Result<Poll> {
        is_send::<Poll>();
        is_sync::<Poll>();

        let poll = Poll {
            selector: sys::Selector::new()?,
            readiness_queue: ReadinessQueue::new()?,
            lock_state: AtomicUsize::new(0),
            lock: Mutex::new(()),
            condvar: Condvar::new(),
        };

        // Register the notification wakeup FD with the IO poller
        poll.readiness_queue.inner.awakener.register(
            &poll,
            AWAKEN,
            Ready::readable(),
            PollOpt::edge(),
        )?;

        Ok(poll)
    }

    /// Register an `Evented` handle with the `Poll` instance.
    ///
    /// Once registered, the `Poll` instance will monitor the `Evented` handle
    /// for readiness state changes. When it notices a state change, it will
    /// return a readiness event for the handle the next time [`poll`] is
    /// called.
    ///
    /// See the [`struct`] docs for a high level overview.
    ///
    /// # Arguments
    ///
    /// `handle: &E: Evented`: This is the handle that the `Poll` instance
    /// should monitor for readiness state changes.
    ///
    /// `token: Token`: The caller picks a token to associate with the socket.
    /// When [`poll`] returns an event for the handle, this token is included.
    /// This allows the caller to map the event to its handle. The token
    /// associated with the `Evented` handle can be changed at any time by
    /// calling [`reregister`].
    ///
    /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
    /// usage.
    ///
    /// See documentation on [`Token`] for an example showing how to pick
    /// [`Token`] values.
    ///
    /// `interest: Ready`: Specifies which operations `Poll` should monitor for
    /// readiness. `Poll` will only return readiness events for operations
    /// specified by this argument.
    ///
    /// If a socket is registered with readable interest and the socket becomes
    /// writable, no event will be returned from [`poll`].
    ///
    /// The readiness interest for an `Evented` handle can be changed at any
    /// time by calling [`reregister`].
    ///
    /// `opts: PollOpt`: Specifies the registration options. The most common
    /// options are [`level`] for level-triggered events, [`edge`] for
    /// edge-triggered events, and [`oneshot`].
    ///
    /// The registration options for an `Evented` handle can be changed at any
    /// time by calling [`reregister`].
    ///
    /// # Notes
    ///
    /// Unless otherwise specified, the caller should assume that once an
    /// `Evented` handle is registered with a `Poll` instance, it is bound to
    /// that `Poll` instance for the lifetime of the `Evented` handle. This
    /// remains true even if the `Evented` handle is deregistered from the poll
    /// instance using [`deregister`].
    ///
    /// This function is **thread safe**. It can be called concurrently from
    /// multiple threads.
    ///
    /// [`struct`]: #
    /// [`reregister`]: #method.reregister
    /// [`deregister`]: #method.deregister
    /// [`poll`]: #method.poll
    /// [`level`]: struct.PollOpt.html#method.level
    /// [`edge`]: struct.PollOpt.html#method.edge
    /// [`oneshot`]: struct.PollOpt.html#method.oneshot
    /// [`Token`]: struct.Token.html
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use retty_io::{Events, Poll, Ready, PollOpt, Token};
    /// use retty_io::net::TcpStream;
    /// use std::time::{Duration, Instant};
    ///
    /// let poll = Poll::new()?;
    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
    ///
    /// // Register the socket with `poll`
    /// poll.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// let start = Instant::now();
    /// let timeout = Duration::from_millis(500);
    ///
    /// loop {
    ///     let elapsed = start.elapsed();
    ///
    ///     if elapsed >= timeout {
    ///         // Connection timed out
    ///         return Ok(());
    ///     }
    ///
    ///     let remaining = timeout - elapsed;
    ///     poll.poll(&mut events, Some(remaining))?;
    ///
    ///     for event in &events {
    ///         if event.token() == Token(0) {
    ///             // Something (probably) happened on the socket.
    ///             return Ok(());
    ///         }
    ///     }
    /// }
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn register<E: ?Sized>(
        &self,
        handle: &E,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()>
    where
        E: Evented,
    {
        validate_args(token)?;

        /*
         * Undefined behavior:
         * - Reusing a token with a different `Evented` without deregistering
         * (or closing) the original `Evented`.
         */
        trace!("registering with poller");

        // Register interests for this socket
        handle.register(self, token, interest, opts)?;

        Ok(())
    }

    /// Re-register an `Evented` handle with the `Poll` instance.
    ///
    /// Re-registering an `Evented` handle allows changing the details of the
    /// registration. Specifically, it allows updating the associated `token`,
    /// `interest`, and `opts` specified in previous `register` and `reregister`
    /// calls.
    ///
    /// The `reregister` arguments fully override the previous values. In other
    /// words, if a socket is registered with [`readable`] interest and the call
    /// to `reregister` specifies [`writable`], then read interest is no longer
    /// requested for the handle.
    ///
    /// The `Evented` handle must have previously been registered with this
    /// instance of `Poll`, otherwise the call to `reregister` will return with
    /// an error.
    ///
    /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
    /// usage.
    ///
    /// See the [`register`] documentation for details about the function
    /// arguments and see the [`struct`] docs for a high level overview of
    /// polling.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use retty_io::{Poll, Ready, PollOpt, Token};
    /// use retty_io::net::TcpStream;
    ///
    /// let poll = Poll::new()?;
    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
    ///
    /// // Register the socket with `poll`, requesting readable
    /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
    ///
    /// // Reregister the socket specifying a different token and write interest
    /// // instead. `PollOpt::edge()` must be specified even though that value
    /// // is not being changed.
    /// poll.reregister(&socket, Token(2), Ready::writable(), PollOpt::edge())?;
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// [`struct`]: #
    /// [`register`]: #method.register
    /// [`readable`]: struct.Ready.html#method.readable
    /// [`writable`]: struct.Ready.html#method.writable
    pub fn reregister<E: ?Sized>(
        &self,
        handle: &E,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()>
    where
        E: Evented,
    {
        validate_args(token)?;

        trace!("reregistering with poller");

        // Update the registered interests for this socket
        handle.reregister(self, token, interest, opts)?;

        Ok(())
    }

    /// Deregister an `Evented` handle with the `Poll` instance.
    ///
    /// When an `Evented` handle is deregistered, the `Poll` instance will
    /// no longer monitor it for readiness state changes. Unlike disabling
    /// handles with oneshot, deregistering clears up any internal resources
    /// needed to track the handle.
    ///
    /// A handle can be passed back to `register` after it has been
    /// deregistered; however, it must be passed back to the **same** `Poll`
    /// instance.
    ///
    /// `Evented` handles are automatically deregistered when they are dropped.
    /// It is common to never need to explicitly call `deregister`.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use retty_io::{Events, Poll, Ready, PollOpt, Token};
    /// use retty_io::net::TcpStream;
    /// use std::time::Duration;
    ///
    /// let poll = Poll::new()?;
    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
    ///
    /// // Register the socket with `poll`
    /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
    ///
    /// poll.deregister(&socket)?;
    ///
    /// let mut events = Events::with_capacity(1024);
    ///
    /// // Set a timeout because this poll should never receive any events.
    /// let n = poll.poll(&mut events, Some(Duration::from_secs(1)))?;
    /// assert_eq!(0, n);
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()>
    where
        E: Evented,
    {
        trace!("deregistering handle with poller");

        // Deregister interests for this socket
        handle.deregister(self)?;

        Ok(())
    }

    /// Wait for readiness events
    ///
    /// Blocks the current thread and waits for readiness events for any of the
    /// `Evented` handles that have been registered with this `Poll` instance.
    /// The function will block until either at least one readiness event has
    /// been received or `timeout` has elapsed. A `timeout` of `None` means that
    /// `poll` will block until a readiness event has been received.
    ///
    /// The supplied `events` will be cleared and newly received readiness events
    /// will be pushed onto the end. At most `events.capacity()` events will be
    /// returned. If there are further pending readiness events, they will be
    /// returned on the next call to `poll`.
    ///
    /// A single call to `poll` may result in multiple readiness events being
    /// returned for a single `Evented` handle. For example, if a TCP socket
    /// becomes both readable and writable, it may be possible for a single
    /// readiness event to be returned with both [`readable`] and [`writable`]
    /// readiness **OR** two separate events may be returned, one with
    /// [`readable`] set and one with [`writable`] set.
    ///
    /// Note that the `timeout` will be rounded up to the system clock
    /// granularity (usually 1ms), and kernel scheduling delays mean that
    /// the blocking interval may be overrun by a small amount.
    ///
    /// `poll` returns the number of readiness events that have been pushed into
    /// `events`, or `Err` when an error has been encountered with the system
    /// selector. The return value is deprecated and will be removed in 0.7.0.
    /// Accessing the events by index is also deprecated. Events can be
    /// inserted by other events triggering, thus making sequential access
    /// problematic. Use the iterator API instead. See [`iter`].
    ///
    /// See the [struct] level documentation for a higher level discussion of
    /// polling.
    ///
    /// [`readable`]: struct.Ready.html#method.readable
    /// [`writable`]: struct.Ready.html#method.writable
    /// [struct]: #
    /// [`iter`]: struct.Events.html#method.iter
    ///
    /// # Examples
    ///
    /// A basic example -- establishing a `TcpStream` connection.
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use retty_io::{Events, Poll, Ready, PollOpt, Token};
    /// use retty_io::net::TcpStream;
    ///
    /// use std::net::{TcpListener, SocketAddr};
    /// use std::thread;
    ///
    /// // Bind a server socket to connect to.
    /// let addr: SocketAddr = "127.0.0.1:0".parse()?;
    /// let server = TcpListener::bind(&addr)?;
    /// let addr = server.local_addr()?.clone();
    ///
    /// // Spawn a thread to accept the socket
    /// thread::spawn(move || {
    ///     let _ = server.accept();
    /// });
    ///
    /// // Construct a new `Poll` handle as well as the `Events` we'll store into
    /// let poll = Poll::new()?;
    /// let mut events = Events::with_capacity(1024);
    ///
    /// // Connect the stream
    /// let stream = TcpStream::connect(&addr)?;
    ///
    /// // Register the stream with `Poll`
    /// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
    ///
    /// // Wait for the socket to become ready. This has to happen in a loop to
    /// // handle spurious wakeups.
    /// loop {
    ///     poll.poll(&mut events, None)?;
    ///
    ///     for event in &events {
    ///         if event.token() == Token(0) && event.readiness().is_writable() {
    ///             // The socket connected (probably, it could still be a spurious
    ///             // wakeup)
    ///             return Ok(());
    ///         }
    ///     }
    /// }
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// [struct]: #
    pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
        self.poll1(events, timeout, false)
    }

    /// Like `poll`, but may be interrupted by a signal
    ///
    /// If `poll` is interrupted while blocking, it will transparently retry the syscall. If you
    /// want to handle signals yourself, however, use `poll_interruptible`.
    pub fn poll_interruptible(
        &self,
        events: &mut Events,
        timeout: Option<Duration>,
    ) -> io::Result<usize> {
        self.poll1(events, timeout, true)
    }

    fn poll1(
        &self,
        events: &mut Events,
        mut timeout: Option<Duration>,
        interruptible: bool,
    ) -> io::Result<usize> {
        let zero = Some(Duration::from_millis(0));

        // At a high level, the synchronization strategy is to acquire access to
        // the critical section by transitioning the atomic from unlocked ->
        // locked. If the attempt fails, the thread will wait on the condition
        // variable.
        //
        // # Some more detail
        //
        // The `lock_state` atomic usize combines:
        //
        // - locked flag, stored in the least significant bit
        // - number of waiting threads, stored in the rest of the bits.
        //
        // When a thread transitions the locked flag from 0 -> 1, it has
        // obtained access to the critical section.
        //
        // When entering `poll`, a compare-and-swap from 0 -> 1 is attempted.
        // This is a fast path for the case when there are no concurrent calls
        // to poll, which is very common.
        //
        // On failure, the mutex is locked, and the thread attempts to increment
        // the number of waiting threads component of `lock_state`. If this is
        // successfully done while the locked flag is set, then the thread can
        // wait on the condition variable.
        //
        // When a thread exits the critical section, it unsets the locked flag.
        // If there are any waiters, which is atomically determined while
        // unsetting the locked flag, then the condvar is notified.
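        //
        // Illustration of the encoding: `curr & 1` is the locked flag and
        // `curr >> 1` is the number of waiting threads, which is why the
        // waiter count below is adjusted in increments of 2.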

        let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);

        if 0 != curr {
            // Enter slower path
            let mut lock = self.lock.lock().unwrap();
            let mut inc = false;

            loop {
                if curr & 1 == 0 {
                    // The lock is currently free, attempt to grab it
                    let mut next = curr | 1;

                    if inc {
                        // The waiter count has previously been incremented, so
                        // decrement it here
                        next -= 2;
                    }

                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);

                    if actual != curr {
                        curr = actual;
                        continue;
                    }

                    // Lock acquired, break from the loop
                    break;
                }

                if timeout == zero {
                    if inc {
                        self.lock_state.fetch_sub(2, SeqCst);
                    }

                    return Ok(0);
                }

                // The lock is currently held, so wait for it to become
                // free. If the waiter count hasn't been incremented yet, do
                // so now
                if !inc {
                    let next = curr.checked_add(2).expect("overflow");
                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);

                    if actual != curr {
                        curr = actual;
                        continue;
                    }

                    // Track that the waiter count has been incremented for
                    // this thread and fall through to the condvar waiting
                    inc = true;
                }

                lock = match timeout {
                    Some(to) => {
                        let now = Instant::now();

                        // Wait to be notified
                        let (l, _) = self.condvar.wait_timeout(lock, to).unwrap();

                        // See how much time was elapsed in the wait
                        let elapsed = now.elapsed();

                        // Update `timeout` to reflect how much time is left to
                        // wait.
                        if elapsed >= to {
                            timeout = zero;
                        } else {
                            // Update the timeout
                            timeout = Some(to - elapsed);
                        }

                        l
                    }
                    None => self.condvar.wait(lock).unwrap(),
                };

                // Reload the state
                curr = self.lock_state.load(SeqCst);

                // Try to lock again...
            }
        }

        let ret = self.poll2(events, timeout, interruptible);

        // Release the lock
        if 1 != self.lock_state.fetch_and(!1, Release) {
            // Acquire the mutex
            let _lock = self.lock.lock().unwrap();

            // There is at least one waiting thread, so notify one
            self.condvar.notify_one();
        }

        ret
    }

    #[inline]
    #[cfg_attr(feature = "cargo-clippy", allow(clippy::if_same_then_else))]
    fn poll2(
        &self,
        events: &mut Events,
        mut timeout: Option<Duration>,
        interruptible: bool,
    ) -> io::Result<usize> {
        // Compute the timeout value passed to the system selector. If the
        // readiness queue has pending nodes, we still want to poll the system
        // selector for new events, but we don't want to block the thread to
        // wait for new events.
        if timeout == Some(Duration::from_millis(0)) {
            // If blocking is not requested, then there is no need to prepare
            // the queue for sleep
            //
            // The sleep_marker should be removed by readiness_queue.poll().
        } else if self.readiness_queue.prepare_for_sleep() {
            // The readiness queue is empty. The call to `prepare_for_sleep`
            // inserts `sleep_marker` into the queue. This signals to any
            // threads setting readiness that `Poll::poll` is going to
            // sleep, so the awakener should be used.
        } else {
            // The readiness queue is not empty, so do not block the thread.
            timeout = Some(Duration::from_millis(0));
        }

        loop {
            let now = Instant::now();
            // First get selector events
            let res = self.selector.select(&mut events.inner, AWAKEN, timeout);
            match res {
                Ok(true) => {
                    // Some awakeners require reading from a FD.
                    self.readiness_queue.inner.awakener.cleanup();
                    break;
                }
                Ok(false) => break,
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted && !interruptible => {
                    // Interrupted by a signal; update timeout if necessary and retry
                    if let Some(to) = timeout {
                        let elapsed = now.elapsed();
                        if elapsed >= to {
                            break;
                        } else {
                            timeout = Some(to - elapsed);
                        }
                    }
                }
                Err(e) => return Err(e),
            }
        }

        // Poll the custom event queue
        self.readiness_queue.poll(&mut events.inner);

        // Return the number of polled events
        Ok(events.inner.len())
    }
}

fn validate_args(token: Token) -> io::Result<()> {
    if token == AWAKEN {
        return Err(io::Error::new(io::ErrorKind::Other, "invalid token"));
    }

    Ok(())
}

impl fmt::Debug for Poll {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Poll").finish()
    }
}

#[cfg(all(unix, not(target_os = "fuchsia")))]
impl AsRawFd for Poll {
    fn as_raw_fd(&self) -> RawFd {
        self.selector.as_raw_fd()
    }
}

/// A collection of readiness events.
///
/// `Events` is passed as an argument to [`Poll::poll`] and will be used to
/// receive any new readiness events received since the last poll. Usually, a
/// single `Events` instance is created at the same time as a [`Poll`] and
/// reused on each call to [`Poll::poll`].
///
/// See [`Poll`] for more documentation on polling.
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use retty_io::{Events, Poll};
/// use std::time::Duration;
///
/// let mut events = Events::with_capacity(1024);
/// let poll = Poll::new()?;
///
/// assert!(events.is_empty());
///
/// // Register `Evented` handles with `poll`
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// for event in &events {
///     println!("event={:?}", event);
/// }
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// [`Poll::poll`]: struct.Poll.html#method.poll
/// [`Poll`]: struct.Poll.html
pub struct Events {
    inner: sys::Events,
}

/// [`Events`] iterator.
///
/// This struct is created by the [`iter`] method on [`Events`].
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use retty_io::{Events, Poll};
/// use std::time::Duration;
///
/// let mut events = Events::with_capacity(1024);
/// let poll = Poll::new()?;
///
/// // Register handles with `poll`
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// for event in events.iter() {
///     println!("event={:?}", event);
/// }
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// [`Events`]: struct.Events.html
/// [`iter`]: struct.Events.html#method.iter
#[derive(Debug, Clone)]
pub struct Iter<'a> {
    inner: &'a Events,
    pos: usize,
}

/// Owned [`Events`] iterator.
///
/// This struct is created by the `into_iter` method on [`Events`].
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use retty_io::{Events, Poll};
/// use std::time::Duration;
///
/// let mut events = Events::with_capacity(1024);
/// let poll = Poll::new()?;
///
/// // Register handles with `poll`
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// for event in events {
///     println!("event={:?}", event);
/// }
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// [`Events`]: struct.Events.html
#[derive(Debug)]
pub struct IntoIter {
    inner: Events,
    pos: usize,
}

impl Events {
    /// Return a new `Events` capable of holding up to `capacity` events.
    ///
    /// # Examples
    ///
    /// ```
    /// use retty_io::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert_eq!(1024, events.capacity());
    /// ```
    pub fn with_capacity(capacity: usize) -> Events {
        Events {
            inner: sys::Events::with_capacity(capacity),
        }
    }

    #[deprecated(
        since = "0.6.10",
        note = "Index access removed in favor of iterator only API."
    )]
    #[doc(hidden)]
    pub fn get(&self, idx: usize) -> Option<Event> {
        self.inner.get(idx)
    }

    #[doc(hidden)]
    #[deprecated(
        since = "0.6.10",
        note = "Index access removed in favor of iterator only API."
    )]
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns the number of `Event` values that `self` can hold.
    ///
    /// # Examples
    ///
    /// ```
    /// use retty_io::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert_eq!(1024, events.capacity());
    /// ```
    pub fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    /// Returns `true` if `self` contains no `Event` values.
    ///
    /// # Examples
    ///
    /// ```
    /// use retty_io::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert!(events.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
1441
1442    /// Returns an iterator over the `Event` values.
1443    ///
1444    /// # Examples
1445    ///
1446    /// ```
1447    /// # use std::error::Error;
1448    /// # fn try_main() -> Result<(), Box<Error>> {
1449    /// use retty_io::{Events, Poll};
1450    /// use std::time::Duration;
1451    ///
1452    /// let mut events = Events::with_capacity(1024);
1453    /// let poll = Poll::new()?;
1454    ///
1455    /// // Register handles with `poll`
1456    ///
1457    /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1458    ///
1459    /// for event in events.iter() {
1460    ///     println!("event={:?}", event);
1461    /// }
1462    /// #     Ok(())
1463    /// # }
1464    /// #
1465    /// # fn main() {
1466    /// #     try_main().unwrap();
1467    /// # }
1468    /// ```
1469    pub fn iter(&self) -> Iter {
1470        Iter {
1471            inner: self,
1472            pos: 0,
1473        }
1474    }
1475
1476    /// Clears all `Event` values from the container.
1477    ///
1478    /// # Examples
1479    ///
1480    /// ```
1481    /// # use std::error::Error;
1482    /// # fn try_main() -> Result<(), Box<Error>> {
1483    /// use retty_io::{Events, Poll};
1484    /// use std::time::Duration;
1485    ///
1486    /// let mut events = Events::with_capacity(1024);
1487    /// let poll = Poll::new()?;
1488    ///
1489    /// // Register handles with `poll`
1490    /// for _ in 0..2 {
1491    ///     events.clear();
1492    ///     poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1493    ///
1494    ///     for event in events.iter() {
1495    ///         println!("event={:?}", event);
1496    ///     }
1497    /// }
1498    /// #     Ok(())
1499    /// # }
1500    /// #
1501    /// # fn main() {
1502    /// #     try_main().unwrap();
1503    /// # }
1504    /// ```
1505    pub fn clear(&mut self) {
1506        self.inner.clear();
1507    }
1508}
1509
1510impl<'a> IntoIterator for &'a Events {
1511    type Item = Event;
1512    type IntoIter = Iter<'a>;
1513
1514    fn into_iter(self) -> Self::IntoIter {
1515        self.iter()
1516    }
1517}
1518
1519impl<'a> Iterator for Iter<'a> {
1520    type Item = Event;
1521
1522    fn next(&mut self) -> Option<Event> {
1523        let ret = self.inner.inner.get(self.pos);
1524        self.pos += 1;
1525        ret
1526    }
1527}
1528
1529impl IntoIterator for Events {
1530    type Item = Event;
1531    type IntoIter = IntoIter;
1532
1533    fn into_iter(self) -> Self::IntoIter {
1534        IntoIter {
1535            inner: self,
1536            pos: 0,
1537        }
1538    }
1539}
1540
1541impl Iterator for IntoIter {
1542    type Item = Event;
1543
1544    fn next(&mut self) -> Option<Event> {
1545        let ret = self.inner.inner.get(self.pos);
1546        self.pos += 1;
1547        ret
1548    }
1549}
1550
1551impl fmt::Debug for Events {
1552    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1553        f.debug_struct("Events")
1554            .field("capacity", &self.capacity())
1555            .finish()
1556    }
1557}
1558
1559// ===== Accessors for internal usage =====
1560
1561pub fn selector(poll: &Poll) -> &sys::Selector {
1562    &poll.selector
1563}
1564
1565/*
1566 *
1567 * ===== Registration =====
1568 *
1569 */
1570
1571// TODO: get rid of this, windows depends on it for now
1572#[allow(dead_code)]
1573pub fn new_registration(
1574    poll: &Poll,
1575    token: Token,
1576    ready: Ready,
1577    opt: PollOpt,
1578) -> (Registration, SetReadiness) {
1579    Registration::new_priv(poll, token, ready, opt)
1580}
1581
1582impl Registration {
1583    /// Create and return a new `Registration` and the associated
1584    /// `SetReadiness`.
1585    ///
1586    /// See the [struct] documentation for more detail and [`Poll`]
1587    /// for high-level documentation on polling.
1588    ///
1589    /// # Examples
1590    ///
1591    /// ```
1592    /// # use std::error::Error;
1593    /// # fn try_main() -> Result<(), Box<Error>> {
1594    /// use retty_io::{Events, Ready, Registration, Poll, PollOpt, Token};
1595    /// use std::thread;
1596    ///
1597    /// let (registration, set_readiness) = Registration::new2();
1598    ///
1599    /// thread::spawn(move || {
1600    ///     use std::time::Duration;
1601    ///     thread::sleep(Duration::from_millis(500));
1602    ///
1603    ///     let _ = set_readiness.set_readiness(Ready::readable());
1604    /// });
1605    ///
1606    /// let poll = Poll::new()?;
1607    /// poll.register(&registration, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
1608    ///
1609    /// let mut events = Events::with_capacity(256);
1610    ///
1611    /// loop {
1612    ///     poll.poll(&mut events, None)?;
1613    ///
1614    ///     for event in &events {
1615    ///         if event.token() == Token(0) && event.readiness().is_readable() {
1616    ///             return Ok(());
1617    ///         }
1618    ///     }
1619    /// }
1621    /// # }
1622    /// #
1623    /// # fn main() {
1624    /// #     try_main().unwrap();
1625    /// # }
1626    /// ```
1627    /// [struct]: #
1628    /// [`Poll`]: struct.Poll.html
1629    pub fn new2() -> (Registration, SetReadiness) {
1630        // Allocate the registration node. The new node will have `ref_count`
1631        // set to 2: one SetReadiness, one Registration.
1632        let node = Box::into_raw(Box::new(ReadinessNode::new(
1633            ptr::null_mut(),
1634            Token(0),
1635            Ready::empty(),
1636            PollOpt::empty(),
1637            2,
1638        )));
1639
1640        let registration = Registration {
1641            inner: RegistrationInner { node },
1642        };
1643
1644        let set_readiness = SetReadiness {
1645            inner: RegistrationInner { node },
1646        };
1647
1648        (registration, set_readiness)
1649    }
1650
1651    #[deprecated(since = "0.6.5", note = "use `new2` instead")]
1652    #[cfg(feature = "with-deprecated")]
1653    #[doc(hidden)]
1654    pub fn new(
1655        poll: &Poll,
1656        token: Token,
1657        interest: Ready,
1658        opt: PollOpt,
1659    ) -> (Registration, SetReadiness) {
1660        Registration::new_priv(poll, token, interest, opt)
1661    }
1662
1663    // TODO: Get rid of this (windows depends on it for now)
1664    fn new_priv(
1665        poll: &Poll,
1666        token: Token,
1667        interest: Ready,
1668        opt: PollOpt,
1669    ) -> (Registration, SetReadiness) {
1670        is_send::<Registration>();
1671        is_sync::<Registration>();
1672        is_send::<SetReadiness>();
1673        is_sync::<SetReadiness>();
1674
1675        // Clone the handle to the readiness queue; this bumps the ref count.
1676        let queue = poll.readiness_queue.inner.clone();
1677
1678        // Convert to a *mut () pointer
1679        let queue: *mut () = unsafe { mem::transmute(queue) };
1680
1681        // Allocate the registration node. The new node will have `ref_count`
1682        // set to 3: one SetReadiness, one Registration, and one Poll handle.
1683        let node = Box::into_raw(Box::new(ReadinessNode::new(queue, token, interest, opt, 3)));
1684
1685        let registration = Registration {
1686            inner: RegistrationInner { node },
1687        };
1688
1689        let set_readiness = SetReadiness {
1690            inner: RegistrationInner { node },
1691        };
1692
1693        (registration, set_readiness)
1694    }
1695
1696    #[deprecated(since = "0.6.5", note = "use `Evented` impl")]
1697    #[cfg(feature = "with-deprecated")]
1698    #[doc(hidden)]
1699    pub fn update(
1700        &self,
1701        poll: &Poll,
1702        token: Token,
1703        interest: Ready,
1704        opts: PollOpt,
1705    ) -> io::Result<()> {
1706        self.inner.update(poll, token, interest, opts)
1707    }
1708
1709    #[deprecated(since = "0.6.5", note = "use `Poll::deregister` instead")]
1710    #[cfg(feature = "with-deprecated")]
1711    #[doc(hidden)]
1712    pub fn deregister(&self, poll: &Poll) -> io::Result<()> {
1713        self.inner
1714            .update(poll, Token(0), Ready::empty(), PollOpt::empty())
1715    }
1716}
1717
1718impl Evented for Registration {
1719    fn register(
1720        &self,
1721        poll: &Poll,
1722        token: Token,
1723        interest: Ready,
1724        opts: PollOpt,
1725    ) -> io::Result<()> {
1726        self.inner.update(poll, token, interest, opts)
1727    }
1728
1729    fn reregister(
1730        &self,
1731        poll: &Poll,
1732        token: Token,
1733        interest: Ready,
1734        opts: PollOpt,
1735    ) -> io::Result<()> {
1736        self.inner.update(poll, token, interest, opts)
1737    }
1738
1739    fn deregister(&self, poll: &Poll) -> io::Result<()> {
1740        self.inner
1741            .update(poll, Token(0), Ready::empty(), PollOpt::empty())
1742    }
1743}
1744
1745impl Drop for Registration {
1746    fn drop(&mut self) {
1747        // `flag_as_dropped` toggles the `dropped` flag and notifies
1748        // `Poll::poll` to release its handle (which is just decrementing
1749        // the ref count).
1750        if self.inner.state.flag_as_dropped() {
1751            // Can't do anything if the queuing fails
1752            let _ = self.inner.enqueue_with_wakeup();
1753        }
1754    }
1755}
1756
1757impl fmt::Debug for Registration {
1758    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1759        fmt.debug_struct("Registration").finish()
1760    }
1761}
1762
1763impl SetReadiness {
1764    /// Returns the registration's current readiness.
1765    ///
1766    /// # Note
1767    ///
1768    /// There is no guarantee that `readiness` establishes any sort of memory
1769    /// ordering. Any concurrent data access must be synchronized using another
1770    /// strategy.
1771    ///
1772    /// # Examples
1773    ///
1774    /// ```
1775    /// # use std::error::Error;
1776    /// # fn try_main() -> Result<(), Box<Error>> {
1777    /// use retty_io::{Registration, Ready};
1778    ///
1779    /// let (registration, set_readiness) = Registration::new2();
1780    ///
1781    /// assert!(set_readiness.readiness().is_empty());
1782    ///
1783    /// set_readiness.set_readiness(Ready::readable())?;
1784    /// assert!(set_readiness.readiness().is_readable());
1785    /// #     Ok(())
1786    /// # }
1787    /// #
1788    /// # fn main() {
1789    /// #     try_main().unwrap();
1790    /// # }
1791    /// ```
1792    pub fn readiness(&self) -> Ready {
1793        self.inner.readiness()
1794    }
1795
1796    /// Set the registration's readiness
1797    ///
1798    /// If the associated `Registration` is registered with a [`Poll`] instance
1799    /// and has requested readiness events that include `ready`, then a future
1800    /// call to [`Poll::poll`] will receive a readiness event representing the
1801    /// readiness state change.
1802    ///
1803    /// # Note
1804    ///
1805    /// There is no guarantee that `set_readiness` establishes any sort of memory
1806    /// ordering. Any concurrent data access must be synchronized using another
1807    /// strategy.
1808    ///
1809    /// There is also no guarantee as to when the readiness event will be
1810    /// delivered to poll. A best attempt will be made to make the delivery in a
1811    /// "timely" fashion. For example, the following is **not** guaranteed to
1812    /// work:
1813    ///
1814    /// ```
1815    /// # use std::error::Error;
1816    /// # fn try_main() -> Result<(), Box<Error>> {
1817    /// use retty_io::{Events, Registration, Ready, Poll, PollOpt, Token};
1818    ///
1819    /// let poll = Poll::new()?;
1820    /// let (registration, set_readiness) = Registration::new2();
1821    ///
1822    /// poll.register(&registration,
1823    ///               Token(0),
1824    ///               Ready::readable(),
1825    ///               PollOpt::edge())?;
1826    ///
1827    /// // Set the readiness, then immediately poll to try to get the readiness
1828    /// // event
1829    /// set_readiness.set_readiness(Ready::readable())?;
1830    ///
1831    /// let mut events = Events::with_capacity(1024);
1832    /// poll.poll(&mut events, None)?;
1833    ///
1834    /// // There is NO guarantee that the following will work. It is possible
1835    /// // that the readiness event will be delivered at a later time.
1836    /// let event = events.iter().next().unwrap();
1837    /// assert_eq!(event.token(), Token(0));
1838    /// assert!(event.readiness().is_readable());
1839    /// #     Ok(())
1840    /// # }
1841    /// #
1842    /// # fn main() {
1843    /// #     try_main().unwrap();
1844    /// # }
1845    /// ```
1846    ///
1847    /// # Examples
1848    ///
1849    /// A simple example; for a more elaborate one, see the [`Evented`]
1850    /// documentation.
1851    ///
1852    /// ```
1853    /// # use std::error::Error;
1854    /// # fn try_main() -> Result<(), Box<Error>> {
1855    /// use retty_io::{Registration, Ready};
1856    ///
1857    /// let (registration, set_readiness) = Registration::new2();
1858    ///
1859    /// assert!(set_readiness.readiness().is_empty());
1860    ///
1861    /// set_readiness.set_readiness(Ready::readable())?;
1862    /// assert!(set_readiness.readiness().is_readable());
1863    /// #     Ok(())
1864    /// # }
1865    /// #
1866    /// # fn main() {
1867    /// #     try_main().unwrap();
1868    /// # }
1869    /// ```
1870    ///
1871    /// [`Registration`]: struct.Registration.html
1872    /// [`Evented`]: event/trait.Evented.html#examples
1873    /// [`Poll`]: struct.Poll.html
1874    /// [`Poll::poll`]: struct.Poll.html#method.poll
1875    pub fn set_readiness(&self, ready: Ready) -> io::Result<()> {
1876        self.inner.set_readiness(ready)
1877    }
1878}
1879
1880impl fmt::Debug for SetReadiness {
1881    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1882        f.debug_struct("SetReadiness").finish()
1883    }
1884}
1885
1886impl RegistrationInner {
1887    /// Get the registration's readiness.
1888    fn readiness(&self) -> Ready {
1889        self.state.load(Relaxed).readiness()
1890    }
1891
1892    /// Set the registration's readiness.
1893    ///
1894    /// This function can be called concurrently by an arbitrary number of
1895    /// SetReadiness handles.
1896    fn set_readiness(&self, ready: Ready) -> io::Result<()> {
1897        // Load the current atomic state.
1898        let mut state = self.state.load(Acquire);
1899        let mut next;
1900
1901        loop {
1902            next = state;
1903
1904            if state.is_dropped() {
1905                // Node is dropped, no more notifications
1906                return Ok(());
1907            }
1908
1909            // Update the readiness
1910            next.set_readiness(ready);
1911
1912            // If the readiness is not blank, try to obtain permission to
1913            // push the node into the readiness queue.
1914            if !next.effective_readiness().is_empty() {
1915                next.set_queued();
1916            }
1917
1918            let actual = self.state.compare_and_swap(state, next, AcqRel);
1919
1920            if state == actual {
1921                break;
1922            }
1923
1924            state = actual;
1925        }
1926
1927        if !state.is_queued() && next.is_queued() {
1928            // We toggled the queued flag, making us responsible for queuing the
1929            // node in the MPSC readiness queue.
1930            self.enqueue_with_wakeup()?;
1931        }
1932
1933        Ok(())
1934    }
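
    // A sketch of the queued-flag handoff above (a hypothetical interleaving,
    // not taken from any test in this crate):
    //
    //   thread A: set_readiness(readable)  CAS: queued 0 -> 1  => enqueues
    //   thread B: set_readiness(writable)  CAS: queued 1 -> 1  => skips
    //
    // Both readiness updates land in `state`, but only thread A observed the
    // unqueued -> queued transition, so the node is pushed into the MPSC queue
    // exactly once and `Poll::poll` sees the merged readiness.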
1935
1936    /// Update the registration details associated with the node
1937    fn update(&self, poll: &Poll, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> {
1938        // First, ensure poll instances match
1939        //
1940        // Load the queue pointer. `Relaxed` is sufficient here as only the
1941        // pointer is being operated on. The actual memory is guaranteed to be
1942        // visible via the `poll: &Poll` ref passed as an argument to the function.
1943        let mut queue = self.readiness_queue.load(Relaxed);
1944        let other: &*mut () =
1945            unsafe { &*(&poll.readiness_queue.inner as *const _ as *const *mut ()) };
1946        let other = *other;
1947
1948        debug_assert!(mem::size_of::<Arc<ReadinessQueueInner>>() == mem::size_of::<*mut ()>());
1949
1950        if queue.is_null() {
1951            // Attempt to set the queue pointer. `Release` ordering synchronizes
1952            // with `Acquire` in `ensure_with_wakeup`.
1953            let actual = self.readiness_queue.compare_and_swap(queue, other, Release);
1954
1955            if actual.is_null() {
1956                // The CAS succeeded, this means that the node's ref count
1957                // should be incremented to reflect that the `poll` function
1958                // effectively owns the node as well.
1959                //
1960                // `Relaxed` ordering used for the same reason as in
1961                // RegistrationInner::clone
1962                self.ref_count.fetch_add(1, Relaxed);
1963
1964                // Note that the `queue` reference stored in our
1965                // `readiness_queue` field is intended to be a strong reference,
1966                // so now that we've successfully claimed the reference we bump
1967                // the refcount here.
1968                //
1969                // Down below in `release_node` when we deallocate this
1970                // `RegistrationInner` is where we'll transmute this back to an
1971                // arc and decrement the reference count.
1972                mem::forget(poll.readiness_queue.clone());
1973            } else {
1974                // The CAS failed, another thread set the queue pointer, so ensure
1975                // that the pointer and `other` match
1976                if actual != other {
1977                    return Err(io::Error::new(
1978                        io::ErrorKind::Other,
1979                        "registration handle associated with another `Poll` instance",
1980                    ));
1981                }
1982            }
1983
1984            queue = other;
1985        } else if queue != other {
1986            return Err(io::Error::new(
1987                io::ErrorKind::Other,
1988                "registration handle associated with another `Poll` instance",
1989            ));
1990        }
1991
1992        unsafe {
1993            let actual = &poll.readiness_queue.inner as *const _ as *const usize;
1994            debug_assert_eq!(queue as usize, *actual);
1995        }
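
        // To summarize the claiming protocol above (a sketch): the first
        // successful `update` stores the queue pointer and leaks one strong
        // `Arc` reference via `mem::forget`; every later call only checks
        // pointer equality against its own `Poll`, and `release_node` below
        // reconstructs and drops that leaked `Arc` once the node's ref count
        // reaches zero.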
1996
1997        // The `update_lock` atomic is used as a flag ensuring only a single
1998        // thread concurrently enters the `update` critical section. Any
1999        // concurrent calls to update are discarded. If coordinated updates are
2000        // required, the retty-io user is responsible for handling that.
2001        //
2002        // Acquire / Release ordering is used on `update_lock` to ensure that
2003        // data accesses to the `token_*` variables are scoped to the critical
2004        // section.
2005
2006        // Acquire the update lock.
2007        if self.update_lock.compare_and_swap(false, true, Acquire) {
2008            // The lock is already held. Discard the update
2009            return Ok(());
2010        }
2011
2012        // Relaxed ordering is acceptable here as the only memory that needs to
2013        // be visible as part of the update are the `token_*` variables, and
2014        // ordering has already been handled by the `update_lock` access.
2015        let mut state = self.state.load(Relaxed);
2016        let mut next;
2017
2018        // Read the current token, again this memory has been ordered by the
2019        // acquire on `update_lock`.
2020        let curr_token_pos = state.token_write_pos();
2021        let curr_token = unsafe { self::token(self, curr_token_pos) };
2022
2023        let mut next_token_pos = curr_token_pos;
2024
2025        // If the `update` call is changing the token, then compute the next
2026        // available token slot and write the token there.
2027        //
2028        // Note that this computation is happening *outside* of the
2029        // compare-and-swap loop. The update lock ensures that only a single
2030        // thread could be mutating `token_write_pos`, so the
2031        // `next_token_pos` will never need to be recomputed even if
2032        // `token_read_pos` concurrently changes. This is because
2033        // `token_read_pos` can ONLY concurrently change to the current value of
2034        // `token_write_pos`, so `next_token_pos` will always remain valid.
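        //
        // For example (a hypothetical sequence; recall that `Poll::poll`
        // copies `token_write_pos` into `token_read_pos` when it dequeues the
        // node):
        //
        //   start:            rd = 0, wr = 0
        //   update(token_a):  write slot 1, wr = 1
        //   poll dequeues:    rd = 1 (reads token_a)
        //   update(token_b):  write slot 2, wr = 2  (slot 1 stays readable)
        //
        // Three slots are enough because the reader only ever moves to the
        // writer's current slot, leaving at most one other slot in flight.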
2035        if token != curr_token {
2036            next_token_pos = state.next_token_pos();
2037
2038            // Update the token
2039            match next_token_pos {
2040                0 => unsafe { *self.token_0.get() = token },
2041                1 => unsafe { *self.token_1.get() = token },
2042                2 => unsafe { *self.token_2.get() = token },
2043                _ => unreachable!(),
2044            }
2045        }
2046
2047        // Now enter the compare-and-swap loop
2048        loop {
2049            next = state;
2050
2051            // The node is only dropped once all `Registration` handles are
2052            // dropped. Only `Registration` can call `update`.
2053            debug_assert!(!state.is_dropped());
2054
2055            // Update the write token position, this will also release the token
2056            // to Poll::poll.
2057            next.set_token_write_pos(next_token_pos);
2058
2059            // Update readiness and poll opts
2060            next.set_interest(interest);
2061            next.set_poll_opt(opt);
2062
2063            // If there is effective readiness, the node will need to be queued
2064            // for processing. This exact behavior is still TBD, so we are
2065            // conservative for now and always fire.
2066            //
2067            // See https://github.com/carllerche/retty-io/issues/535.
2068            if !next.effective_readiness().is_empty() {
2069                next.set_queued();
2070            }
2071
2072            // compare-and-swap the state values. Only `Release` is needed here.
2073            // The `Release` ensures that `Poll::poll` will see the token
2074            // update and the update function doesn't care about any other
2075            // memory visibility.
2076            let actual = self.state.compare_and_swap(state, next, Release);
2077
2078            if actual == state {
2079                break;
2080            }
2081
2082            // CAS failed, but `curr_token_pos` should not have changed given
2083            // that we still hold the update lock.
2084            debug_assert_eq!(curr_token_pos, actual.token_write_pos());
2085
2086            state = actual;
2087        }
2088
2089        // Release the lock
2090        self.update_lock.store(false, Release);
2091
2092        if !state.is_queued() && next.is_queued() {
2093            // We are responsible for enqueueing the node.
2094            enqueue_with_wakeup(queue, self)?;
2095        }
2096
2097        Ok(())
2098    }
2099}
2100
2101impl ops::Deref for RegistrationInner {
2102    type Target = ReadinessNode;
2103
2104    fn deref(&self) -> &ReadinessNode {
2105        unsafe { &*self.node }
2106    }
2107}
2108
2109impl Clone for RegistrationInner {
2110    fn clone(&self) -> RegistrationInner {
2111        // Using a relaxed ordering is alright here, as knowledge of the
2112        // original reference prevents other threads from erroneously deleting
2113        // the object.
2114        //
2115        // As explained in the [Boost documentation][1], Increasing the
2116        // reference counter can always be done with memory_order_relaxed: New
2117        // references to an object can only be formed from an existing
2118        // reference, and passing an existing reference from one thread to
2119        // another must already provide any required synchronization.
2120        //
2121        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
2122        let old_size = self.ref_count.fetch_add(1, Relaxed);
2123
2124        // However we need to guard against massive refcounts in case someone
2125        // is `mem::forget`ing Arcs. If we don't do this the count can overflow
2126        // and users will use-after free. We racily saturate to `isize::MAX` on
2127        // the assumption that there aren't ~2 billion threads incrementing
2128        // the reference count at once. This branch will never be taken in
2129        // any realistic program.
2130        //
2131        // We abort because such a program is incredibly degenerate, and we
2132        // don't care to support it.
2133        if old_size & !MAX_REFCOUNT != 0 {
2134            process::abort();
2135        }
2136
2137        RegistrationInner { node: self.node }
2138    }
2139}
2140
2141impl Drop for RegistrationInner {
2142    fn drop(&mut self) {
2143        // Only handles releasing from `Registration` and `SetReadiness`
2144        // handles. Poll has to call this itself.
2145        release_node(self.node);
2146    }
2147}
2148
2149/*
2150 *
2151 * ===== ReadinessQueue =====
2152 *
2153 */
2154
2155impl ReadinessQueue {
2156    /// Create a new `ReadinessQueue`.
2157    fn new() -> io::Result<ReadinessQueue> {
2158        is_send::<Self>();
2159        is_sync::<Self>();
2160
2161        let end_marker = Box::new(ReadinessNode::marker());
2162        let sleep_marker = Box::new(ReadinessNode::marker());
2163        let closed_marker = Box::new(ReadinessNode::marker());
2164
2165        let ptr = &*end_marker as *const _ as *mut _;
2166
2167        Ok(ReadinessQueue {
2168            inner: Arc::new(ReadinessQueueInner {
2169                awakener: sys::Awakener::new()?,
2170                head_readiness: AtomicPtr::new(ptr),
2171                tail_readiness: UnsafeCell::new(ptr),
2172                end_marker,
2173                sleep_marker,
2174                closed_marker,
2175            }),
2176        })
2177    }
2178
2179    /// Poll the queue for new events
2180    fn poll(&self, dst: &mut sys::Events) {
2181        // `until` is set to the first node that gets re-enqueued because it
2182        // uses level-triggered notifications. This prevents an infinite loop
2183        // where `Poll::poll` keeps dequeuing the nodes it just re-enqueued.
2184        let mut until = ptr::null_mut();
2185
2186        if dst.len() == dst.capacity() {
2187            // If `dst` is already full, the readiness queue won't be drained.
2188            // This might result in `sleep_marker` staying in the queue and
2189            // unnecessary pipe writes occurring.
2190            self.inner.clear_sleep_marker();
2191        }
2192
2193        'outer: while dst.len() < dst.capacity() {
2194            // Dequeue a node. If the queue is in an inconsistent state, then
2195            // stop polling. `Poll::poll` will be called again shortly and enter
2196            // a syscall, which should be enough to enable the other thread to
2197            // finish the queuing process.
2198            let ptr = match unsafe { self.inner.dequeue_node(until) } {
2199                Dequeue::Empty | Dequeue::Inconsistent => break,
2200                Dequeue::Data(ptr) => ptr,
2201            };
2202
2203            let node = unsafe { &*ptr };
2204
2205            // Read the node state with Acquire ordering. This allows reading
2206            // the token variables.
2207            let mut state = node.state.load(Acquire);
2208            let mut next;
2209            let mut readiness;
2210            let mut opt;
2211
2212            loop {
2213                // Build up any changes to the readiness node's state and
2214                // attempt the CAS at the end
2215                next = state;
2216
2217                // Given that the node was just read from the queue, the
2218                // `queued` flag should still be set.
2219                debug_assert!(state.is_queued());
2220
2221                // The dropped flag means we need to release the node and
2222                // perform no further processing on it.
2223                if state.is_dropped() {
2224                    // Release the node and continue
2225                    release_node(ptr);
2226                    continue 'outer;
2227                }
2228
2229                // Process the node
2230                readiness = state.effective_readiness();
2231                opt = state.poll_opt();
2232
2233                if opt.is_edge() {
2234                    // Mark the node as dequeued
2235                    next.set_dequeued();
2236
2237                    if opt.is_oneshot() && !readiness.is_empty() {
2238                        next.disarm();
2239                    }
2240                } else if readiness.is_empty() {
2241                    next.set_dequeued();
2242                }
2243
2244                // Ensure `token_read_pos` is set to `token_write_pos` so that
2245                // we read the most up to date token value.
2246                next.update_token_read_pos();
2247
2248                if state == next {
2249                    break;
2250                }
2251
2252                let actual = node.state.compare_and_swap(state, next, AcqRel);
2253
2254                if actual == state {
2255                    break;
2256                }
2257
2258                state = actual;
2259            }
2260
2261            // If the queued flag is still set, then the node must be requeued.
2262            // This typically happens when using level-triggered notifications.
2263            if next.is_queued() {
2264                if until.is_null() {
2265                    // We never want to see the node again
2266                    until = ptr;
2267                }
2268
2269                // Requeue the node
2270                self.inner.enqueue_node(node);
2271            }
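
            // Worked example (a sketch): a level-triggered node that stays
            // readable is popped, re-enqueued just above, and `until` is set
            // to its pointer, so the next `dequeue_node(until)` call returns
            // `Dequeue::Empty` instead of spinning on the same node forever.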
2272
2273            if !readiness.is_empty() {
2274                // Get the token
2275                let token = unsafe { token(node, next.token_read_pos()) };
2276
2277                // Push the event
2278                dst.push_event(Event::new(readiness, token));
2279            }
2280        }
2281    }
2282
2283    /// Prepare the queue for the `Poll::poll` thread to block in the system
2284    /// selector. This involves changing `head_readiness` to `sleep_marker`.
2285    /// Returns true if successful and `poll` can block.
2286    fn prepare_for_sleep(&self) -> bool {
2287        let end_marker = self.inner.end_marker();
2288        let sleep_marker = self.inner.sleep_marker();
2289
2290        let tail = unsafe { *self.inner.tail_readiness.get() };
2291
2292        // If the tail is currently set to the sleep_marker, then check if the
2293        // head is as well. If it is, then the queue is currently ready to
2294        // sleep. If it is not, then the queue is not empty and there should be
2295        // no sleeping.
2296        if tail == sleep_marker {
2297            return self.inner.head_readiness.load(Acquire) == sleep_marker;
2298        }
2299
2300        // If the tail is not currently set to `end_marker`, then the queue is
2301        // not empty.
2302        if tail != end_marker {
2303            return false;
2304        }
2305
2306        // The sleep marker is *not* currently in the readiness queue.
2307        //
2308        // The sleep marker is only inserted in this function. It is also only
2309        // inserted in the tail position. This is guaranteed by first checking
2310        // that the end marker is in the tail position, pushing the sleep marker
2311        // after the end marker, then removing the end marker.
2312        //
2313        // Before inserting a node into the queue, the next pointer has to be
2314        // set to null. Again, this is only safe to do when the node is not
2315        // currently in the queue, but we already have ensured this.
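        //
        // Queue states during this transition (a sketch; `E` = end_marker,
        // `S` = sleep_marker):
        //
        //   empty:        head -> E, tail -> E
        //   CAS on head:  head -> S, tail -> E
        //   tail updated: head -> S, tail -> S  (empty and flagged asleep)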
2316        self.inner
2317            .sleep_marker
2318            .next_readiness
2319            .store(ptr::null_mut(), Relaxed);
2320
2321        let actual = self
2322            .inner
2323            .head_readiness
2324            .compare_and_swap(end_marker, sleep_marker, AcqRel);
2325
2326        debug_assert!(actual != sleep_marker);
2327
2328        if actual != end_marker {
2329            // The readiness queue is not empty
2330            return false;
2331        }
2332
2333        // The current tail should be pointing to `end_marker`
2334        debug_assert!(unsafe { *self.inner.tail_readiness.get() == end_marker });
2335        // The `end_marker` next pointer should be null
2336        debug_assert!(self.inner.end_marker.next_readiness.load(Relaxed).is_null());
2337
2338        // Update tail pointer.
2339        unsafe {
2340            *self.inner.tail_readiness.get() = sleep_marker;
2341        }
2342        true
2343    }
2344}
2345
2346impl Drop for ReadinessQueue {
2347    fn drop(&mut self) {
2348        // Close the queue by enqueuing the closed node
2349        self.inner.enqueue_node(&self.inner.closed_marker);
2350
2351        loop {
2352            // Free any nodes that happen to be left in the readiness queue
2353            let ptr = match unsafe { self.inner.dequeue_node(ptr::null_mut()) } {
2354                Dequeue::Empty => break,
2355                Dequeue::Inconsistent => {
2356                    // This really shouldn't be possible as all other handles to
2357                    // `ReadinessQueueInner` are dropped, but handle this by
2358                    // spinning I guess?
2359                    continue;
2360                }
2361                Dequeue::Data(ptr) => ptr,
2362            };
2363
2364            let node = unsafe { &*ptr };
2365
2366            let state = node.state.load(Acquire);
2367
2368            debug_assert!(state.is_queued());
2369
2370            release_node(ptr);
2371        }
2372    }
2373}
2374
2375impl ReadinessQueueInner {
2376    fn wakeup(&self) -> io::Result<()> {
2377        self.awakener.wakeup()
2378    }
2379
2380    /// Push the given node onto the readiness queue and, if `Poll::poll` is
2381    /// currently sleeping, write to the awakener so that it wakes up.
2382    fn enqueue_node_with_wakeup(&self, node: &ReadinessNode) -> io::Result<()> {
2383        if self.enqueue_node(node) {
2384            self.wakeup()?;
2385        }
2386
2387        Ok(())
2388    }
2389
2390    /// Push the node into the readiness queue
2391    fn enqueue_node(&self, node: &ReadinessNode) -> bool {
2392        // This is the 1024cores.net intrusive MPSC queue [1] "push" function.
2393        let node_ptr = node as *const _ as *mut _;
2394
2395        // Relaxed used as the ordering is "released" when swapping
2396        // `head_readiness`
2397        node.next_readiness.store(ptr::null_mut(), Relaxed);
2398
2399        unsafe {
2400            let mut prev = self.head_readiness.load(Acquire);
2401
2402            loop {
2403                if prev == self.closed_marker() {
2404                    debug_assert!(node_ptr != self.closed_marker());
2405                    // debug_assert!(node_ptr != self.end_marker());
2406                    debug_assert!(node_ptr != self.sleep_marker());
2407
2408                    if node_ptr != self.end_marker() {
2409                        // The readiness queue is shutdown, but the enqueue flag was
2410                        // set. This means that we are responsible for decrementing
2411                        // the ready queue's ref count
2412                        debug_assert!(node.ref_count.load(Relaxed) >= 2);
2413                        release_node(node_ptr);
2414                    }
2415
2416                    return false;
2417                }
2418
2419                let act = self.head_readiness.compare_and_swap(prev, node_ptr, AcqRel);
2420
2421                if prev == act {
2422                    break;
2423                }
2424
2425                prev = act;
2426            }
2427
2428            debug_assert!((*prev).next_readiness.load(Relaxed).is_null());
2429
2430            (*prev).next_readiness.store(node_ptr, Release);
2431
2432            prev == self.sleep_marker()
2433        }
2434    }
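
    // Note on `enqueue_node`'s return value (restating the code above): `true`
    // means the previous head was `sleep_marker`, i.e. the consumer advertised
    // that it was about to block, so this producer must also write to the
    // awakener; `enqueue_node_with_wakeup` does exactly that.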
2435
2436    fn clear_sleep_marker(&self) {
2437        let end_marker = self.end_marker();
2438        let sleep_marker = self.sleep_marker();
2439
2440        unsafe {
2441            let tail = *self.tail_readiness.get();
2442
2443            if tail != self.sleep_marker() {
2444                return;
2445            }
2446
2447            // The end marker is *not* currently in the readiness queue
2448            // (since the sleep marker is).
2449            self.end_marker
2450                .next_readiness
2451                .store(ptr::null_mut(), Relaxed);
2452
2453            let actual = self
2454                .head_readiness
2455                .compare_and_swap(sleep_marker, end_marker, AcqRel);
2456
2457            debug_assert!(actual != end_marker);
2458
2459            if actual != sleep_marker {
2460                // The readiness queue is not empty; we cannot remove the sleep
2461                // marker.
2462                return;
2463            }
2464
2465            // Update the tail pointer.
2466            *self.tail_readiness.get() = end_marker;
2467        }
2468    }
2469
2470    /// Must only be called in `poll` or `drop`
2471    unsafe fn dequeue_node(&self, until: *mut ReadinessNode) -> Dequeue {
2472        // This is the 1024cores.net intrusive MPSC queue [1] "pop" function
2473        // with the modifications mentioned at the top of the file.
2474        let mut tail = *self.tail_readiness.get();
2475        let mut next = (*tail).next_readiness.load(Acquire);
2476
2477        if tail == self.end_marker() || tail == self.sleep_marker() || tail == self.closed_marker()
2478        {
2479            if next.is_null() {
2480                // Make sure the sleep marker is removed (as we are no longer
2481                // sleeping).
2482                self.clear_sleep_marker();
2483
2484                return Dequeue::Empty;
2485            }
2486
2487            *self.tail_readiness.get() = next;
2488            tail = next;
2489            next = (*next).next_readiness.load(Acquire);
2490        }
2491
2492        // Only need to check `until` at this point. `until` is either null,
2493        // which will never match `tail`, or it is a node that was pushed by
2494        // the current thread. This means that either:
2495        //
2496        // 1) The queue is inconsistent, which is handled explicitly
2497        // 2) We encounter `until` at this point in dequeue
2498        // 3) We will pop a different node
2499        if tail == until {
2500            return Dequeue::Empty;
2501        }
2502
2503        if !next.is_null() {
2504            *self.tail_readiness.get() = next;
2505            return Dequeue::Data(tail);
2506        }
2507
2508        if self.head_readiness.load(Acquire) != tail {
2509            return Dequeue::Inconsistent;
2510        }
2511
2512        // Push the stub node
2513        self.enqueue_node(&self.end_marker);
2514
2515        next = (*tail).next_readiness.load(Acquire);
2516
2517        if !next.is_null() {
2518            *self.tail_readiness.get() = next;
2519            return Dequeue::Data(tail);
2520        }
2521
2522        Dequeue::Inconsistent
2523    }
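
    // The `Inconsistent` case above (restating the 1024cores algorithm): a
    // producer has already swapped `head_readiness` but has not yet stored
    // the `next_readiness` link, so the consumer cannot follow the list.
    // Callers simply back off and retry on a later pass, by which time the
    // producer's `Release` store of the link will have completed.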
2524
2525    fn end_marker(&self) -> *mut ReadinessNode {
2526        &*self.end_marker as *const ReadinessNode as *mut ReadinessNode
2527    }
2528
2529    fn sleep_marker(&self) -> *mut ReadinessNode {
2530        &*self.sleep_marker as *const ReadinessNode as *mut ReadinessNode
2531    }
2532
2533    fn closed_marker(&self) -> *mut ReadinessNode {
2534        &*self.closed_marker as *const ReadinessNode as *mut ReadinessNode
2535    }
2536}
2537
2538impl ReadinessNode {
2539    /// Return a new `ReadinessNode`, initialized with the provided `ref_count`.
2540    fn new(
2541        queue: *mut (),
2542        token: Token,
2543        interest: Ready,
2544        opt: PollOpt,
2545        ref_count: usize,
2546    ) -> ReadinessNode {
2547        ReadinessNode {
2548            state: AtomicState::new(interest, opt),
2549            // Only the first token is set, the others are initialized to 0
2550            token_0: UnsafeCell::new(token),
2551            token_1: UnsafeCell::new(Token(0)),
2552            token_2: UnsafeCell::new(Token(0)),
2553            next_readiness: AtomicPtr::new(ptr::null_mut()),
2554            update_lock: AtomicBool::new(false),
2555            readiness_queue: AtomicPtr::new(queue),
2556            ref_count: AtomicUsize::new(ref_count),
2557        }
2558    }
2559
2560    fn marker() -> ReadinessNode {
2561        ReadinessNode {
2562            state: AtomicState::new(Ready::empty(), PollOpt::empty()),
2563            token_0: UnsafeCell::new(Token(0)),
2564            token_1: UnsafeCell::new(Token(0)),
2565            token_2: UnsafeCell::new(Token(0)),
2566            next_readiness: AtomicPtr::new(ptr::null_mut()),
2567            update_lock: AtomicBool::new(false),
2568            readiness_queue: AtomicPtr::new(ptr::null_mut()),
2569            ref_count: AtomicUsize::new(0),
2570        }
2571    }
2572
2573    fn enqueue_with_wakeup(&self) -> io::Result<()> {
2574        let queue = self.readiness_queue.load(Acquire);
2575
2576        if queue.is_null() {
2577            // Not associated with a queue, nothing to do
2578            return Ok(());
2579        }
2580
2581        enqueue_with_wakeup(queue, self)
2582    }
2583}
2584
2585fn enqueue_with_wakeup(queue: *mut (), node: &ReadinessNode) -> io::Result<()> {
2586    debug_assert!(!queue.is_null());
2587    // This is ugly... but we don't want to bump the ref count.
2588    let queue: &Arc<ReadinessQueueInner> =
2589        unsafe { &*(&queue as *const *mut () as *const Arc<ReadinessQueueInner>) };
2590    queue.enqueue_node_with_wakeup(node)
2591}
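
// A sketch of the aliasing trick in `enqueue_with_wakeup`: `queue` holds the
// bits of an `Arc<ReadinessQueueInner>` that was `mem::forget`-ed when the
// node was first associated with a `Poll`, so reborrowing `&queue` as
// `&Arc<ReadinessQueueInner>` observes that original `Arc` without touching
// its ref count. This relies on the size equality asserted in `update`:
// `size_of::<Arc<ReadinessQueueInner>>() == size_of::<*mut ()>()`.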
2592
2593unsafe fn token(node: &ReadinessNode, pos: usize) -> Token {
2594    match pos {
2595        0 => *node.token_0.get(),
2596        1 => *node.token_1.get(),
2597        2 => *node.token_2.get(),
2598        _ => unreachable!(),
2599    }
2600}
2601
2602fn release_node(ptr: *mut ReadinessNode) {
2603    unsafe {
2604        // `AcqRel` synchronizes with other `release_node` functions and ensures
2605        // that the drop happens after any reads / writes on other threads.
2606        if (*ptr).ref_count.fetch_sub(1, AcqRel) != 1 {
2607            return;
2608        }
2609
2610        let node = Box::from_raw(ptr);
2611
2612        // Decrement the readiness_queue Arc
2613        let queue = node.readiness_queue.load(Acquire);
2614
2615        if queue.is_null() {
2616            return;
2617        }
2618
2619        let _: Arc<ReadinessQueueInner> = mem::transmute(queue);
2620    }
2621}
2622
2623impl AtomicState {
2624    fn new(interest: Ready, opt: PollOpt) -> AtomicState {
2625        let state = ReadinessState::new(interest, opt);
2626
2627        AtomicState {
2628            inner: AtomicUsize::new(state.into()),
2629        }
2630    }
2631
2632    /// Loads the current `ReadinessState`
2633    fn load(&self, order: Ordering) -> ReadinessState {
2634        self.inner.load(order).into()
2635    }
2636
2637    /// Stores a state if the current state is the same as `current`.
2638    fn compare_and_swap(
2639        &self,
2640        current: ReadinessState,
2641        new: ReadinessState,
2642        order: Ordering,
2643    ) -> ReadinessState {
2644        self.inner
2645            .compare_and_swap(current.into(), new.into(), order)
2646            .into()
2647    }
2648
2649    /// Returns `true` if the node should be queued.
2650    fn flag_as_dropped(&self) -> bool {
2651        let prev: ReadinessState = self
2652            .inner
2653            .fetch_or(DROPPED_MASK | QUEUED_MASK, Release)
2654            .into();
2655        // The flag should not have been previously set
2656        debug_assert!(!prev.is_dropped());
2657
2658        !prev.is_queued()
2659    }
2660}
2661
2662impl ReadinessState {
2663    /// Create a `ReadinessState` initialized with the provided arguments.
2664    #[inline]
2665    fn new(interest: Ready, opt: PollOpt) -> ReadinessState {
2666        let interest = event::ready_as_usize(interest);
2667        let opt = event::opt_as_usize(opt);
2668
2669        debug_assert!(interest <= MASK_4);
2670        debug_assert!(opt <= MASK_4);
2671
2672        let mut val = interest << INTEREST_SHIFT;
2673        val |= opt << POLL_OPT_SHIFT;
2674
2675        ReadinessState(val)
2676    }
2677
2678    #[inline]
2679    fn get(self, mask: usize, shift: usize) -> usize {
2680        (self.0 >> shift) & mask
2681    }
2682
2683    #[inline]
2684    fn set(&mut self, val: usize, mask: usize, shift: usize) {
2685        self.0 = (self.0 & !(mask << shift)) | (val << shift)
2686    }
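
    // Worked example of the mask/shift round trip (made-up numbers, not this
    // file's actual shift constants): with `mask = 0b1111` and `shift = 4`,
    // `set(0b0101, mask, shift)` clears bits 4..8 and stores `0b0101_0000`
    // into them, and `get(mask, shift)` reads `0b0101` back out.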
2687
2688    /// Get the readiness
2689    #[inline]
2690    fn readiness(self) -> Ready {
2691        let v = self.get(MASK_4, READINESS_SHIFT);
2692        event::ready_from_usize(v)
2693    }
2694
2695    #[inline]
2696    fn effective_readiness(self) -> Ready {
2697        self.readiness() & self.interest()
2698    }
2699
2700    /// Set the readiness
2701    #[inline]
2702    fn set_readiness(&mut self, v: Ready) {
2703        self.set(event::ready_as_usize(v), MASK_4, READINESS_SHIFT);
2704    }
2705
2706    /// Get the interest
2707    #[inline]
2708    fn interest(self) -> Ready {
2709        let v = self.get(MASK_4, INTEREST_SHIFT);
2710        event::ready_from_usize(v)
2711    }
2712
2713    /// Set the interest
2714    #[inline]
2715    fn set_interest(&mut self, v: Ready) {
2716        self.set(event::ready_as_usize(v), MASK_4, INTEREST_SHIFT);
2717    }
2718
2719    #[inline]
2720    fn disarm(&mut self) {
2721        self.set_interest(Ready::empty());
2722    }
2723
2724    /// Get the poll options
2725    #[inline]
2726    fn poll_opt(self) -> PollOpt {
2727        let v = self.get(MASK_4, POLL_OPT_SHIFT);
2728        event::opt_from_usize(v)
2729    }
2730
2731    /// Set the poll options
2732    #[inline]
2733    fn set_poll_opt(&mut self, v: PollOpt) {
2734        self.set(event::opt_as_usize(v), MASK_4, POLL_OPT_SHIFT);
2735    }
2736
2737    #[inline]
2738    fn is_queued(self) -> bool {
2739        self.0 & QUEUED_MASK == QUEUED_MASK
2740    }
2741
2742    /// Set the queued flag
2743    #[inline]
2744    fn set_queued(&mut self) {
2745        // Dropped nodes should never be queued
2746        debug_assert!(!self.is_dropped());
2747        self.0 |= QUEUED_MASK;
2748    }
2749
2750    #[inline]
2751    fn set_dequeued(&mut self) {
2752        debug_assert!(self.is_queued());
2753        self.0 &= !QUEUED_MASK
2754    }
2755
2756    #[inline]
2757    fn is_dropped(self) -> bool {
2758        self.0 & DROPPED_MASK == DROPPED_MASK
2759    }
2760
2761    #[inline]
2762    fn token_read_pos(self) -> usize {
2763        self.get(MASK_2, TOKEN_RD_SHIFT)
2764    }
2765
2766    #[inline]
2767    fn token_write_pos(self) -> usize {
2768        self.get(MASK_2, TOKEN_WR_SHIFT)
2769    }
2770
2771    #[inline]
2772    fn next_token_pos(self) -> usize {
2773        let rd = self.token_read_pos();
2774        let wr = self.token_write_pos();
2775
2776        match wr {
2777            0 => match rd {
2778                1 => 2,
2779                2 => 1,
2780                0 => 1,
2781                _ => unreachable!(),
2782            },
2783            1 => match rd {
2784                0 => 2,
2785                2 => 0,
2786                1 => 2,
2787                _ => unreachable!(),
2788            },
2789            2 => match rd {
2790                0 => 1,
2791                1 => 0,
2792                2 => 0,
2793                _ => unreachable!(),
2794            },
2795            _ => unreachable!(),
2796        }
2797    }
2798
2799    #[inline]
2800    fn set_token_write_pos(&mut self, val: usize) {
2801        self.set(val, MASK_2, TOKEN_WR_SHIFT);
2802    }
2803
2804    #[inline]
2805    fn update_token_read_pos(&mut self) {
2806        let val = self.token_write_pos();
2807        self.set(val, MASK_2, TOKEN_RD_SHIFT);
2808    }
2809}
2810
2811impl From<ReadinessState> for usize {
2812    fn from(src: ReadinessState) -> usize {
2813        src.0
2814    }
2815}
2816
2817impl From<usize> for ReadinessState {
2818    fn from(src: usize) -> ReadinessState {
2819        ReadinessState(src)
2820    }
2821}
2822
2823fn is_send<T: Send>() {}
2824fn is_sync<T: Sync>() {}
2825
2826impl SelectorId {
2827    pub fn new() -> SelectorId {
2828        SelectorId {
2829            id: AtomicUsize::new(0),
2830        }
2831    }
2832
2833    pub fn associate_selector(&self, poll: &Poll) -> io::Result<()> {
2834        let selector_id = self.id.load(Ordering::SeqCst);
2835
2836        if selector_id != 0 && selector_id != poll.selector.id() {
2837            Err(io::Error::new(
2838                io::ErrorKind::Other,
2839                "socket already registered",
2840            ))
2841        } else {
2842            self.id.store(poll.selector.id(), Ordering::SeqCst);
2843            Ok(())
2844        }
2845    }
2846}
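
// For example (a sketch; the actual calls happen inside the sys-specific
// `Evented` implementations): registering a socket with one `Poll` stores
// that selector's id here, and a later attempt to register the same socket
// with a different `Poll` observes a non-zero, non-matching id and fails
// with "socket already registered".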
2847
2848impl Clone for SelectorId {
2849    fn clone(&self) -> SelectorId {
2850        SelectorId {
2851            id: AtomicUsize::new(self.id.load(Ordering::SeqCst)),
2852        }
2853    }
2854}
2855
2856#[test]
2857#[cfg(all(unix, not(target_os = "fuchsia")))]
2858pub fn as_raw_fd() {
2859    let poll = Poll::new().unwrap();
2860    assert!(poll.as_raw_fd() > 0);
2861}