futures_net/driver/sys/
poll.rs

1use super::event::{self, Event, Evented, PollOpt, Ready};
2use super::{linux, Token};
3use std::cell::UnsafeCell;
4use std::os::unix::io::AsRawFd;
5use std::os::unix::io::RawFd;
6use std::process;
7use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release, SeqCst};
8use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
9use std::sync::{Arc, Condvar, Mutex};
10use std::time::{Duration, Instant};
11use std::{fmt, io, ptr, usize};
12use std::{isize, mem, ops};
13
14// Poll is backed by two readiness queues. The first is a system readiness queue
15// represented by `linux::Selector`. The system readiness queue handles events
16// provided by the system, such as TCP and UDP. The second readiness queue is
17// implemented in user space by `ReadinessQueue`. It provides a way to implement
18// purely user space `Evented` types.
19//
20// `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked
21// list nodes. This significantly reduces the number of required allocations.
22// Each `Registration` / `SetReadiness` pair allocates a single readiness node
23// that is used for the lifetime of the registration.
24//
25// The readiness node also includes a single atomic variable, `state` that
26// tracks most of the state associated with the registration. This includes the
27// current readiness, interest, poll options, and internal state. When the node
28// state is mutated, it is queued in the MPSC channel. A call to
29// `ReadinessQueue::poll` will dequeue and process nodes. The node state can
30// still be mutated while it is queued in the channel for processing.
31// Intermediate state values do not matter as long as the final state is
32// included in the call to `poll`. This is the eventually consistent nature of
33// the readiness queue.
34//
35// The readiness node is ref counted using the `ref_count` field. On creation,
36// the ref_count is initialized to 3: one `Registration` handle, one
37// `SetReadiness` handle, and one for the readiness queue. Since the readiness queue
38// doesn't *always* hold a handle to the node, we don't use the Arc type for
39// managing ref counts (this is to avoid constantly incrementing and
40// decrementing the ref count when pushing & popping from the queue). When the
41// `Registration` handle is dropped, the `dropped` flag is set on the node, then
42// the node is pushed into the registration queue. When Poll::poll pops the
43// node, it sees the drop flag is set, and decrements it's ref count.
44//
45// The MPSC queue is a modified version of the intrusive MPSC node based queue
46// described by 1024cores [1].
47//
48// The first modification is that two markers are used instead of a single
49// `stub`. The second marker is a `sleep_marker` which is used to signal to
50// producers that the consumer is going to sleep. This sleep_marker is only used
51// when the queue is empty, implying that the only node in the queue is
52// `end_marker`.
53//
54// The second modification is an `until` argument passed to the dequeue
55// function. When `poll` encounters a level-triggered node, the node will be
56// immediately pushed back into the queue. In order to avoid an infinite loop,
57// `poll` before pushing the node, the pointer is saved off and then passed
58// again as the `until` argument. If the next node to pop is `until`, then
59// `Dequeue::Empty` is returned.
60//
61// [1] http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
62
63/// Polls for readiness events on all registered values.
64///
65/// `Poll` allows a program to monitor a large number of `Evented` types,
66/// waiting until one or more become "ready" for some class of operations; e.g.
67/// reading and writing. An `Evented` type is considered ready if it is possible
68/// to immediately perform a corresponding operation; e.g. [`read`] or
69/// [`write`].
70///
71/// To use `Poll`, an `Evented` type must first be registered with the `Poll`
72/// instance using the [`register`] method, supplying readiness interest. The
73/// readiness interest tells `Poll` which specific operations on the handle to
74/// monitor for readiness. A `Token` is also passed to the [`register`]
75/// function. When `Poll` returns a readiness event, it will include this token.
76/// This associates the event with the `Evented` handle that generated the
77/// event.
78///
79/// [`read`]: tcp/struct.TcpStream.html#method.read
80/// [`write`]: tcp/struct.TcpStream.html#method.write
81/// [`register`]: #method.register
82///
83/// # Examples
84///
85/// A basic example -- establishing a `TcpStream` connection.
86///
87/// ```
88/// # use std::error::Error;
89/// # fn try_main() -> Result<(), Box<Error>> {
90/// use futures_net::driver::sys::{Poll, Token};
91/// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
92/// use futures_net::driver::sys::net::TcpStream;
93///
94/// use std::net::{TcpListener, SocketAddr};
95///
96/// // Bind a server socket to connect to.
97/// let addr: SocketAddr = "127.0.0.1:0".parse()?;
98/// let server = TcpListener::bind(&addr)?;
99///
100/// // Construct a new `Poll` handle as well as the `Events` we'll store into
101/// let poll = Poll::new()?;
102/// let mut events = Events::with_capacity(1024);
103///
104/// // Connect the stream
105/// let stream = TcpStream::connect(&server.local_addr()?)?;
106///
107/// // Register the stream with `Poll`
108/// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
109///
110/// // Wait for the socket to become ready. This has to happens in a loop to
111/// // handle spurious wakeups.
112/// loop {
113///     poll.poll(&mut events, None)?;
114///
115///     for event in &events {
116///         if event.token() == Token(0) && event.readiness().is_writable() {
117///             // The socket connected (probably, it could still be a spurious
118///             // wakeup)
119///             return Ok(());
120///         }
121///     }
122/// }
123/// #     Ok(())
124/// # }
125/// #
126/// # fn main() {
127/// #     try_main().unwrap();
128/// # }
129/// ```
130///
131/// # Edge-triggered and level-triggered
132///
133/// An [`Evented`] registration may request edge-triggered events or
134/// level-triggered events. This is done by setting `register`'s
135/// [`PollOpt`] argument to either [`edge`] or [`level`].
136///
137/// The difference between the two can be described as follows. Supposed that
138/// this scenario happens:
139///
140/// 1. A [`TcpStream`] is registered with `Poll`.
141/// 2. The socket receives 2kb of data.
142/// 3. A call to [`Poll::poll`] returns the token associated with the socket
143///    indicating readable readiness.
144/// 4. 1kb is read from the socket.
145/// 5. Another call to [`Poll::poll`] is made.
146///
147/// If when the socket was registered with `Poll`, edge triggered events were
148/// requested, then the call to [`Poll::poll`] done in step **5** will
149/// (probably) hang despite there being another 1kb still present in the socket
150/// read buffer. The reason for this is that edge-triggered mode delivers events
151/// only when changes occur on the monitored [`Evented`]. So, in step *5* the
152/// caller might end up waiting for some data that is already present inside the
153/// socket buffer.
154///
155/// With edge-triggered events, operations **must** be performed on the
156/// `Evented` type until [`WouldBlock`] is returned. In other words, after
157/// receiving an event indicating readiness for a certain operation, one should
158/// assume that [`Poll::poll`] may never return another event for the same token
159/// and readiness until the operation returns [`WouldBlock`].
160///
161/// By contrast, when level-triggered notifications was requested, each call to
162/// [`Poll::poll`] will return an event for the socket as long as data remains
163/// in the socket buffer. Generally, level-triggered events should be avoided if
164/// high performance is a concern.
165///
166/// Since even with edge-triggered events, multiple events can be generated upon
167/// receipt of multiple chunks of data, the caller has the option to set the
168/// [`oneshot`] flag. This tells `Poll` to disable the associated [`Evented`]
169/// after the event is returned from [`Poll::poll`]. The subsequent calls to
170/// [`Poll::poll`] will no longer include events for [`Evented`] handles that
171/// are disabled even if the readiness state changes. The handle can be
172/// re-enabled by calling [`reregister`]. When handles are disabled, internal
173/// resources used to monitor the handle are maintained until the handle is
174/// dropped or deregistered. This makes re-registering the handle a fast
175/// operation.
176///
177/// For example, in the following scenario:
178///
179/// 1. A [`TcpStream`] is registered with `Poll`.
180/// 2. The socket receives 2kb of data.
181/// 3. A call to [`Poll::poll`] returns the token associated with the socket
182///    indicating readable readiness.
183/// 4. 2kb is read from the socket.
184/// 5. Another call to read is issued and [`WouldBlock`] is returned
185/// 6. The socket receives another 2kb of data.
186/// 7. Another call to [`Poll::poll`] is made.
187///
188/// Assuming the socket was registered with `Poll` with the [`edge`] and
189/// [`oneshot`] options, then the call to [`Poll::poll`] in step 7 would block. This
190/// is because, [`oneshot`] tells `Poll` to disable events for the socket after
191/// returning an event.
192///
193/// In order to receive the event for the data received in step 6, the socket
194/// would need to be reregistered using [`reregister`].
195///
196/// [`PollOpt`]: struct.PollOpt.html
197/// [`edge`]: struct.PollOpt.html#method.edge
198/// [`level`]: struct.PollOpt.html#method.level
199/// [`Poll::poll`]: struct.Poll.html#method.poll
200/// [`WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.WouldBlock
201/// [`Evented`]: event/trait.Evented.html
202/// [`TcpStream`]: tcp/struct.TcpStream.html
203/// [`reregister`]: #method.reregister
204/// [`oneshot`]: struct.PollOpt.html#method.oneshot
205///
206/// # Portability
207///
208/// Using `Poll` provides a portable interface across supported platforms as
209/// long as the caller takes the following into consideration:
210///
211/// ### Spurious events
212///
213/// [`Poll::poll`] may return readiness events even if the associated
214/// [`Evented`] handle is not actually ready. Given the same code, this may
215/// happen more on some platforms than others. It is important to never assume
216/// that, just because a readiness notification was received, that the
217/// associated operation will succeed as well.
218///
219/// If operation fails with [`WouldBlock`], then the caller should not treat
220/// this as an error, but instead should wait until another readiness event is
221/// received.
222///
223/// ### Draining readiness
224///
225/// When using edge-triggered mode, once a readiness event is received, the
226/// corresponding operation must be performed repeatedly until it returns
227/// [`WouldBlock`]. Unless this is done, there is no guarantee that another
228/// readiness event will be delivered, even if further data is received for the
229/// [`Evented`] handle.
230///
231/// For example, in the first scenario described above, after step 5, even if
232/// the socket receives more data there is no guarantee that another readiness
233/// event will be delivered.
234///
235/// ### Readiness operations
236///
237/// The only readiness operations that are guaranteed to be present on all
238/// supported platforms are [`readable`] and [`writable`]. All other readiness
239/// operations may have false negatives and as such should be considered
240/// **hints**. This means that if a socket is registered with [`readable`],
241/// [`error`], and [`hup`] interest, and either an error or hup is received, a
242/// readiness event will be generated for the socket, but it **may** only
243/// include `readable` readiness. Also note that, given the potential for
244/// spurious events, receiving a readiness event with `hup` or `error` doesn't
245/// actually mean that a `read` on the socket will return a result matching the
246/// readiness event.
247///
248/// In other words, portable programs that explicitly check for [`hup`] or
249/// [`error`] readiness should be doing so as an **optimization** and always be
250/// able to handle an error or HUP situation when performing the actual read
251/// operation.
252///
253/// [`readable`]: struct.Ready.html#method.readable
254/// [`writable`]: struct.Ready.html#method.writable
255/// [`error`]: unix/struct.UnixReady.html#method.error
256/// [`hup`]: unix/struct.UnixReady.html#method.hup
257///
258/// ### Registering handles
259///
260/// Unless otherwise noted, it should be assumed that types implementing
261/// [`Evented`] will never become ready unless they are registered with `Poll`.
262///
263/// For example:
264///
265/// ```
266/// # use std::error::Error;
267/// # fn try_main() -> Result<(), Box<Error>> {
268/// use futures_net::driver::sys::{Poll, Token};
269/// use futures_net::driver::sys::event::{Ready, PollOpt};
270/// use futures_net::driver::sys::net::TcpStream;
271/// use std::time::Duration;
272/// use std::thread;
273///
274/// let sock = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
275///
276/// thread::sleep(Duration::from_secs(1));
277///
278/// let poll = Poll::new()?;
279///
280/// // The connect is not guaranteed to have started until it is registered at
281/// // this point
282/// poll.register(&sock, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
283/// #     Ok(())
284/// # }
285/// #
286/// # fn main() {
287/// #     try_main().unwrap();
288/// # }
289/// ```
290///
291/// # Implementation notes
292///
293/// `Poll` is backed by the selector provided by the operating system.
294///
295/// |      OS    |  Selector |
296/// |------------|-----------|
297/// | Linux      | [epoll]   |
298/// | OS X, iOS  | [kqueue]  |
299/// | Windows    | [IOCP]    |
300/// | FreeBSD    | [kqueue]  |
301/// | Android    | [epoll]   |
302///
303/// On all supported platforms, socket operations are handled by using the
304/// system selector. Platform specific extensions (e.g. [`EventedFd`]) allow
305/// accessing other features provided by individual system selectors. For
306/// example, Linux's [`signalfd`] feature can be used by registering the FD with
307/// `Poll` via [`EventedFd`].
308///
309/// On all platforms except windows, a call to [`Poll::poll`] is mostly just a
310/// direct call to the system selector. However, [IOCP] uses a completion model
311/// instead of a readiness model. In this case, `Poll` must adapt the completion
312/// model Mio's API. While non-trivial, the bridge layer is still quite
313/// efficient. The most expensive part being calls to `read` and `write` require
314/// data to be copied into an intermediate buffer before it is passed to the
315/// kernel.
316///
317/// Notifications generated by [`SetReadiness`] are handled by an internal
318/// readiness queue. A single call to [`Poll::poll`] will collect events from
319/// both from the system selector and the internal readiness queue.
320///
321/// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
322/// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html
323/// [`EventedFd`]: unix/struct.EventedFd.html
324/// [`SetReadiness`]: struct.SetReadiness.html
325/// [`Poll::poll`]: struct.Poll.html#method.poll
326pub struct Poll {
327    // Platform specific IO selector
328    selector: linux::Selector,
329
330    // Custom readiness queue
331    readiness_queue: ReadinessQueue,
332
333    // Use an atomic to first check if a full lock will be required. This is a
334    // fast-path check for single threaded cases avoiding the extra syscall
335    lock_state: AtomicUsize,
336
337    // Sequences concurrent calls to `Poll::poll`
338    lock: Mutex<()>,
339
340    // Wakeup the next waiter
341    condvar: Condvar,
342}
343
344/// Handle to a user space `Poll` registration.
345///
346/// `Registration` allows implementing [`Evented`] for types that cannot work
347/// with the [system selector]. A `Registration` is always paired with a
348/// `SetReadiness`, which allows updating the registration's readiness state.
349/// When [`set_readiness`] is called and the `Registration` is associated with a
350/// [`Poll`] instance, a readiness event will be created and eventually returned
351/// by [`poll`].
352///
353/// A `Registration` / `SetReadiness` pair is created by calling
354/// [`Registration::new2`]. At this point, the registration is not being
355/// monitored by a [`Poll`] instance, so calls to `set_readiness` will not
356/// result in any readiness notifications.
357///
358/// `Registration` implements [`Evented`], so it can be used with [`Poll`] using
359/// the same [`register`], [`reregister`], and [`deregister`] functions used
360/// with TCP, UDP, etc... types. Once registered with [`Poll`], readiness state
361/// changes result in readiness events being dispatched to the [`Poll`] instance
362/// with which `Registration` is registered.
363///
364/// **Note**, before using `Registration` be sure to read the
365/// [`set_readiness`] documentation and the [portability] notes. The
366/// guarantees offered by `Registration` may be weaker than expected.
367///
368/// For high level documentation, see [`Poll`].
369///
370/// # Examples
371///
372/// ```
373/// use futures_net::driver::sys::{Registration, Poll, Token};
374/// use futures_net::driver::sys::event::{Evented, Ready, PollOpt};
375///
376/// use std::io;
377/// use std::time::Instant;
378/// use std::thread;
379///
380/// pub struct Deadline {
381///     when: Instant,
382///     registration: Registration,
383/// }
384///
385/// impl Deadline {
386///     pub fn new(when: Instant) -> Deadline {
387///         let (registration, set_readiness) = Registration::new2();
388///
389///         thread::spawn(move || {
390///             let now = Instant::now();
391///
392///             if now < when {
393///                 thread::sleep(when - now);
394///             }
395///
396///             set_readiness.set_readiness(Ready::readable());
397///         });
398///
399///         Deadline {
400///             when: when,
401///             registration: registration,
402///         }
403///     }
404///
405///     pub fn is_elapsed(&self) -> bool {
406///         Instant::now() >= self.when
407///     }
408/// }
409///
410/// impl Evented for Deadline {
411///     fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
412///         -> io::Result<()>
413///     {
414///         self.registration.register(poll, token, interest, opts)
415///     }
416///
417///     fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
418///         -> io::Result<()>
419///     {
420///         self.registration.reregister(poll, token, interest, opts)
421///     }
422///
423///     fn deregister(&self, poll: &Poll) -> io::Result<()> {
424///         poll.deregister(&self.registration)
425///     }
426/// }
427/// ```
428///
429/// [system selector]: struct.Poll.html#implementation-notes
430/// [`Poll`]: struct.Poll.html
431/// [`Registration::new2`]: struct.Registration.html#method.new2
432/// [`Evented`]: event/trait.Evented.html
433/// [`set_readiness`]: struct.SetReadiness.html#method.set_readiness
434/// [`register`]: struct.Poll.html#method.register
435/// [`reregister`]: struct.Poll.html#method.reregister
436/// [`deregister`]: struct.Poll.html#method.deregister
437/// [portability]: struct.Poll.html#portability
438pub struct Registration {
439    inner: RegistrationInner,
440}
441
442unsafe impl Send for Registration {}
443unsafe impl Sync for Registration {}
444
445/// Updates the readiness state of the associated `Registration`.
446///
447/// See [`Registration`] for more documentation on using `SetReadiness` and
448/// [`Poll`] for high level polling documentation.
449///
450/// [`Poll`]: struct.Poll.html
451/// [`Registration`]: struct.Registration.html
452#[derive(Clone)]
453pub struct SetReadiness {
454    inner: RegistrationInner,
455}
456
457unsafe impl Send for SetReadiness {}
458unsafe impl Sync for SetReadiness {}
459
460/// Used to associate an IO type with a Selector
461#[derive(Debug)]
462pub struct SelectorId {
463    id: AtomicUsize,
464}
465
466struct RegistrationInner {
467    // Unsafe pointer to the registration's node. The node is ref counted. This
468    // cannot "simply" be tracked by an Arc because `Poll::poll` has an implicit
469    // handle though it isn't stored anywhere. In other words, `Poll::poll`
470    // needs to decrement the ref count before the node is freed.
471    node: *mut ReadinessNode,
472}
473
474#[derive(Clone)]
475struct ReadinessQueue {
476    inner: Arc<ReadinessQueueInner>,
477}
478
479unsafe impl Send for ReadinessQueue {}
480unsafe impl Sync for ReadinessQueue {}
481
482struct ReadinessQueueInner {
483    // Used to wake up `Poll` when readiness is set in another thread.
484    awakener: linux::Awakener,
485
486    // Head of the MPSC queue used to signal readiness to `Poll::poll`.
487    head_readiness: AtomicPtr<ReadinessNode>,
488
489    // Tail of the readiness queue.
490    //
491    // Only accessed by Poll::poll. Coordination will be handled by the poll fn
492    tail_readiness: UnsafeCell<*mut ReadinessNode>,
493
494    // Fake readiness node used to punctuate the end of the readiness queue.
495    // Before attempting to read from the queue, this node is inserted in order
496    // to partition the queue between nodes that are "owned" by the dequeue end
497    // and nodes that will be pushed on by producers.
498    end_marker: Box<ReadinessNode>,
499
500    // Similar to `end_marker`, but this node signals to producers that `Poll`
501    // has gone to sleep and must be woken up.
502    sleep_marker: Box<ReadinessNode>,
503
504    // Similar to `end_marker`, but the node signals that the queue is closed.
505    // This happens when `ReadyQueue` is dropped and signals to producers that
506    // the nodes should no longer be pushed into the queue.
507    closed_marker: Box<ReadinessNode>,
508}
509
510/// Node shared by a `Registration` / `SetReadiness` pair as well as the node
511/// queued into the MPSC channel.
512struct ReadinessNode {
513    // Node state, see struct docs for `ReadinessState`
514    //
515    // This variable is the primary point of coordination between all the
516    // various threads concurrently accessing the node.
517    state: AtomicState,
518
519    // The registration token cannot fit into the `state` variable, so it is
520    // broken out here. In order to atomically update both the state and token
521    // we have to jump through a few hoops.
522    //
523    // First, `state` includes `token_read_pos` and `token_write_pos`. These can
524    // either be 0, 1, or 2 which represent a token slot. `token_write_pos` is
525    // the token slot that contains the most up to date registration token.
526    // `token_read_pos` is the token slot that `poll` is currently reading from.
527    //
528    // When a call to `update` includes a different token than the one currently
529    // associated with the registration (token_write_pos), first an unused token
530    // slot is found. The unused slot is the one not represented by
531    // `token_read_pos` OR `token_write_pos`. The new token is written to this
532    // slot, then `state` is updated with the new `token_write_pos` value. This
533    // requires that there is only a *single* concurrent call to `update`.
534    //
535    // When `poll` reads a node state, it checks that `token_read_pos` matches
536    // `token_write_pos`. If they do not match, then it atomically updates
537    // `state` such that `token_read_pos` is set to `token_write_pos`. It will
538    // then read the token at the newly updated `token_read_pos`.
539    token_0: UnsafeCell<Token>,
540    token_1: UnsafeCell<Token>,
541    token_2: UnsafeCell<Token>,
542
543    // Used when the node is queued in the readiness linked list. Accessing
544    // this field requires winning the "queue" lock
545    next_readiness: AtomicPtr<ReadinessNode>,
546
547    // Ensures that there is only one concurrent call to `update`.
548    //
549    // Each call to `update` will attempt to swap `update_lock` from `false` to
550    // `true`. If the CAS succeeds, the thread has obtained the update lock. If
551    // the CAS fails, then the `update` call returns immediately and the update
552    // is discarded.
553    update_lock: AtomicBool,
554
555    // Pointer to Arc<ReadinessQueueInner>
556    readiness_queue: AtomicPtr<()>,
557
558    // Tracks the number of `ReadyRef` pointers
559    ref_count: AtomicUsize,
560}
561
562/// Stores the ReadinessNode state in an AtomicUsize. This wrapper around the
563/// atomic variable handles encoding / decoding `ReadinessState` values.
564struct AtomicState {
565    inner: AtomicUsize,
566}
567
568const MASK_2: usize = 4 - 1;
569const MASK_4: usize = 16 - 1;
570const QUEUED_MASK: usize = 1 << QUEUED_SHIFT;
571const DROPPED_MASK: usize = 1 << DROPPED_SHIFT;
572
573const READINESS_SHIFT: usize = 0;
574const INTEREST_SHIFT: usize = 4;
575const POLL_OPT_SHIFT: usize = 8;
576const TOKEN_RD_SHIFT: usize = 12;
577const TOKEN_WR_SHIFT: usize = 14;
578const QUEUED_SHIFT: usize = 16;
579const DROPPED_SHIFT: usize = 17;
580
581/// Tracks all state for a single `ReadinessNode`. The state is packed into a
582/// `usize` variable from low to high bit as follows:
583///
584/// 4 bits: Registration current readiness
585/// 4 bits: Registration interest
586/// 4 bits: Poll options
587/// 2 bits: Token position currently being read from by `poll`
588/// 2 bits: Token position last written to by `update`
589/// 1 bit:  Queued flag, set when node is being pushed into MPSC queue.
590/// 1 bit:  Dropped flag, set when all `Registration` handles have been dropped.
591#[derive(Debug, Copy, Clone, Eq, PartialEq)]
592struct ReadinessState(usize);
593
594/// Returned by `dequeue_node`. Represents the different states as described by
595/// the queue documentation on 1024cores.net.
596enum Dequeue {
597    Data(*mut ReadinessNode),
598    Empty,
599    Inconsistent,
600}
601
602const AWAKEN: Token = Token(usize::MAX);
603const MAX_REFCOUNT: usize = (isize::MAX) as usize;
604
605/*
606 *
607 * ===== Poll =====
608 *
609 */
610
611impl Poll {
612    /// Return a new `Poll` handle.
613    ///
614    /// This function will make a syscall to the operating system to create the
615    /// system selector. If this syscall fails, `Poll::new` will return with the
616    /// error.
617    ///
618    /// See [struct] level docs for more details.
619    ///
620    /// [struct]: struct.Poll.html
621    ///
622    /// # Examples
623    ///
624    /// ```
625    /// # use std::error::Error;
626    /// # fn try_main() -> Result<(), Box<Error>> {
627    /// use futures_net::driver::sys::event::Events ;
628    /// use futures_net::driver::sys::Poll;
629    /// use std::time::Duration;
630    ///
631    /// let poll = match Poll::new() {
632    ///     Ok(poll) => poll,
633    ///     Err(e) => panic!("failed to create Poll instance; err={:?}", e),
634    /// };
635    ///
636    /// // Create a structure to receive polled events
637    /// let mut events = Events::with_capacity(1024);
638    ///
639    /// // Wait for events, but none will be received because no `Evented`
640    /// // handles have been registered with this `Poll` instance.
641    /// let n = poll.poll(&mut events, Some(Duration::from_millis(500)))?;
642    /// assert_eq!(n, 0);
643    /// #     Ok(())
644    /// # }
645    /// #
646    /// # fn main() {
647    /// #     try_main().unwrap();
648    /// # }
649    /// ```
650    pub fn new() -> io::Result<Poll> {
651        is_send::<Poll>();
652        is_sync::<Poll>();
653
654        let poll = Poll {
655            selector: linux::Selector::new()?,
656            readiness_queue: ReadinessQueue::new()?,
657            lock_state: AtomicUsize::new(0),
658            lock: Mutex::new(()),
659            condvar: Condvar::new(),
660        };
661
662        // Register the notification wakeup FD with the IO poller
663        poll.readiness_queue.inner.awakener.register(
664            &poll,
665            AWAKEN,
666            Ready::readable(),
667            PollOpt::edge(),
668        )?;
669
670        Ok(poll)
671    }
672
673    /// Register an `Evented` handle with the `Poll` instance.
674    ///
675    /// Once registered, the `Poll` instance will monitor the `Evented` handle
676    /// for readiness state changes. When it notices a state change, it will
677    /// return a readiness event for the handle the next time [`poll`] is
678    /// called.
679    ///
680    /// See the [`struct`] docs for a high level overview.
681    ///
682    /// # Arguments
683    ///
684    /// `handle: &E: Evented`: This is the handle that the `Poll` instance
685    /// should monitor for readiness state changes.
686    ///
687    /// `token: Token`: The caller picks a token to associate with the socket.
688    /// When [`poll`] returns an event for the handle, this token is included.
689    /// This allows the caller to map the event to its handle. The token
690    /// associated with the `Evented` handle can be changed at any time by
691    /// calling [`reregister`].
692    ///
693    /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
694    /// usage.
695    ///
696    /// See documentation on [`Token`] for an example showing how to pick
697    /// [`Token`] values.
698    ///
699    /// `interest: Ready`: Specifies which operations `Poll` should monitor for
700    /// readiness. `Poll` will only return readiness events for operations
701    /// specified by this argument.
702    ///
703    /// If a socket is registered with readable interest and the socket becomes
704    /// writable, no event will be returned from [`poll`].
705    ///
706    /// The readiness interest for an `Evented` handle can be changed at any
707    /// time by calling [`reregister`].
708    ///
709    /// `opts: PollOpt`: Specifies the registration options. The most common
710    /// options being [`level`] for level-triggered events, [`edge`] for
711    /// edge-triggered events, and [`oneshot`].
712    ///
713    /// The registration options for an `Evented` handle can be changed at any
714    /// time by calling [`reregister`].
715    ///
716    /// # Notes
717    ///
718    /// Unless otherwise specified, the caller should assume that once an
719    /// `Evented` handle is registered with a `Poll` instance, it is bound to
720    /// that `Poll` instance for the lifetime of the `Evented` handle. This
721    /// remains true even if the `Evented` handle is deregistered from the poll
722    /// instance using [`deregister`].
723    ///
724    /// This function is **thread safe**. It can be called concurrently from
725    /// multiple threads.
726    ///
727    /// [`struct`]: #
728    /// [`reregister`]: #method.reregister
729    /// [`deregister`]: #method.deregister
730    /// [`poll`]: #method.poll
731    /// [`level`]: struct.PollOpt.html#method.level
732    /// [`edge`]: struct.PollOpt.html#method.edge
733    /// [`oneshot`]: struct.PollOpt.html#method.oneshot
734    /// [`Token`]: struct.Token.html
735    ///
736    /// # Examples
737    ///
738    /// ```
739    /// # use std::error::Error;
740    /// # fn try_main() -> Result<(), Box<Error>> {
741    /// use futures_net::driver::sys::{Poll, Token};
742    /// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
743    /// use futures_net::driver::sys::net::TcpStream;
744    /// use std::time::{Duration, Instant};
745    ///
746    /// let poll = Poll::new()?;
747    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
748    ///
749    /// // Register the socket with `poll`
750    /// poll.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
751    ///
752    /// let mut events = Events::with_capacity(1024);
753    /// let start = Instant::now();
754    /// let timeout = Duration::from_millis(500);
755    ///
756    /// loop {
757    ///     let elapsed = start.elapsed();
758    ///
759    ///     if elapsed >= timeout {
760    ///         // Connection timed out
761    ///         return Ok(());
762    ///     }
763    ///
764    ///     let remaining = timeout - elapsed;
765    ///     poll.poll(&mut events, Some(remaining))?;
766    ///
767    ///     for event in &events {
768    ///         if event.token() == Token(0) {
769    ///             // Something (probably) happened on the socket.
770    ///             return Ok(());
771    ///         }
772    ///     }
773    /// }
774    /// #     Ok(())
775    /// # }
776    /// #
777    /// # fn main() {
778    /// #     try_main().unwrap();
779    /// # }
780    /// ```
781    pub fn register<E: ?Sized>(
782        &self,
783        handle: &E,
784        token: Token,
785        interest: Ready,
786        opts: PollOpt,
787    ) -> io::Result<()>
788    where
789        E: Evented,
790    {
791        validate_args(token)?;
792
793        /*
794         * Undefined behavior:
795         * - Reusing a token with a different `Evented` without deregistering
796         * (or closing) the original `Evented`.
797         */
798        log::trace!("registering with poller");
799
800        // Register interests for this socket
801        handle.register(self, token, interest, opts)?;
802
803        Ok(())
804    }
805
806    /// Re-register an `Evented` handle with the `Poll` instance.
807    ///
808    /// Re-registering an `Evented` handle allows changing the details of the
809    /// registration. Specifically, it allows updating the associated `token`,
810    /// `interest`, and `opts` specified in previous `register` and `reregister`
811    /// calls.
812    ///
813    /// The `reregister` arguments fully override the previous values. In other
814    /// words, if a socket is registered with [`readable`] interest and the call
815    /// to `reregister` specifies [`writable`], then read interest is no longer
816    /// requested for the handle.
817    ///
818    /// The `Evented` handle must have previously been registered with this
819    /// instance of `Poll` otherwise the call to `reregister` will return with
820    /// an error.
821    ///
822    /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
823    /// usage.
824    ///
825    /// See the [`register`] documentation for details about the function
826    /// arguments and see the [`struct`] docs for a high level overview of
827    /// polling.
828    ///
829    /// # Examples
830    ///
831    /// ```
832    /// # use std::error::Error;
833    /// # fn try_main() -> Result<(), Box<Error>> {
834    /// use futures_net::driver::sys::{Poll, Token};
835    /// use futures_net::driver::sys::event::{Ready, PollOpt};
836    /// use futures_net::driver::sys::net::TcpStream;
837    ///
838    /// let poll = Poll::new()?;
839    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
840    ///
841    /// // Register the socket with `poll`, requesting readable
842    /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
843    ///
844    /// // Reregister the socket specifying a different token and write interest
845    /// // instead. `PollOpt::edge()` must be specified even though that value
846    /// // is not being changed.
847    /// poll.reregister(&socket, Token(2), Ready::writable(), PollOpt::edge())?;
848    /// #     Ok(())
849    /// # }
850    /// #
851    /// # fn main() {
852    /// #     try_main().unwrap();
853    /// # }
854    /// ```
855    ///
856    /// [`struct`]: #
857    /// [`register`]: #method.register
858    /// [`readable`]: struct.Ready.html#method.readable
859    /// [`writable`]: struct.Ready.html#method.writable
860    pub fn reregister<E: ?Sized>(
861        &self,
862        handle: &E,
863        token: Token,
864        interest: Ready,
865        opts: PollOpt,
866    ) -> io::Result<()>
867    where
868        E: Evented,
869    {
870        validate_args(token)?;
871
872        log::trace!("registering with poller");
873
874        // Register interests for this socket
875        handle.reregister(self, token, interest, opts)?;
876
877        Ok(())
878    }
879
880    /// Deregister an `Evented` handle with the `Poll` instance.
881    ///
882    /// When an `Evented` handle is deregistered, the `Poll` instance will
883    /// no longer monitor it for readiness state changes. Unlike disabling
884    /// handles with oneshot, deregistering clears up any internal resources
885    /// needed to track the handle.
886    ///
887    /// A handle can be passed back to `register` after it has been
888    /// deregistered; however, it must be passed back to the **same** `Poll`
889    /// instance.
890    ///
891    /// `Evented` handles are automatically deregistered when they are dropped.
892    /// It is common to never need to explicitly call `deregister`.
893    ///
894    /// # Examples
895    ///
896    /// ```
897    /// # use std::error::Error;
898    /// # fn try_main() -> Result<(), Box<Error>> {
899    /// use futures_net::driver::sys::{Poll, Token};
900    /// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
901    /// use futures_net::driver::sys::net::TcpStream;
902    /// use std::time::Duration;
903    ///
904    /// let poll = Poll::new()?;
905    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
906    ///
907    /// // Register the socket with `poll`
908    /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
909    ///
910    /// poll.deregister(&socket)?;
911    ///
912    /// let mut events = Events::with_capacity(1024);
913    ///
914    /// // Set a timeout because this poll should never receive any events.
915    /// let n = poll.poll(&mut events, Some(Duration::from_secs(1)))?;
916    /// assert_eq!(0, n);
917    /// #     Ok(())
918    /// # }
919    /// #
920    /// # fn main() {
921    /// #     try_main().unwrap();
922    /// # }
923    /// ```
924    pub fn deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()>
925    where
926        E: Evented,
927    {
928        log::trace!("deregistering handle with poller");
929
930        // Deregister interests for this socket
931        handle.deregister(self)?;
932
933        Ok(())
934    }
935
936    /// Wait for readiness events
937    ///
938    /// Blocks the current thread and waits for readiness events for any of the
939    /// `Evented` handles that have been registered with this `Poll` instance.
940    /// The function will block until either at least one readiness event has
941    /// been received or `timeout` has elapsed. A `timeout` of `None` means that
942    /// `poll` will block until a readiness event has been received.
943    ///
944    /// The supplied `events` will be cleared and newly received readiness events
945    /// will be pushed onto the end. At most `events.capacity()` events will be
946    /// returned. If there are further pending readiness events, they will be
947    /// returned on the next call to `poll`.
948    ///
949    /// A single call to `poll` may result in multiple readiness events being
950    /// returned for a single `Evented` handle. For example, if a TCP socket
951    /// becomes both readable and writable, it may be possible for a single
952    /// readiness event to be returned with both [`readable`] and [`writable`]
953    /// readiness **OR** two separate events may be returned, one with
954    /// [`readable`] set and one with [`writable`] set.
955    ///
956    /// Note that the `timeout` will be rounded up to the system clock
957    /// granularity (usually 1ms), and kernel scheduling delays mean that
958    /// the blocking interval may be overrun by a small amount.
959    ///
960    /// `poll` returns the number of readiness events that have been pushed into
961    /// `events` or `Err` when an error has been encountered with the system
962    /// selector.  The value returned is deprecated and will be removed in 0.7.0.
963    /// Accessing the events by index is also deprecated.  Events can be
964    /// inserted by other events triggering, thus making sequential access
965    /// problematic.  Use the iterator API instead.  See [`iter`].
966    ///
967    /// See the [struct] level documentation for a higher level discussion of
968    /// polling.
969    ///
970    /// [`readable`]: struct.Ready.html#method.readable
971    /// [`writable`]: struct.Ready.html#method.writable
972    /// [struct]: #
973    /// [`iter`]: struct.Events.html#method.iter
974    ///
975    /// # Examples
976    ///
977    /// A basic example -- establishing a `TcpStream` connection.
978    ///
979    /// ```
980    /// # use std::error::Error;
981    /// # fn try_main() -> Result<(), Box<Error>> {
982    /// use futures_net::driver::sys::{Poll, Token};
983    /// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
984    /// use futures_net::driver::sys::net::TcpStream;
985    ///
986    /// use std::net::{TcpListener, SocketAddr};
987    /// use std::thread;
988    ///
989    /// // Bind a server socket to connect to.
990    /// let addr: SocketAddr = "127.0.0.1:0".parse()?;
991    /// let server = TcpListener::bind(&addr)?;
992    /// let addr = server.local_addr()?.clone();
993    ///
994    /// // Spawn a thread to accept the socket
995    /// thread::spawn(move || {
996    ///     let _ = server.accept();
997    /// });
998    ///
999    /// // Construct a new `Poll` handle as well as the `Events` we'll store into
1000    /// let poll = Poll::new()?;
1001    /// let mut events = Events::with_capacity(1024);
1002    ///
1003    /// // Connect the stream
1004    /// let stream = TcpStream::connect(&addr)?;
1005    ///
1006    /// // Register the stream with `Poll`
1007    /// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
1008    ///
1009    /// // Wait for the socket to become ready. This has to happens in a loop to
1010    /// // handle spurious wakeups.
1011    /// loop {
1012    ///     poll.poll(&mut events, None)?;
1013    ///
1014    ///     for event in &events {
1015    ///         if event.token() == Token(0) && event.readiness().is_writable() {
1016    ///             // The socket connected (probably, it could still be a spurious
1017    ///             // wakeup)
1018    ///             return Ok(());
1019    ///         }
1020    ///     }
1021    /// }
1022    /// #     Ok(())
1023    /// # }
1024    /// #
1025    /// # fn main() {
1026    /// #     try_main().unwrap();
1027    /// # }
1028    /// ```
1029    ///
1030    /// [struct]: #
1031    pub fn poll(
1032        &self,
1033        events: &mut Events,
1034        timeout: Option<Duration>,
1035    ) -> io::Result<usize> {
1036        self.poll1(events, timeout, false)
1037    }
1038
1039    /// Like `poll`, but may be interrupted by a signal
1040    ///
1041    /// If `poll` is inturrupted while blocking, it will transparently retry the syscall.  If you
1042    /// want to handle signals yourself, however, use `poll_interruptible`.
1043    pub fn poll_interruptible(
1044        &self,
1045        events: &mut Events,
1046        timeout: Option<Duration>,
1047    ) -> io::Result<usize> {
1048        self.poll1(events, timeout, true)
1049    }
1050
1051    fn poll1(
1052        &self,
1053        events: &mut Events,
1054        mut timeout: Option<Duration>,
1055        interruptible: bool,
1056    ) -> io::Result<usize> {
1057        let zero = Some(Duration::from_millis(0));
1058
1059        // At a high level, the synchronization strategy is to acquire access to
1060        // the critical section by transitioning the atomic from unlocked ->
1061        // locked. If the attempt fails, the thread will wait on the condition
1062        // variable.
1063        //
1064        // # Some more detail
1065        //
1066        // The `lock_state` atomic usize combines:
1067        //
1068        // - locked flag, stored in the least significant bit
1069        // - number of waiting threads, stored in the rest of the bits.
1070        //
1071        // When a thread transitions the locked flag from 0 -> 1, it has
1072        // obtained access to the critical section.
1073        //
1074        // When entering `poll`, a compare-and-swap from 0 -> 1 is attempted.
1075        // This is a fast path for the case when there are no concurrent calls
1076        // to poll, which is very common.
1077        //
1078        // On failure, the mutex is locked, and the thread attempts to increment
1079        // the number of waiting threads component of `lock_state`. If this is
1080        // successfully done while the locked flag is set, then the thread can
1081        // wait on the condition variable.
1082        //
1083        // When a thread exits the critical section, it unsets the locked flag.
1084        // If there are any waiters, which is atomically determined while
1085        // unsetting the locked flag, then the condvar is notified.
1086
1087        let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);
1088
1089        if 0 != curr {
1090            // Enter slower path
1091            let mut lock = self.lock.lock().unwrap();
1092            let mut inc = false;
1093
1094            loop {
1095                if curr & 1 == 0 {
1096                    // The lock is currently free, attempt to grab it
1097                    let mut next = curr | 1;
1098
1099                    if inc {
1100                        // The waiter count has previously been incremented, so
1101                        // decrement it here
1102                        next -= 2;
1103                    }
1104
1105                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
1106
1107                    if actual != curr {
1108                        curr = actual;
1109                        continue;
1110                    }
1111
1112                    // Lock acquired, break from the loop
1113                    break;
1114                }
1115
1116                if timeout == zero {
1117                    if inc {
1118                        self.lock_state.fetch_sub(2, SeqCst);
1119                    }
1120
1121                    return Ok(0);
1122                }
1123
1124                // The lock is currently held, so wait for it to become
1125                // free. If the waiter count hasn't been incremented yet, do
1126                // so now
1127                if !inc {
1128                    let next = curr.checked_add(2).expect("overflow");
1129                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
1130
1131                    if actual != curr {
1132                        curr = actual;
1133                        continue;
1134                    }
1135
1136                    // Track that the waiter count has been incremented for
1137                    // this thread and fall through to the condvar waiting
1138                    inc = true;
1139                }
1140
1141                lock = match timeout {
1142                    Some(to) => {
1143                        let now = Instant::now();
1144
1145                        // Wait to be notified
1146                        let (l, _) = self.condvar.wait_timeout(lock, to).unwrap();
1147
1148                        // See how much time was elapsed in the wait
1149                        let elapsed = now.elapsed();
1150
1151                        // Update `timeout` to reflect how much time is left to
1152                        // wait.
1153                        if elapsed >= to {
1154                            timeout = zero;
1155                        } else {
1156                            // Update the timeout
1157                            timeout = Some(to - elapsed);
1158                        }
1159
1160                        l
1161                    }
1162                    None => self.condvar.wait(lock).unwrap(),
1163                };
1164
1165                // Reload the state
1166                curr = self.lock_state.load(SeqCst);
1167
1168                // Try to lock again...
1169            }
1170        }
1171
1172        let ret = self.poll2(events, timeout, interruptible);
1173
1174        // Release the lock
1175        if 1 != self.lock_state.fetch_and(!1, Release) {
1176            // Acquire the mutex
1177            let _lock = self.lock.lock().unwrap();
1178
1179            // There is at least one waiting thread, so notify one
1180            self.condvar.notify_one();
1181        }
1182
1183        ret
1184    }
1185
1186    #[inline]
1187    #[cfg_attr(feature = "cargo-clippy", allow(clippy::if_same_then_else))]
1188    fn poll2(
1189        &self,
1190        events: &mut Events,
1191        mut timeout: Option<Duration>,
1192        interruptible: bool,
1193    ) -> io::Result<usize> {
1194        // Compute the timeout value passed to the system selector. If the
1195        // readiness queue has pending nodes, we still want to poll the system
1196        // selector for new events, but we don't want to block the thread to
1197        // wait for new events.
1198        if timeout == Some(Duration::from_millis(0)) {
1199            // If blocking is not requested, then there is no need to prepare
1200            // the queue for sleep
1201            //
1202            // The sleep_marker should be removed by readiness_queue.poll().
1203        } else if self.readiness_queue.prepare_for_sleep() {
1204            // The readiness queue is empty. The call to `prepare_for_sleep`
1205            // inserts `sleep_marker` into the queue. This signals to any
1206            // threads setting readiness that the `Poll::poll` is going to
1207            // sleep, so the awakener should be used.
1208        } else {
1209            // The readiness queue is not empty, so do not block the thread.
1210            timeout = Some(Duration::from_millis(0));
1211        }
1212
1213        loop {
1214            let now = Instant::now();
1215            // First get selector events
1216            let res = self.selector.select(&mut events.inner, AWAKEN, timeout);
1217            match res {
1218                Ok(true) => {
1219                    // Some awakeners require reading from a FD.
1220                    self.readiness_queue.inner.awakener.cleanup();
1221                    break;
1222                }
1223                Ok(false) => break,
1224                Err(ref e)
1225                    if e.kind() == io::ErrorKind::Interrupted && !interruptible =>
1226                {
1227                    // Interrupted by a signal; update timeout if necessary and retry
1228                    if let Some(to) = timeout {
1229                        let elapsed = now.elapsed();
1230                        if elapsed >= to {
1231                            break;
1232                        } else {
1233                            timeout = Some(to - elapsed);
1234                        }
1235                    }
1236                }
1237                Err(e) => return Err(e),
1238            }
1239        }
1240
1241        // Poll custom event queue
1242        self.readiness_queue.poll(&mut events.inner);
1243
1244        // Return number of polled events
1245        Ok(events.inner.len())
1246    }
1247}
1248
1249fn validate_args(token: Token) -> io::Result<()> {
1250    if token == AWAKEN {
1251        return Err(io::Error::new(io::ErrorKind::Other, "invalid token"));
1252    }
1253
1254    Ok(())
1255}
1256
1257impl fmt::Debug for Poll {
1258    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1259        fmt.debug_struct("Poll").finish()
1260    }
1261}
1262
1263impl AsRawFd for Poll {
1264    fn as_raw_fd(&self) -> RawFd {
1265        self.selector.as_raw_fd()
1266    }
1267}
1268
1269/// A collection of readiness events.
1270///
1271/// `Events` is passed as an argument to [`Poll::poll`] and will be used to
1272/// receive any new readiness events received since the last poll. Usually, a
1273/// single `Events` instance is created at the same time as a [`Poll`] and
1274/// reused on each call to [`Poll::poll`].
1275///
1276/// See [`Poll`] for more documentation on polling.
1277///
1278/// # Examples
1279///
1280/// ```
1281/// # use std::error::Error;
1282/// # fn try_main() -> Result<(), Box<Error>> {
1283/// use futures_net::driver::sys::Poll;
1284/// use futures_net::driver::sys::event::Events;
1285/// use std::time::Duration;
1286///
1287/// let mut events = Events::with_capacity(1024);
1288/// let poll = Poll::new()?;
1289///
1290/// assert_eq!(0, events.len());
1291///
1292/// // Register `Evented` handles with `poll`
1293///
1294/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1295///
1296/// for event in &events {
1297///     println!("event={:?}", event);
1298/// }
1299/// #     Ok(())
1300/// # }
1301/// #
1302/// # fn main() {
1303/// #     try_main().unwrap();
1304/// # }
1305/// ```
1306///
1307/// [`Poll::poll`]: struct.Poll.html#method.poll
1308/// [`Poll`]: struct.Poll.html
1309pub struct Events {
1310    inner: linux::Events,
1311}
1312
1313/// [`Events`] iterator.
1314///
1315/// This struct is created by the [`iter`] method on [`Events`].
1316///
1317/// # Examples
1318///
1319/// ```
1320/// # use std::error::Error;
1321/// # fn try_main() -> Result<(), Box<Error>> {
1322/// use futures_net::driver::sys::Poll;
1323/// use futures_net::driver::sys::event::Events;
1324/// use std::time::Duration;
1325///
1326/// let mut events = Events::with_capacity(1024);
1327/// let poll = Poll::new()?;
1328///
1329/// // Register handles with `poll`
1330///
1331/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1332///
1333/// for event in events.iter() {
1334///     println!("event={:?}", event);
1335/// }
1336/// #     Ok(())
1337/// # }
1338/// #
1339/// # fn main() {
1340/// #     try_main().unwrap();
1341/// # }
1342/// ```
1343///
1344/// [`Events`]: struct.Events.html
1345/// [`iter`]: struct.Events.html#method.iter
1346#[derive(Debug, Clone)]
1347pub struct Iter<'a> {
1348    inner: &'a Events,
1349    pos: usize,
1350}
1351
1352/// Owned [`Events`] iterator.
1353///
1354/// This struct is created by the `into_iter` method on [`Events`].
1355///
1356/// # Examples
1357///
1358/// ```
1359/// # use std::error::Error;
1360/// # fn try_main() -> Result<(), Box<Error>> {
1361/// use futures_net::driver::sys::Poll;
1362/// use futures_net::driver::sys::event::Events;
1363/// use std::time::Duration;
1364///
1365/// let mut events = Events::with_capacity(1024);
1366/// let poll = Poll::new()?;
1367///
1368/// // Register handles with `poll`
1369///
1370/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1371///
1372/// for event in events {
1373///     println!("event={:?}", event);
1374/// }
1375/// #     Ok(())
1376/// # }
1377/// #
1378/// # fn main() {
1379/// #     try_main().unwrap();
1380/// # }
1381/// ```
1382/// [`Events`]: struct.Events.html
1383#[derive(Debug)]
1384pub struct IntoIter {
1385    inner: Events,
1386    pos: usize,
1387}
1388
1389impl Events {
1390    /// Return a new `Events` capable of holding up to `capacity` events.
1391    ///
1392    /// # Examples
1393    ///
1394    /// ```
1395    /// use futures_net::driver::sys::event::Events;
1396    ///
1397    /// let events = Events::with_capacity(1024);
1398    ///
1399    /// assert_eq!(1024, events.capacity());
1400    /// ```
1401    pub fn with_capacity(capacity: usize) -> Events {
1402        Events {
1403            inner: linux::Events::with_capacity(capacity),
1404        }
1405    }
1406
1407    #[deprecated(
1408        since = "0.6.10",
1409        note = "Index access removed in favor of iterator only API."
1410    )]
1411    #[doc(hidden)]
1412    pub fn get(&self, idx: usize) -> Option<Event> {
1413        self.inner.get(idx)
1414    }
1415
1416    #[doc(hidden)]
1417    #[deprecated(
1418        since = "0.6.10",
1419        note = "Index access removed in favor of iterator only API."
1420    )]
1421    pub fn len(&self) -> usize {
1422        self.inner.len()
1423    }
1424
1425    /// Returns the number of `Event` values that `self` can hold.
1426    ///
1427    /// ```
1428    /// use futures_net::driver::sys::event::Events;
1429    ///
1430    /// let events = Events::with_capacity(1024);
1431    ///
1432    /// assert_eq!(1024, events.capacity());
1433    /// ```
1434    pub fn capacity(&self) -> usize {
1435        self.inner.capacity()
1436    }
1437
1438    /// Returns `true` if `self` contains no `Event` values.
1439    ///
1440    /// # Examples
1441    ///
1442    /// ```
1443    /// use futures_net::driver::sys::event::Events;
1444    ///
1445    /// let events = Events::with_capacity(1024);
1446    ///
1447    /// assert!(events.is_empty());
1448    /// ```
1449    pub fn is_empty(&self) -> bool {
1450        self.inner.is_empty()
1451    }
1452
1453    /// Returns an iterator over the `Event` values.
1454    ///
1455    /// # Examples
1456    ///
1457    /// ```
1458    /// # use std::error::Error;
1459    /// # fn try_main() -> Result<(), Box<Error>> {
1460    /// use futures_net::driver::sys::Poll;
1461    /// use futures_net::driver::sys::event::Events;
1462    /// use std::time::Duration;
1463    ///
1464    /// let mut events = Events::with_capacity(1024);
1465    /// let poll = Poll::new()?;
1466    ///
1467    /// // Register handles with `poll`
1468    ///
1469    /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1470    ///
1471    /// for event in events.iter() {
1472    ///     println!("event={:?}", event);
1473    /// }
1474    /// #     Ok(())
1475    /// # }
1476    /// #
1477    /// # fn main() {
1478    /// #     try_main().unwrap();
1479    /// # }
1480    /// ```
1481    pub fn iter(&self) -> Iter {
1482        Iter {
1483            inner: self,
1484            pos: 0,
1485        }
1486    }
1487
1488    /// Clearing all `Event` values from container explicitly.
1489    ///
1490    /// # Examples
1491    ///
1492    /// ```
1493    /// # use std::error::Error;
1494    /// # fn try_main() -> Result<(), Box<Error>> {
1495    /// use futures_net::driver::sys::Poll;
1496    /// use futures_net::driver::sys::event::Events;
1497    /// use std::time::Duration;
1498    ///
1499    /// let mut events = Events::with_capacity(1024);
1500    /// let poll = Poll::new()?;
1501    ///
1502    /// // Register handles with `poll`
1503    /// for _ in 0..2 {
1504    ///     events.clear();
1505    ///     poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1506    ///
1507    ///     for event in events.iter() {
1508    ///         println!("event={:?}", event);
1509    ///     }
1510    /// }
1511    /// #     Ok(())
1512    /// # }
1513    /// #
1514    /// # fn main() {
1515    /// #     try_main().unwrap();
1516    /// # }
1517    /// ```
1518    pub fn clear(&mut self) {
1519        self.inner.clear();
1520    }
1521}
1522
1523impl<'a> IntoIterator for &'a Events {
1524    type Item = Event;
1525    type IntoIter = Iter<'a>;
1526
1527    fn into_iter(self) -> Self::IntoIter {
1528        self.iter()
1529    }
1530}
1531
1532impl<'a> Iterator for Iter<'a> {
1533    type Item = Event;
1534
1535    fn next(&mut self) -> Option<Event> {
1536        let ret = self.inner.inner.get(self.pos);
1537        self.pos += 1;
1538        ret
1539    }
1540}
1541
1542impl IntoIterator for Events {
1543    type Item = Event;
1544    type IntoIter = IntoIter;
1545
1546    fn into_iter(self) -> Self::IntoIter {
1547        IntoIter {
1548            inner: self,
1549            pos: 0,
1550        }
1551    }
1552}
1553
1554impl Iterator for IntoIter {
1555    type Item = Event;
1556
1557    fn next(&mut self) -> Option<Event> {
1558        let ret = self.inner.inner.get(self.pos);
1559        self.pos += 1;
1560        ret
1561    }
1562}
1563
1564impl fmt::Debug for Events {
1565    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1566        f.debug_struct("Events")
1567            .field("capacity", &self.capacity())
1568            .finish()
1569    }
1570}
1571
1572// ===== Accessors for internal usage =====
1573
1574pub fn selector(poll: &Poll) -> &linux::Selector {
1575    &poll.selector
1576}
1577
1578/*
1579 *
1580 * ===== Registration =====
1581 *
1582 */
1583
1584// TODO: get rid of this, windows depends on it for now
1585#[allow(dead_code)]
1586pub fn new_registration(
1587    poll: &Poll,
1588    token: Token,
1589    ready: Ready,
1590    opt: PollOpt,
1591) -> (Registration, SetReadiness) {
1592    Registration::new_priv(poll, token, ready, opt)
1593}
1594
1595impl Registration {
1596    /// Create and return a new `Registration` and the associated
1597    /// `SetReadiness`.
1598    ///
1599    /// See [struct] documentation for more detail and [`Poll`]
1600    /// for high level documentation on polling.
1601    ///
1602    /// # Examples
1603    ///
1604    /// ```
1605    /// # use std::error::Error;
1606    /// # fn try_main() -> Result<(), Box<Error>> {
1607    /// use futures_net::driver::sys::{Poll, Registration, Token};
1608    /// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
1609    /// use std::thread;
1610    ///
1611    /// let (registration, set_readiness) = Registration::new2();
1612    ///
1613    /// thread::spawn(move || {
1614    ///     use std::time::Duration;
1615    ///     thread::sleep(Duration::from_millis(500));
1616    ///
1617    ///     set_readiness.set_readiness(Ready::readable());
1618    /// });
1619    ///
1620    /// let poll = Poll::new()?;
1621    /// poll.register(&registration, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
1622    ///
1623    /// let mut events = Events::with_capacity(256);
1624    ///
1625    /// loop {
1626    ///     poll.poll(&mut events, None);
1627    ///
1628    ///     for event in &events {
1629    ///         if event.token() == Token(0) && event.readiness().is_readable() {
1630    ///             return Ok(());
1631    ///         }
1632    ///     }
1633    /// }
1634    /// #     Ok(())
1635    /// # }
1636    /// #
1637    /// # fn main() {
1638    /// #     try_main().unwrap();
1639    /// # }
1640    /// ```
1641    /// [struct]: #
1642    /// [`Poll`]: struct.Poll.html
1643    pub fn new2() -> (Registration, SetReadiness) {
1644        // Allocate the registration node. The new node will have `ref_count`
1645        // set to 2: one SetReadiness, one Registration.
1646        let node = Box::into_raw(Box::new(ReadinessNode::new(
1647            ptr::null_mut(),
1648            Token(0),
1649            Ready::empty(),
1650            PollOpt::empty(),
1651            2,
1652        )));
1653
1654        let registration = Registration {
1655            inner: RegistrationInner { node },
1656        };
1657
1658        let set_readiness = SetReadiness {
1659            inner: RegistrationInner { node },
1660        };
1661
1662        (registration, set_readiness)
1663    }
1664
1665    // TODO: Get rid of this (windows depends on it for now)
1666    fn new_priv(
1667        poll: &Poll,
1668        token: Token,
1669        interest: Ready,
1670        opt: PollOpt,
1671    ) -> (Registration, SetReadiness) {
1672        is_send::<Registration>();
1673        is_sync::<Registration>();
1674        is_send::<SetReadiness>();
1675        is_sync::<SetReadiness>();
1676
1677        // Clone handle to the readiness queue, this bumps the ref count
1678        let queue = poll.readiness_queue.inner.clone();
1679
1680        // Convert to a *mut () pointer
1681        let queue: *mut () = unsafe { mem::transmute(queue) };
1682
1683        // Allocate the registration node. The new node will have `ref_count`
1684        // set to 3: one SetReadiness, one Registration, and one Poll handle.
1685        let node =
1686            Box::into_raw(Box::new(ReadinessNode::new(queue, token, interest, opt, 3)));
1687
1688        let registration = Registration {
1689            inner: RegistrationInner { node },
1690        };
1691
1692        let set_readiness = SetReadiness {
1693            inner: RegistrationInner { node },
1694        };
1695
1696        (registration, set_readiness)
1697    }
1698
1699    #[doc(hidden)]
1700    pub fn update(
1701        &self,
1702        poll: &Poll,
1703        token: Token,
1704        interest: Ready,
1705        opts: PollOpt,
1706    ) -> io::Result<()> {
1707        self.inner.update(poll, token, interest, opts)
1708    }
1709}
1710
1711impl Evented for Registration {
1712    fn register(
1713        &self,
1714        poll: &Poll,
1715        token: Token,
1716        interest: Ready,
1717        opts: PollOpt,
1718    ) -> io::Result<()> {
1719        self.inner.update(poll, token, interest, opts)
1720    }
1721
1722    fn reregister(
1723        &self,
1724        poll: &Poll,
1725        token: Token,
1726        interest: Ready,
1727        opts: PollOpt,
1728    ) -> io::Result<()> {
1729        self.inner.update(poll, token, interest, opts)
1730    }
1731
1732    fn deregister(&self, poll: &Poll) -> io::Result<()> {
1733        self.inner
1734            .update(poll, Token(0), Ready::empty(), PollOpt::empty())
1735    }
1736}
1737
1738impl Drop for Registration {
1739    fn drop(&mut self) {
1740        // `flag_as_dropped` toggles the `dropped` flag and notifies
1741        // `Poll::poll` to release its handle (which is just decrementing
1742        // the ref count).
1743        if self.inner.state.flag_as_dropped() {
1744            // Can't do anything if the queuing fails
1745            let _ = self.inner.enqueue_with_wakeup();
1746        }
1747    }
1748}
1749
1750impl fmt::Debug for Registration {
1751    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1752        fmt.debug_struct("Registration").finish()
1753    }
1754}
1755
1756impl SetReadiness {
1757    /// Returns the registration's current readiness.
1758    ///
1759    /// # Note
1760    ///
1761    /// There is no guarantee that `readiness` establishes any sort of memory
1762    /// ordering. Any concurrent data access must be synchronized using another
1763    /// strategy.
1764    ///
1765    /// # Examples
1766    ///
1767    /// ```
1768    /// # use std::error::Error;
1769    /// # fn try_main() -> Result<(), Box<Error>> {
1770    /// use futures_net::driver::sys::Registration;
1771    /// use futures_net::driver::sys::event::Ready;
1772    ///
1773    /// let (registration, set_readiness) = Registration::new2();
1774    ///
1775    /// assert!(set_readiness.readiness().is_empty());
1776    ///
1777    /// set_readiness.set_readiness(Ready::readable())?;
1778    /// assert!(set_readiness.readiness().is_readable());
1779    /// #     Ok(())
1780    /// # }
1781    /// #
1782    /// # fn main() {
1783    /// #     try_main().unwrap();
1784    /// # }
1785    /// ```
1786    pub fn readiness(&self) -> Ready {
1787        self.inner.readiness()
1788    }
1789
1790    /// Set the registration's readiness
1791    ///
1792    /// If the associated `Registration` is registered with a [`Poll`] instance
1793    /// and has requested readiness events that include `ready`, then a future
1794    /// call to [`Poll::poll`] will receive a readiness event representing the
1795    /// readiness state change.
1796    ///
1797    /// # Note
1798    ///
1799    /// There is no guarantee that `readiness` establishes any sort of memory
1800    /// ordering. Any concurrent data access must be synchronized using another
1801    /// strategy.
1802    ///
1803    /// There is also no guarantee as to when the readiness event will be
1804    /// delivered to poll. A best attempt will be made to make the delivery in a
1805    /// "timely" fashion. For example, the following is **not** guaranteed to
1806    /// work:
1807    ///
1808    /// ```
1809    /// # use std::error::Error;
1810    /// # fn try_main() -> Result<(), Box<Error>> {
1811    /// use futures_net::driver::sys::{Poll, Registration, Token};
1812    /// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
1813    ///
1814    /// let poll = Poll::new()?;
1815    /// let (registration, set_readiness) = Registration::new2();
1816    ///
1817    /// poll.register(&registration,
1818    ///               Token(0),
1819    ///               Ready::readable(),
1820    ///               PollOpt::edge())?;
1821    ///
1822    /// // Set the readiness, then immediately poll to try to get the readiness
1823    /// // event
1824    /// set_readiness.set_readiness(Ready::readable())?;
1825    ///
1826    /// let mut events = Events::with_capacity(1024);
1827    /// poll.poll(&mut events, None)?;
1828    ///
1829    /// // There is NO guarantee that the following will work. It is possible
1830    /// // that the readiness event will be delivered at a later time.
1831    /// let event = events.get(0).unwrap();
1832    /// assert_eq!(event.token(), Token(0));
1833    /// assert!(event.readiness().is_readable());
1834    /// #     Ok(())
1835    /// # }
1836    /// #
1837    /// # fn main() {
1838    /// #     try_main().unwrap();
1839    /// # }
1840    /// ```
1841    ///
1842    /// # Examples
1843    ///
1844    /// A simple example, for a more elaborate example, see the [`Evented`]
1845    /// documentation.
1846    ///
1847    /// ```
1848    /// # use std::error::Error;
1849    /// # fn try_main() -> Result<(), Box<Error>> {
1850    /// use futures_net::driver::sys::Registration;
1851    /// use futures_net::driver::sys::event::Ready;
1852    ///
1853    /// let (registration, set_readiness) = Registration::new2();
1854    ///
1855    /// assert!(set_readiness.readiness().is_empty());
1856    ///
1857    /// set_readiness.set_readiness(Ready::readable())?;
1858    /// assert!(set_readiness.readiness().is_readable());
1859    /// #     Ok(())
1860    /// # }
1861    /// #
1862    /// # fn main() {
1863    /// #     try_main().unwrap();
1864    /// # }
1865    /// ```
1866    ///
1867    /// [`Registration`]: struct.Registration.html
1868    /// [`Evented`]: event/trait.Evented.html#examples
1869    /// [`Poll`]: struct.Poll.html
1870    /// [`Poll::poll`]: struct.Poll.html#method.poll
1871    pub fn set_readiness(&self, ready: Ready) -> io::Result<()> {
1872        self.inner.set_readiness(ready)
1873    }
1874}
1875
1876impl fmt::Debug for SetReadiness {
1877    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1878        f.debug_struct("SetReadiness").finish()
1879    }
1880}
1881
1882impl RegistrationInner {
1883    /// Get the registration's readiness.
1884    fn readiness(&self) -> Ready {
1885        self.state.load(Relaxed).readiness()
1886    }
1887
1888    /// Set the registration's readiness.
1889    ///
1890    /// This function can be called concurrently by an arbitrary number of
1891    /// SetReadiness handles.
1892    fn set_readiness(&self, ready: Ready) -> io::Result<()> {
1893        // Load the current atomic state.
1894        let mut state = self.state.load(Acquire);
1895        let mut next;
1896
1897        loop {
1898            next = state;
1899
1900            if state.is_dropped() {
1901                // Node is dropped, no more notifications
1902                return Ok(());
1903            }
1904
1905            // Update the readiness
1906            next.set_readiness(ready);
1907
1908            // If the readiness is not blank, try to obtain permission to
1909            // push the node into the readiness queue.
1910            if !next.effective_readiness().is_empty() {
1911                next.set_queued();
1912            }
1913
1914            let actual = self.state.compare_and_swap(state, next, AcqRel);
1915
1916            if state == actual {
1917                break;
1918            }
1919
1920            state = actual;
1921        }
1922
1923        if !state.is_queued() && next.is_queued() {
1924            // We toggled the queued flag, making us responsible for queuing the
1925            // node in the MPSC readiness queue.
1926            self.enqueue_with_wakeup()?;
1927        }
1928
1929        Ok(())
1930    }
1931
1932    /// Update the registration details associated with the node
1933    fn update(
1934        &self,
1935        poll: &Poll,
1936        token: Token,
1937        interest: Ready,
1938        opt: PollOpt,
1939    ) -> io::Result<()> {
1940        // First, ensure poll instances match
1941        //
1942        // Load the queue pointer, `Relaxed` is sufficient here as only the
1943        // pointer is being operated on. The actual memory is guaranteed to be
1944        // visible the `poll: &Poll` ref passed as an argument to the function.
1945        let mut queue = self.readiness_queue.load(Relaxed);
1946        let other: &*mut () =
1947            unsafe { &*(&poll.readiness_queue.inner as *const _ as *const *mut ()) };
1948        let other = *other;
1949
1950        debug_assert!(
1951            mem::size_of::<Arc<ReadinessQueueInner>>() == mem::size_of::<*mut ()>()
1952        );
1953
1954        if queue.is_null() {
1955            // Attempt to set the queue pointer. `Release` ordering synchronizes
1956            // with `Acquire` in `ensure_with_wakeup`.
1957            let actual = self.readiness_queue.compare_and_swap(queue, other, Release);
1958
1959            if actual.is_null() {
1960                // The CAS succeeded, this means that the node's ref count
1961                // should be incremented to reflect that the `poll` function
1962                // effectively owns the node as well.
1963                //
1964                // `Relaxed` ordering used for the same reason as in
1965                // RegistrationInner::clone
1966                self.ref_count.fetch_add(1, Relaxed);
1967
1968                // Note that the `queue` reference stored in our
1969                // `readiness_queue` field is intended to be a strong reference,
1970                // so now that we've successfully claimed the reference we bump
1971                // the refcount here.
1972                //
1973                // Down below in `release_node` when we deallocate this
1974                // `RegistrationInner` is where we'll transmute this back to an
1975                // arc and decrement the reference count.
1976                mem::forget(poll.readiness_queue.clone());
1977            } else {
1978                // The CAS failed, another thread set the queue pointer, so ensure
1979                // that the pointer and `other` match
1980                if actual != other {
1981                    return Err(io::Error::new(
1982                        io::ErrorKind::Other,
1983                        "registration handle associated with another `Poll` instance",
1984                    ));
1985                }
1986            }
1987
1988            queue = other;
1989        } else if queue != other {
1990            return Err(io::Error::new(
1991                io::ErrorKind::Other,
1992                "registration handle associated with another `Poll` instance",
1993            ));
1994        }
1995
1996        unsafe {
1997            let actual = &poll.readiness_queue.inner as *const _ as *const usize;
1998            debug_assert_eq!(queue as usize, *actual);
1999        }
2000
2001        // The `update_lock` atomic is used as a flag ensuring only a single
2002        // thread concurrently enters the `update` critical section. Any
2003        // concurrent calls to update are discarded. If coordinated updates are
2004        // required, the Mio user is responsible for handling that.
2005        //
2006        // Acquire / Release ordering is used on `update_lock` to ensure that
2007        // data access to the `token_*` variables are scoped to the critical
2008        // section.
2009
2010        // Acquire the update lock.
2011        if self.update_lock.compare_and_swap(false, true, Acquire) {
2012            // The lock is already held. Discard the update
2013            return Ok(());
2014        }
2015
2016        // Relaxed ordering is acceptable here as the only memory that needs to
2017        // be visible as part of the update are the `token_*` variables, and
2018        // ordering has already been handled by the `update_lock` access.
2019        let mut state = self.state.load(Relaxed);
2020        let mut next;
2021
2022        // Read the current token, again this memory has been ordered by the
2023        // acquire on `update_lock`.
2024        let curr_token_pos = state.token_write_pos();
2025        let curr_token = unsafe { self::token(self, curr_token_pos) };
2026
2027        let mut next_token_pos = curr_token_pos;
2028
2029        // If the `update` call is changing the token, then compute the next
2030        // available token slot and write the token there.
2031        //
2032        // Note that this computation is happening *outside* of the
2033        // compare-and-swap loop. The update lock ensures that only a single
2034        // thread could be mutating the write_token_position, so the
2035        // `next_token_pos` will never need to be recomputed even if
2036        // `token_read_pos` concurrently changes. This is because
2037        // `token_read_pos` can ONLY concurrently change to the current value of
2038        // `token_write_pos`, so `next_token_pos` will always remain valid.
2039        if token != curr_token {
2040            next_token_pos = state.next_token_pos();
2041
2042            // Update the token
2043            match next_token_pos {
2044                0 => unsafe { *self.token_0.get() = token },
2045                1 => unsafe { *self.token_1.get() = token },
2046                2 => unsafe { *self.token_2.get() = token },
2047                _ => unreachable!(),
2048            }
2049        }
2050
2051        // Now enter the compare-and-swap loop
2052        loop {
2053            next = state;
2054
2055            // The node is only dropped once all `Registration` handles are
2056            // dropped. Only `Registration` can call `update`.
2057            debug_assert!(!state.is_dropped());
2058
2059            // Update the write token position, this will also release the token
2060            // to Poll::poll.
2061            next.set_token_write_pos(next_token_pos);
2062
2063            // Update readiness and poll opts
2064            next.set_interest(interest);
2065            next.set_poll_opt(opt);
2066
2067            // If there is effective readiness, the node will need to be queued
2068            // for processing. This exact behavior is still TBD, so we are
2069            // conservative for now and always fire.
2070            //
2071            // See https://github.com/carllerche/mio/issues/535.
2072            if !next.effective_readiness().is_empty() {
2073                next.set_queued();
2074            }
2075
2076            // compare-and-swap the state values. Only `Release` is needed here.
2077            // The `Release` ensures that `Poll::poll` will see the token
2078            // update and the update function doesn't care about any other
2079            // memory visibility.
2080            let actual = self.state.compare_and_swap(state, next, Release);
2081
2082            if actual == state {
2083                break;
2084            }
2085
2086            // CAS failed, but `curr_token_pos` should not have changed given
2087            // that we still hold the update lock.
2088            debug_assert_eq!(curr_token_pos, actual.token_write_pos());
2089
2090            state = actual;
2091        }
2092
2093        // Release the lock
2094        self.update_lock.store(false, Release);
2095
2096        if !state.is_queued() && next.is_queued() {
2097            // We are responsible for enqueing the node.
2098            enqueue_with_wakeup(queue, self)?;
2099        }
2100
2101        Ok(())
2102    }
2103}
2104
2105impl ops::Deref for RegistrationInner {
2106    type Target = ReadinessNode;
2107
2108    fn deref(&self) -> &ReadinessNode {
2109        unsafe { &*self.node }
2110    }
2111}
2112
2113impl Clone for RegistrationInner {
2114    fn clone(&self) -> RegistrationInner {
2115        // Using a relaxed ordering is alright here, as knowledge of the
2116        // original reference prevents other threads from erroneously deleting
2117        // the object.
2118        //
2119        // As explained in the [Boost documentation][1], Increasing the
2120        // reference counter can always be done with memory_order_relaxed: New
2121        // references to an object can only be formed from an existing
2122        // reference, and passing an existing reference from one thread to
2123        // another must already provide any required synchronization.
2124        //
2125        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
2126        let old_size = self.ref_count.fetch_add(1, Relaxed);
2127
2128        // However we need to guard against massive refcounts in case someone
2129        // is `mem::forget`ing Arcs. If we don't do this the count can overflow
2130        // and users will use-after free. We racily saturate to `isize::MAX` on
2131        // the assumption that there aren't ~2 billion threads incrementing
2132        // the reference count at once. This branch will never be taken in
2133        // any realistic program.
2134        //
2135        // We abort because such a program is incredibly degenerate, and we
2136        // don't care to support it.
2137        if old_size & !MAX_REFCOUNT != 0 {
2138            process::abort();
2139        }
2140
2141        RegistrationInner { node: self.node }
2142    }
2143}
2144
2145impl Drop for RegistrationInner {
2146    fn drop(&mut self) {
2147        // Only handles releasing from `Registration` and `SetReadiness`
2148        // handles. Poll has to call this itself.
2149        release_node(self.node);
2150    }
2151}
2152
2153/*
2154 *
2155 * ===== ReadinessQueue =====
2156 *
2157 */
2158
2159impl ReadinessQueue {
2160    /// Create a new `ReadinessQueue`.
2161    fn new() -> io::Result<ReadinessQueue> {
2162        is_send::<Self>();
2163        is_sync::<Self>();
2164
2165        let end_marker = Box::new(ReadinessNode::marker());
2166        let sleep_marker = Box::new(ReadinessNode::marker());
2167        let closed_marker = Box::new(ReadinessNode::marker());
2168
2169        let ptr = &*end_marker as *const _ as *mut _;
2170
2171        Ok(ReadinessQueue {
2172            inner: Arc::new(ReadinessQueueInner {
2173                awakener: linux::Awakener::new()?,
2174                head_readiness: AtomicPtr::new(ptr),
2175                tail_readiness: UnsafeCell::new(ptr),
2176                end_marker,
2177                sleep_marker,
2178                closed_marker,
2179            }),
2180        })
2181    }
2182
2183    /// Poll the queue for new events
2184    fn poll(&self, dst: &mut linux::Events) {
2185        // `until` is set with the first node that gets re-enqueued due to being
2186        // set to have level-triggered notifications. This prevents an infinite
2187        // loop where `Poll::poll` will keep dequeuing nodes it enqueues.
2188        let mut until = ptr::null_mut();
2189
2190        if dst.len() == dst.capacity() {
2191            // If `dst` is already full, the readiness queue won't be drained.
2192            // This might result in `sleep_marker` staying in the queue and
2193            // unecessary pipe writes occuring.
2194            self.inner.clear_sleep_marker();
2195        }
2196
2197        'outer: while dst.len() < dst.capacity() {
2198            // Dequeue a node. If the queue is in an inconsistent state, then
2199            // stop polling. `Poll::poll` will be called again shortly and enter
2200            // a syscall, which should be enough to enable the other thread to
2201            // finish the queuing process.
2202            let ptr = match unsafe { self.inner.dequeue_node(until) } {
2203                Dequeue::Empty | Dequeue::Inconsistent => break,
2204                Dequeue::Data(ptr) => ptr,
2205            };
2206
2207            let node = unsafe { &*ptr };
2208
2209            // Read the node state with Acquire ordering. This allows reading
2210            // the token variables.
2211            let mut state = node.state.load(Acquire);
2212            let mut next;
2213            let mut readiness;
2214            let mut opt;
2215
2216            loop {
2217                // Build up any changes to the readiness node's state and
2218                // attempt the CAS at the end
2219                next = state;
2220
2221                // Given that the node was just read from the queue, the
2222                // `queued` flag should still be set.
2223                debug_assert!(state.is_queued());
2224
2225                // The dropped flag means we need to release the node and
2226                // perform no further processing on it.
2227                if state.is_dropped() {
2228                    // Release the node and continue
2229                    release_node(ptr);
2230                    continue 'outer;
2231                }
2232
2233                // Process the node
2234                readiness = state.effective_readiness();
2235                opt = state.poll_opt();
2236
2237                if opt.is_edge() {
2238                    // Mark the node as dequeued
2239                    next.set_dequeued();
2240
2241                    if opt.is_oneshot() && !readiness.is_empty() {
2242                        next.disarm();
2243                    }
2244                } else if readiness.is_empty() {
2245                    next.set_dequeued();
2246                }
2247
2248                // Ensure `token_read_pos` is set to `token_write_pos` so that
2249                // we read the most up to date token value.
2250                next.update_token_read_pos();
2251
2252                if state == next {
2253                    break;
2254                }
2255
2256                let actual = node.state.compare_and_swap(state, next, AcqRel);
2257
2258                if actual == state {
2259                    break;
2260                }
2261
2262                state = actual;
2263            }
2264
2265            // If the queued flag is still set, then the node must be requeued.
2266            // This typically happens when using level-triggered notifications.
2267            if next.is_queued() {
2268                if until.is_null() {
2269                    // We never want to see the node again
2270                    until = ptr;
2271                }
2272
2273                // Requeue the node
2274                self.inner.enqueue_node(node);
2275            }
2276
2277            if !readiness.is_empty() {
2278                // Get the token
2279                let token = unsafe { token(node, next.token_read_pos()) };
2280
2281                // Push the event
2282                dst.push_event(Event::new(readiness, token));
2283            }
2284        }
2285    }
2286
2287    /// Prepare the queue for the `Poll::poll` thread to block in the system
2288    /// selector. This involves changing `head_readiness` to `sleep_marker`.
2289    /// Returns true if successful and `poll` can block.
2290    fn prepare_for_sleep(&self) -> bool {
2291        let end_marker = self.inner.end_marker();
2292        let sleep_marker = self.inner.sleep_marker();
2293
2294        let tail = unsafe { *self.inner.tail_readiness.get() };
2295
2296        // If the tail is currently set to the sleep_marker, then check if the
2297        // head is as well. If it is, then the queue is currently ready to
2298        // sleep. If it is not, then the queue is not empty and there should be
2299        // no sleeping.
2300        if tail == sleep_marker {
2301            return self.inner.head_readiness.load(Acquire) == sleep_marker;
2302        }
2303
2304        // If the tail is not currently set to `end_marker`, then the queue is
2305        // not empty.
2306        if tail != end_marker {
2307            return false;
2308        }
2309
2310        // The sleep marker is *not* currently in the readiness queue.
2311        //
2312        // The sleep marker is only inserted in this function. It is also only
2313        // inserted in the tail position. This is guaranteed by first checking
2314        // that the end marker is in the tail position, pushing the sleep marker
2315        // after the end marker, then removing the end marker.
2316        //
2317        // Before inserting a node into the queue, the next pointer has to be
2318        // set to null. Again, this is only safe to do when the node is not
2319        // currently in the queue, but we already have ensured this.
2320        self.inner
2321            .sleep_marker
2322            .next_readiness
2323            .store(ptr::null_mut(), Relaxed);
2324
2325        let actual =
2326            self.inner
2327                .head_readiness
2328                .compare_and_swap(end_marker, sleep_marker, AcqRel);
2329
2330        debug_assert!(actual != sleep_marker);
2331
2332        if actual != end_marker {
2333            // The readiness queue is not empty
2334            return false;
2335        }
2336
2337        // The current tail should be pointing to `end_marker`
2338        debug_assert!(unsafe { *self.inner.tail_readiness.get() == end_marker });
2339        // The `end_marker` next pointer should be null
2340        debug_assert!(self.inner.end_marker.next_readiness.load(Relaxed).is_null());
2341
2342        // Update tail pointer.
2343        unsafe {
2344            *self.inner.tail_readiness.get() = sleep_marker;
2345        }
2346        true
2347    }
2348}
2349
2350impl Drop for ReadinessQueue {
2351    fn drop(&mut self) {
2352        // Close the queue by enqueuing the closed node
2353        self.inner.enqueue_node(&*self.inner.closed_marker);
2354
2355        loop {
2356            // Free any nodes that happen to be left in the readiness queue
2357            let ptr = match unsafe { self.inner.dequeue_node(ptr::null_mut()) } {
2358                Dequeue::Empty => break,
2359                Dequeue::Inconsistent => {
2360                    // This really shouldn't be possible as all other handles to
2361                    // `ReadinessQueueInner` are dropped, but handle this by
2362                    // spinning I guess?
2363                    continue;
2364                }
2365                Dequeue::Data(ptr) => ptr,
2366            };
2367
2368            let node = unsafe { &*ptr };
2369
2370            let state = node.state.load(Acquire);
2371
2372            debug_assert!(state.is_queued());
2373
2374            release_node(ptr);
2375        }
2376    }
2377}
2378
2379impl ReadinessQueueInner {
2380    fn wakeup(&self) -> io::Result<()> {
2381        self.awakener.wakeup()
2382    }
2383
2384    /// Prepend the given node to the head of the readiness queue. This is done
2385    /// with relaxed ordering. Returns true if `Poll` needs to be woken up.
2386    fn enqueue_node_with_wakeup(&self, node: &ReadinessNode) -> io::Result<()> {
2387        if self.enqueue_node(node) {
2388            self.wakeup()?;
2389        }
2390
2391        Ok(())
2392    }
2393
2394    /// Push the node into the readiness queue
2395    fn enqueue_node(&self, node: &ReadinessNode) -> bool {
2396        // This is the 1024cores.net intrusive MPSC queue [1] "push" function.
2397        let node_ptr = node as *const _ as *mut _;
2398
2399        // Relaxed used as the ordering is "released" when swapping
2400        // `head_readiness`
2401        node.next_readiness.store(ptr::null_mut(), Relaxed);
2402
2403        unsafe {
2404            let mut prev = self.head_readiness.load(Acquire);
2405
2406            loop {
2407                if prev == self.closed_marker() {
2408                    debug_assert!(node_ptr != self.closed_marker());
2409                    // debug_assert!(node_ptr != self.end_marker());
2410                    debug_assert!(node_ptr != self.sleep_marker());
2411
2412                    if node_ptr != self.end_marker() {
2413                        // The readiness queue is shutdown, but the enqueue flag was
2414                        // set. This means that we are responsible for decrementing
2415                        // the ready queue's ref count
2416                        debug_assert!(node.ref_count.load(Relaxed) >= 2);
2417                        release_node(node_ptr);
2418                    }
2419
2420                    return false;
2421                }
2422
2423                let act = self.head_readiness.compare_and_swap(prev, node_ptr, AcqRel);
2424
2425                if prev == act {
2426                    break;
2427                }
2428
2429                prev = act;
2430            }
2431
2432            debug_assert!((*prev).next_readiness.load(Relaxed).is_null());
2433
2434            (*prev).next_readiness.store(node_ptr, Release);
2435
2436            prev == self.sleep_marker()
2437        }
2438    }
2439
2440    fn clear_sleep_marker(&self) {
2441        let end_marker = self.end_marker();
2442        let sleep_marker = self.sleep_marker();
2443
2444        unsafe {
2445            let tail = *self.tail_readiness.get();
2446
2447            if tail != self.sleep_marker() {
2448                return;
2449            }
2450
2451            // The empty markeer is *not* currently in the readiness queue
2452            // (since the sleep markeris).
2453            self.end_marker
2454                .next_readiness
2455                .store(ptr::null_mut(), Relaxed);
2456
2457            let actual =
2458                self.head_readiness
2459                    .compare_and_swap(sleep_marker, end_marker, AcqRel);
2460
2461            debug_assert!(actual != end_marker);
2462
2463            if actual != sleep_marker {
2464                // The readiness queue is not empty, we cannot remove the sleep
2465                // markeer
2466                return;
2467            }
2468
2469            // Update the tail pointer.
2470            *self.tail_readiness.get() = end_marker;
2471        }
2472    }
2473
2474    /// Must only be called in `poll` or `drop`
2475    unsafe fn dequeue_node(&self, until: *mut ReadinessNode) -> Dequeue {
2476        // This is the 1024cores.net intrusive MPSC queue [1] "pop" function
2477        // with the modifications mentioned at the top of the file.
2478        let mut tail = *self.tail_readiness.get();
2479        let mut next = (*tail).next_readiness.load(Acquire);
2480
2481        if tail == self.end_marker()
2482            || tail == self.sleep_marker()
2483            || tail == self.closed_marker()
2484        {
2485            if next.is_null() {
2486                // Make sure the sleep marker is removed (as we are no longer
2487                // sleeping
2488                self.clear_sleep_marker();
2489
2490                return Dequeue::Empty;
2491            }
2492
2493            *self.tail_readiness.get() = next;
2494            tail = next;
2495            next = (*next).next_readiness.load(Acquire);
2496        }
2497
2498        // Only need to check `until` at this point. `until` is either null,
2499        // which will never match tail OR it is a node that was pushed by
2500        // the current thread. This means that either:
2501        //
2502        // 1) The queue is inconsistent, which is handled explicitly
2503        // 2) We encounter `until` at this point in dequeue
2504        // 3) we will pop a different node
2505        if tail == until {
2506            return Dequeue::Empty;
2507        }
2508
2509        if !next.is_null() {
2510            *self.tail_readiness.get() = next;
2511            return Dequeue::Data(tail);
2512        }
2513
2514        if self.head_readiness.load(Acquire) != tail {
2515            return Dequeue::Inconsistent;
2516        }
2517
2518        // Push the stub node
2519        self.enqueue_node(&*self.end_marker);
2520
2521        next = (*tail).next_readiness.load(Acquire);
2522
2523        if !next.is_null() {
2524            *self.tail_readiness.get() = next;
2525            return Dequeue::Data(tail);
2526        }
2527
2528        Dequeue::Inconsistent
2529    }
2530
2531    fn end_marker(&self) -> *mut ReadinessNode {
2532        &*self.end_marker as *const ReadinessNode as *mut ReadinessNode
2533    }
2534
2535    fn sleep_marker(&self) -> *mut ReadinessNode {
2536        &*self.sleep_marker as *const ReadinessNode as *mut ReadinessNode
2537    }
2538
2539    fn closed_marker(&self) -> *mut ReadinessNode {
2540        &*self.closed_marker as *const ReadinessNode as *mut ReadinessNode
2541    }
2542}
2543
2544impl ReadinessNode {
2545    /// Return a new `ReadinessNode`, initialized with a ref_count of 3.
2546    fn new(
2547        queue: *mut (),
2548        token: Token,
2549        interest: Ready,
2550        opt: PollOpt,
2551        ref_count: usize,
2552    ) -> ReadinessNode {
2553        ReadinessNode {
2554            state: AtomicState::new(interest, opt),
2555            // Only the first token is set, the others are initialized to 0
2556            token_0: UnsafeCell::new(token),
2557            token_1: UnsafeCell::new(Token(0)),
2558            token_2: UnsafeCell::new(Token(0)),
2559            next_readiness: AtomicPtr::new(ptr::null_mut()),
2560            update_lock: AtomicBool::new(false),
2561            readiness_queue: AtomicPtr::new(queue),
2562            ref_count: AtomicUsize::new(ref_count),
2563        }
2564    }
2565
2566    fn marker() -> ReadinessNode {
2567        ReadinessNode {
2568            state: AtomicState::new(Ready::empty(), PollOpt::empty()),
2569            token_0: UnsafeCell::new(Token(0)),
2570            token_1: UnsafeCell::new(Token(0)),
2571            token_2: UnsafeCell::new(Token(0)),
2572            next_readiness: AtomicPtr::new(ptr::null_mut()),
2573            update_lock: AtomicBool::new(false),
2574            readiness_queue: AtomicPtr::new(ptr::null_mut()),
2575            ref_count: AtomicUsize::new(0),
2576        }
2577    }
2578
2579    fn enqueue_with_wakeup(&self) -> io::Result<()> {
2580        let queue = self.readiness_queue.load(Acquire);
2581
2582        if queue.is_null() {
2583            // Not associated with a queue, nothing to do
2584            return Ok(());
2585        }
2586
2587        enqueue_with_wakeup(queue, self)
2588    }
2589}
2590
2591fn enqueue_with_wakeup(queue: *mut (), node: &ReadinessNode) -> io::Result<()> {
2592    debug_assert!(!queue.is_null());
2593    // This is ugly... but we don't want to bump the ref count.
2594    let queue: &Arc<ReadinessQueueInner> =
2595        unsafe { &*(&queue as *const *mut () as *const Arc<ReadinessQueueInner>) };
2596    queue.enqueue_node_with_wakeup(node)
2597}
2598
2599unsafe fn token(node: &ReadinessNode, pos: usize) -> Token {
2600    match pos {
2601        0 => *node.token_0.get(),
2602        1 => *node.token_1.get(),
2603        2 => *node.token_2.get(),
2604        _ => unreachable!(),
2605    }
2606}
2607
2608fn release_node(ptr: *mut ReadinessNode) {
2609    unsafe {
2610        // `AcqRel` synchronizes with other `release_node` functions and ensures
2611        // that the drop happens after any reads / writes on other threads.
2612        if (*ptr).ref_count.fetch_sub(1, AcqRel) != 1 {
2613            return;
2614        }
2615
2616        let node = Box::from_raw(ptr);
2617
2618        // Decrement the readiness_queue Arc
2619        let queue = node.readiness_queue.load(Acquire);
2620
2621        if queue.is_null() {
2622            return;
2623        }
2624
2625        let _: Arc<ReadinessQueueInner> = mem::transmute(queue);
2626    }
2627}
2628
2629impl AtomicState {
2630    fn new(interest: Ready, opt: PollOpt) -> AtomicState {
2631        let state = ReadinessState::new(interest, opt);
2632
2633        AtomicState {
2634            inner: AtomicUsize::new(state.into()),
2635        }
2636    }
2637
2638    /// Loads the current `ReadinessState`
2639    fn load(&self, order: Ordering) -> ReadinessState {
2640        self.inner.load(order).into()
2641    }
2642
2643    /// Stores a state if the current state is the same as `current`.
2644    fn compare_and_swap(
2645        &self,
2646        current: ReadinessState,
2647        new: ReadinessState,
2648        order: Ordering,
2649    ) -> ReadinessState {
2650        self.inner
2651            .compare_and_swap(current.into(), new.into(), order)
2652            .into()
2653    }
2654
2655    // Returns `true` if the node should be queued
2656    fn flag_as_dropped(&self) -> bool {
2657        let prev: ReadinessState = self
2658            .inner
2659            .fetch_or(DROPPED_MASK | QUEUED_MASK, Release)
2660            .into();
2661        // The flag should not have been previously set
2662        debug_assert!(!prev.is_dropped());
2663
2664        !prev.is_queued()
2665    }
2666}
2667
2668impl ReadinessState {
2669    // Create a `ReadinessState` initialized with the provided arguments
2670    #[inline]
2671    fn new(interest: Ready, opt: PollOpt) -> ReadinessState {
2672        let interest = event::ready_as_usize(interest);
2673        let opt = event::opt_as_usize(opt);
2674
2675        debug_assert!(interest <= MASK_4);
2676        debug_assert!(opt <= MASK_4);
2677
2678        let mut val = interest << INTEREST_SHIFT;
2679        val |= opt << POLL_OPT_SHIFT;
2680
2681        ReadinessState(val)
2682    }
2683
2684    #[inline]
2685    fn get(self, mask: usize, shift: usize) -> usize {
2686        (self.0 >> shift) & mask
2687    }
2688
2689    #[inline]
2690    fn set(&mut self, val: usize, mask: usize, shift: usize) {
2691        self.0 = (self.0 & !(mask << shift)) | (val << shift)
2692    }
2693
2694    /// Get the readiness
2695    #[inline]
2696    fn readiness(self) -> Ready {
2697        let v = self.get(MASK_4, READINESS_SHIFT);
2698        event::ready_from_usize(v)
2699    }
2700
2701    #[inline]
2702    fn effective_readiness(self) -> Ready {
2703        self.readiness() & self.interest()
2704    }
2705
2706    /// Set the readiness
2707    #[inline]
2708    fn set_readiness(&mut self, v: Ready) {
2709        self.set(event::ready_as_usize(v), MASK_4, READINESS_SHIFT);
2710    }
2711
2712    /// Get the interest
2713    #[inline]
2714    fn interest(self) -> Ready {
2715        let v = self.get(MASK_4, INTEREST_SHIFT);
2716        event::ready_from_usize(v)
2717    }
2718
2719    /// Set the interest
2720    #[inline]
2721    fn set_interest(&mut self, v: Ready) {
2722        self.set(event::ready_as_usize(v), MASK_4, INTEREST_SHIFT);
2723    }
2724
2725    #[inline]
2726    fn disarm(&mut self) {
2727        self.set_interest(Ready::empty());
2728    }
2729
2730    /// Get the poll options
2731    #[inline]
2732    fn poll_opt(self) -> PollOpt {
2733        let v = self.get(MASK_4, POLL_OPT_SHIFT);
2734        event::opt_from_usize(v)
2735    }
2736
2737    /// Set the poll options
2738    #[inline]
2739    fn set_poll_opt(&mut self, v: PollOpt) {
2740        self.set(event::opt_as_usize(v), MASK_4, POLL_OPT_SHIFT);
2741    }
2742
2743    #[inline]
2744    fn is_queued(self) -> bool {
2745        self.0 & QUEUED_MASK == QUEUED_MASK
2746    }
2747
2748    /// Set the queued flag
2749    #[inline]
2750    fn set_queued(&mut self) {
2751        // Dropped nodes should never be queued
2752        debug_assert!(!self.is_dropped());
2753        self.0 |= QUEUED_MASK;
2754    }
2755
2756    #[inline]
2757    fn set_dequeued(&mut self) {
2758        debug_assert!(self.is_queued());
2759        self.0 &= !QUEUED_MASK
2760    }
2761
2762    #[inline]
2763    fn is_dropped(self) -> bool {
2764        self.0 & DROPPED_MASK == DROPPED_MASK
2765    }
2766
2767    #[inline]
2768    fn token_read_pos(self) -> usize {
2769        self.get(MASK_2, TOKEN_RD_SHIFT)
2770    }
2771
2772    #[inline]
2773    fn token_write_pos(self) -> usize {
2774        self.get(MASK_2, TOKEN_WR_SHIFT)
2775    }
2776
2777    #[inline]
2778    fn next_token_pos(self) -> usize {
2779        let rd = self.token_read_pos();
2780        let wr = self.token_write_pos();
2781
2782        match wr {
2783            0 => match rd {
2784                1 => 2,
2785                2 => 1,
2786                0 => 1,
2787                _ => unreachable!(),
2788            },
2789            1 => match rd {
2790                0 => 2,
2791                2 => 0,
2792                1 => 2,
2793                _ => unreachable!(),
2794            },
2795            2 => match rd {
2796                0 => 1,
2797                1 => 0,
2798                2 => 0,
2799                _ => unreachable!(),
2800            },
2801            _ => unreachable!(),
2802        }
2803    }
2804
2805    #[inline]
2806    fn set_token_write_pos(&mut self, val: usize) {
2807        self.set(val, MASK_2, TOKEN_WR_SHIFT);
2808    }
2809
2810    #[inline]
2811    fn update_token_read_pos(&mut self) {
2812        let val = self.token_write_pos();
2813        self.set(val, MASK_2, TOKEN_RD_SHIFT);
2814    }
2815}
2816
2817impl From<ReadinessState> for usize {
2818    fn from(src: ReadinessState) -> usize {
2819        src.0
2820    }
2821}
2822
2823impl From<usize> for ReadinessState {
2824    fn from(src: usize) -> ReadinessState {
2825        ReadinessState(src)
2826    }
2827}
2828
2829fn is_send<T: Send>() {}
2830fn is_sync<T: Sync>() {}
2831
2832impl SelectorId {
2833    pub fn new() -> SelectorId {
2834        SelectorId {
2835            id: AtomicUsize::new(0),
2836        }
2837    }
2838
2839    pub fn associate_selector(&self, poll: &Poll) -> io::Result<()> {
2840        let selector_id = self.id.load(Ordering::SeqCst);
2841
2842        if selector_id != 0 && selector_id != poll.selector.id() {
2843            Err(io::Error::new(
2844                io::ErrorKind::Other,
2845                "socket already registered",
2846            ))
2847        } else {
2848            self.id.store(poll.selector.id(), Ordering::SeqCst);
2849            Ok(())
2850        }
2851    }
2852}
2853
2854impl Clone for SelectorId {
2855    fn clone(&self) -> SelectorId {
2856        SelectorId {
2857            id: AtomicUsize::new(self.id.load(Ordering::SeqCst)),
2858        }
2859    }
2860}
2861
2862#[test]
2863pub fn as_raw_fd() {
2864    let poll = Poll::new().unwrap();
2865    assert!(poll.as_raw_fd() > 0);
2866}