futures_net/driver/sys/poll.rs
use super::event::{self, Event, Evented, PollOpt, Ready};
use super::{linux, Token};
use std::cell::UnsafeCell;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
use std::process;
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use std::{fmt, io, ptr, usize};
use std::{isize, mem, ops};

// Poll is backed by two readiness queues. The first is a system readiness queue
// represented by `linux::Selector`. The system readiness queue handles events
// provided by the system, such as TCP and UDP. The second readiness queue is
// implemented in user space by `ReadinessQueue`. It provides a way to implement
// purely user space `Evented` types.
//
// `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked
// list nodes. This significantly reduces the number of required allocations.
// Each `Registration` / `SetReadiness` pair allocates a single readiness node
// that is used for the lifetime of the registration.
//
// The readiness node also includes a single atomic variable, `state`, that
// tracks most of the state associated with the registration. This includes the
// current readiness, interest, poll options, and internal state. When the node
// state is mutated, it is queued in the MPSC channel. A call to
// `ReadinessQueue::poll` will dequeue and process nodes. The node state can
// still be mutated while it is queued in the channel for processing.
// Intermediate state values do not matter as long as the final state is
// included in the call to `poll`. This is the eventually consistent nature of
// the readiness queue.
//
// The readiness node is ref counted using the `ref_count` field. On creation,
// the ref_count is initialized to 3: one `Registration` handle, one
// `SetReadiness` handle, and one for the readiness queue. Since the readiness
// queue doesn't *always* hold a handle to the node, we don't use the Arc type
// for managing ref counts (this is to avoid constantly incrementing and
// decrementing the ref count when pushing & popping from the queue). When the
// `Registration` handle is dropped, the `dropped` flag is set on the node, then
// the node is pushed into the registration queue. When `Poll::poll` pops the
// node, it sees the drop flag is set, and decrements its ref count.
//
// The MPSC queue is a modified version of the intrusive MPSC node based queue
// described by 1024cores [1].
//
// The first modification is that two markers are used instead of a single
// `stub`. The second marker is a `sleep_marker` which is used to signal to
// producers that the consumer is going to sleep. This sleep_marker is only used
// when the queue is empty, implying that the only node in the queue is
// `end_marker`.
//
// The second modification is an `until` argument passed to the dequeue
// function. When `poll` encounters a level-triggered node, the node will be
// immediately pushed back into the queue. In order to avoid an infinite loop,
// before pushing the node, `poll` saves off the node's pointer and then passes
// it again as the `until` argument. If the next node to pop is `until`, then
// `Dequeue::Empty` is returned.
//
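// For example (a sketch): when `poll` pops a level-triggered node `n`, it
// pushes `n` straight back and records its pointer as `until`; the dequeue
// loop then stops as soon as `n` comes around again, so each node is
// processed at most once per call to `poll`.
//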
// [1] http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue

/// Polls for readiness events on all registered values.
///
/// `Poll` allows a program to monitor a large number of `Evented` types,
/// waiting until one or more become "ready" for some class of operations; e.g.
/// reading and writing. An `Evented` type is considered ready if it is possible
/// to immediately perform a corresponding operation; e.g. [`read`] or
/// [`write`].
///
/// To use `Poll`, an `Evented` type must first be registered with the `Poll`
/// instance using the [`register`] method, supplying readiness interest. The
/// readiness interest tells `Poll` which specific operations on the handle to
/// monitor for readiness. A `Token` is also passed to the [`register`]
/// function. When `Poll` returns a readiness event, it will include this token.
/// This associates the event with the `Evented` handle that generated the
/// event.
///
/// [`read`]: tcp/struct.TcpStream.html#method.read
/// [`write`]: tcp/struct.TcpStream.html#method.write
/// [`register`]: #method.register
///
/// # Examples
///
/// A basic example -- establishing a `TcpStream` connection.
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use futures_net::driver::sys::{Poll, Token};
/// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
/// use futures_net::driver::sys::net::TcpStream;
///
/// use std::net::{TcpListener, SocketAddr};
///
/// // Bind a server socket to connect to.
/// let addr: SocketAddr = "127.0.0.1:0".parse()?;
/// let server = TcpListener::bind(&addr)?;
///
/// // Construct a new `Poll` handle as well as the `Events` we'll store into
/// let poll = Poll::new()?;
/// let mut events = Events::with_capacity(1024);
///
/// // Connect the stream
/// let stream = TcpStream::connect(&server.local_addr()?)?;
///
/// // Register the stream with `Poll`
/// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
///
/// // Wait for the socket to become ready. This has to happen in a loop to
/// // handle spurious wakeups.
/// loop {
///     poll.poll(&mut events, None)?;
///
///     for event in &events {
///         if event.token() == Token(0) && event.readiness().is_writable() {
///             // The socket connected (probably, it could still be a spurious
///             // wakeup)
///             return Ok(());
///         }
///     }
/// }
/// # Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// # Edge-triggered and level-triggered
///
/// An [`Evented`] registration may request edge-triggered events or
/// level-triggered events. This is done by setting `register`'s
/// [`PollOpt`] argument to either [`edge`] or [`level`].
///
/// The difference between the two can be described as follows. Suppose that
/// this scenario happens:
///
/// 1. A [`TcpStream`] is registered with `Poll`.
/// 2. The socket receives 2kb of data.
/// 3. A call to [`Poll::poll`] returns the token associated with the socket
///    indicating readable readiness.
/// 4. 1kb is read from the socket.
/// 5. Another call to [`Poll::poll`] is made.
///
/// If edge-triggered events were requested when the socket was registered with
/// `Poll`, then the call to [`Poll::poll`] done in step **5** will
/// (probably) hang despite there being another 1kb still present in the socket
/// read buffer. The reason for this is that edge-triggered mode delivers events
/// only when changes occur on the monitored [`Evented`]. So, in step **5** the
/// caller might end up waiting for some data that is already present inside the
/// socket buffer.
///
/// With edge-triggered events, operations **must** be performed on the
/// `Evented` type until [`WouldBlock`] is returned. In other words, after
/// receiving an event indicating readiness for a certain operation, one should
/// assume that [`Poll::poll`] may never return another event for the same token
/// and readiness until the operation returns [`WouldBlock`].
///
/// By contrast, when level-triggered notifications were requested, each call to
/// [`Poll::poll`] will return an event for the socket as long as data remains
/// in the socket buffer. Generally, level-triggered events should be avoided if
/// high performance is a concern.
///
/// Since even with edge-triggered events, multiple events can be generated upon
/// receipt of multiple chunks of data, the caller has the option to set the
/// [`oneshot`] flag. This tells `Poll` to disable the associated [`Evented`]
/// after the event is returned from [`Poll::poll`]. Subsequent calls to
/// [`Poll::poll`] will no longer include events for [`Evented`] handles that
/// are disabled even if the readiness state changes. The handle can be
/// re-enabled by calling [`reregister`]. When handles are disabled, internal
/// resources used to monitor the handle are maintained until the handle is
/// dropped or deregistered. This makes re-registering the handle a fast
/// operation.
///
/// For example, in the following scenario:
///
/// 1. A [`TcpStream`] is registered with `Poll`.
/// 2. The socket receives 2kb of data.
/// 3. A call to [`Poll::poll`] returns the token associated with the socket
///    indicating readable readiness.
/// 4. 2kb is read from the socket.
/// 5. Another call to read is issued and [`WouldBlock`] is returned.
/// 6. The socket receives another 2kb of data.
/// 7. Another call to [`Poll::poll`] is made.
///
/// Assuming the socket was registered with `Poll` with the [`edge`] and
/// [`oneshot`] options, then the call to [`Poll::poll`] in step 7 would block.
/// This is because [`oneshot`] tells `Poll` to disable events for the socket
/// after returning an event.
///
/// In order to receive the event for the data received in step 6, the socket
/// would need to be reregistered using [`reregister`].
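///
/// A minimal sketch of the oneshot + [`reregister`] pattern (same
/// local-listener setup as the first example above; the exact readiness
/// handling is illustrative only):
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use futures_net::driver::sys::{Poll, Token};
/// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
/// use futures_net::driver::sys::net::TcpStream;
/// use std::net::{TcpListener, SocketAddr};
///
/// let addr: SocketAddr = "127.0.0.1:0".parse()?;
/// let server = TcpListener::bind(&addr)?;
/// let stream = TcpStream::connect(&server.local_addr()?)?;
///
/// let poll = Poll::new()?;
/// let mut events = Events::with_capacity(1024);
///
/// // Request a single writable event, after which the registration is
/// // disabled.
/// poll.register(&stream, Token(0), Ready::writable(),
///               PollOpt::edge() | PollOpt::oneshot())?;
///
/// loop {
///     poll.poll(&mut events, None)?;
///
///     for event in &events {
///         if event.token() == Token(0) {
///             // The registration is disabled now; re-arm it before the
///             // next wait if more events are wanted.
///             poll.reregister(&stream, Token(0), Ready::writable(),
///                             PollOpt::edge() | PollOpt::oneshot())?;
///             return Ok(());
///         }
///     }
/// }
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```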
///
/// [`PollOpt`]: struct.PollOpt.html
/// [`edge`]: struct.PollOpt.html#method.edge
/// [`level`]: struct.PollOpt.html#method.level
/// [`Poll::poll`]: struct.Poll.html#method.poll
/// [`WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.WouldBlock
/// [`Evented`]: event/trait.Evented.html
/// [`TcpStream`]: tcp/struct.TcpStream.html
/// [`reregister`]: #method.reregister
/// [`oneshot`]: struct.PollOpt.html#method.oneshot
///
/// # Portability
///
/// Using `Poll` provides a portable interface across supported platforms as
/// long as the caller takes the following into consideration:
///
/// ### Spurious events
///
/// [`Poll::poll`] may return readiness events even if the associated
/// [`Evented`] handle is not actually ready. Given the same code, this may
/// happen more on some platforms than others. It is important to never assume
/// that, just because a readiness notification was received, the associated
/// operation will succeed as well.
///
/// If an operation fails with [`WouldBlock`], then the caller should not treat
/// this as an error, but instead should wait until another readiness event is
/// received.
///
/// ### Draining readiness
///
/// When using edge-triggered mode, once a readiness event is received, the
/// corresponding operation must be performed repeatedly until it returns
/// [`WouldBlock`]. Unless this is done, there is no guarantee that another
/// readiness event will be delivered, even if further data is received for the
/// [`Evented`] handle.
///
/// For example, in the first scenario described above, after step 5, even if
/// the socket receives more data there is no guarantee that another readiness
/// event will be delivered.
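///
/// A sketch of such a drain loop (sockets registered with `Poll` are
/// non-blocking, so `read` returns `WouldBlock` once the buffer is empty):
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use futures_net::driver::sys::net::TcpStream;
/// use std::io::{self, Read};
///
/// let mut stream = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
/// let mut buf = [0; 1024];
///
/// loop {
///     match stream.read(&mut buf) {
///         // Readiness is fully drained; wait for the next event.
///         Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
///         Err(e) => return Err(e.into()),
///         // The peer closed the connection.
///         Ok(0) => break,
///         // Process the bytes read, then keep draining.
///         Ok(_n) => {}
///     }
/// }
/// # Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```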
///
/// ### Readiness operations
///
/// The only readiness operations that are guaranteed to be present on all
/// supported platforms are [`readable`] and [`writable`]. All other readiness
/// operations may have false negatives and as such should be considered
/// **hints**. This means that if a socket is registered with [`readable`],
/// [`error`], and [`hup`] interest, and either an error or hup is received, a
/// readiness event will be generated for the socket, but it **may** only
/// include `readable` readiness. Also note that, given the potential for
/// spurious events, receiving a readiness event with `hup` or `error` doesn't
/// actually mean that a `read` on the socket will return a result matching the
/// readiness event.
///
/// In other words, portable programs that explicitly check for [`hup`] or
/// [`error`] readiness should be doing so as an **optimization** and always be
/// able to handle an error or HUP situation when performing the actual read
/// operation.
///
/// [`readable`]: struct.Ready.html#method.readable
/// [`writable`]: struct.Ready.html#method.writable
/// [`error`]: unix/struct.UnixReady.html#method.error
/// [`hup`]: unix/struct.UnixReady.html#method.hup
///
/// ### Registering handles
///
/// Unless otherwise noted, it should be assumed that types implementing
/// [`Evented`] will never become ready unless they are registered with `Poll`.
///
/// For example:
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use futures_net::driver::sys::{Poll, Token};
/// use futures_net::driver::sys::event::{Ready, PollOpt};
/// use futures_net::driver::sys::net::TcpStream;
/// use std::time::Duration;
/// use std::thread;
///
/// let sock = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
///
/// thread::sleep(Duration::from_secs(1));
///
/// let poll = Poll::new()?;
///
/// // The connect is not guaranteed to have started until it is registered at
/// // this point
/// poll.register(&sock, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
/// # Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// # Implementation notes
///
/// `Poll` is backed by the selector provided by the operating system.
///
/// | OS        | Selector |
/// |-----------|----------|
/// | Linux     | [epoll]  |
/// | OS X, iOS | [kqueue] |
/// | Windows   | [IOCP]   |
/// | FreeBSD   | [kqueue] |
/// | Android   | [epoll]  |
///
/// On all supported platforms, socket operations are handled by using the
/// system selector. Platform specific extensions (e.g. [`EventedFd`]) allow
/// accessing other features provided by individual system selectors. For
/// example, Linux's [`signalfd`] feature can be used by registering the FD with
/// `Poll` via [`EventedFd`].
///
/// On all platforms except Windows, a call to [`Poll::poll`] is mostly just a
/// direct call to the system selector. However, [IOCP] uses a completion model
/// instead of a readiness model. In this case, `Poll` must adapt the completion
/// model to a readiness model. While non-trivial, the bridge layer is still
/// quite efficient. The most expensive part is that calls to `read` and `write`
/// require data to be copied into an intermediate buffer before it is passed to
/// the kernel.
///
/// Notifications generated by [`SetReadiness`] are handled by an internal
/// readiness queue. A single call to [`Poll::poll`] will collect events from
/// both the system selector and the internal readiness queue.
///
/// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
/// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
/// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html
/// [`EventedFd`]: unix/struct.EventedFd.html
/// [`SetReadiness`]: struct.SetReadiness.html
/// [`Poll::poll`]: struct.Poll.html#method.poll
pub struct Poll {
    // Platform specific IO selector
    selector: linux::Selector,

    // Custom readiness queue
    readiness_queue: ReadinessQueue,

    // Use an atomic to first check if a full lock will be required. This is a
    // fast-path check for single threaded cases avoiding the extra syscall
    lock_state: AtomicUsize,

    // Sequences concurrent calls to `Poll::poll`
    lock: Mutex<()>,

    // Wakeup the next waiter
    condvar: Condvar,
}

/// Handle to a user space `Poll` registration.
///
/// `Registration` allows implementing [`Evented`] for types that cannot work
/// with the [system selector]. A `Registration` is always paired with a
/// `SetReadiness`, which allows updating the registration's readiness state.
/// When [`set_readiness`] is called and the `Registration` is associated with a
/// [`Poll`] instance, a readiness event will be created and eventually returned
/// by [`poll`].
///
/// A `Registration` / `SetReadiness` pair is created by calling
/// [`Registration::new2`]. At this point, the registration is not being
/// monitored by a [`Poll`] instance, so calls to `set_readiness` will not
/// result in any readiness notifications.
///
/// `Registration` implements [`Evented`], so it can be used with [`Poll`] using
/// the same [`register`], [`reregister`], and [`deregister`] functions used
/// with TCP, UDP, etc... types. Once registered with [`Poll`], readiness state
/// changes result in readiness events being dispatched to the [`Poll`] instance
/// with which `Registration` is registered.
///
/// **Note**, before using `Registration` be sure to read the
/// [`set_readiness`] documentation and the [portability] notes. The
/// guarantees offered by `Registration` may be weaker than expected.
///
/// For high level documentation, see [`Poll`].
///
/// # Examples
///
/// ```
/// use futures_net::driver::sys::{Registration, Poll, Token};
/// use futures_net::driver::sys::event::{Evented, Ready, PollOpt};
///
/// use std::io;
/// use std::time::Instant;
/// use std::thread;
///
/// pub struct Deadline {
///     when: Instant,
///     registration: Registration,
/// }
///
/// impl Deadline {
///     pub fn new(when: Instant) -> Deadline {
///         let (registration, set_readiness) = Registration::new2();
///
///         thread::spawn(move || {
///             let now = Instant::now();
///
///             if now < when {
///                 thread::sleep(when - now);
///             }
///
///             set_readiness.set_readiness(Ready::readable());
///         });
///
///         Deadline {
///             when: when,
///             registration: registration,
///         }
///     }
///
///     pub fn is_elapsed(&self) -> bool {
///         Instant::now() >= self.when
///     }
/// }
///
/// impl Evented for Deadline {
///     fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
///         -> io::Result<()>
///     {
///         self.registration.register(poll, token, interest, opts)
///     }
///
///     fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
///         -> io::Result<()>
///     {
///         self.registration.reregister(poll, token, interest, opts)
///     }
///
///     fn deregister(&self, poll: &Poll) -> io::Result<()> {
///         poll.deregister(&self.registration)
///     }
/// }
/// ```
///
/// [system selector]: struct.Poll.html#implementation-notes
/// [`Poll`]: struct.Poll.html
/// [`poll`]: struct.Poll.html#method.poll
/// [`Registration::new2`]: struct.Registration.html#method.new2
/// [`Evented`]: event/trait.Evented.html
/// [`set_readiness`]: struct.SetReadiness.html#method.set_readiness
/// [`register`]: struct.Poll.html#method.register
/// [`reregister`]: struct.Poll.html#method.reregister
/// [`deregister`]: struct.Poll.html#method.deregister
/// [portability]: struct.Poll.html#portability
pub struct Registration {
    inner: RegistrationInner,
}

unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}

/// Updates the readiness state of the associated `Registration`.
///
/// See [`Registration`] for more documentation on using `SetReadiness` and
/// [`Poll`] for high level polling documentation.
///
/// [`Poll`]: struct.Poll.html
/// [`Registration`]: struct.Registration.html
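///
/// # Examples
///
/// A minimal sketch of setting and reading back readiness. No `Poll` instance
/// is involved yet, so no readiness event is dispatched:
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use futures_net::driver::sys::Registration;
/// use futures_net::driver::sys::event::Ready;
///
/// let (registration, set_readiness) = Registration::new2();
///
/// assert!(set_readiness.readiness().is_empty());
///
/// set_readiness.set_readiness(Ready::readable())?;
/// assert_eq!(set_readiness.readiness(), Ready::readable());
/// # drop(registration);
/// # Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```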
#[derive(Clone)]
pub struct SetReadiness {
    inner: RegistrationInner,
}

unsafe impl Send for SetReadiness {}
unsafe impl Sync for SetReadiness {}

/// Used to associate an IO type with a Selector
#[derive(Debug)]
pub struct SelectorId {
    id: AtomicUsize,
}

struct RegistrationInner {
    // Unsafe pointer to the registration's node. The node is ref counted. This
    // cannot "simply" be tracked by an Arc because `Poll::poll` has an implicit
    // handle though it isn't stored anywhere. In other words, `Poll::poll`
    // needs to decrement the ref count before the node is freed.
    node: *mut ReadinessNode,
}

#[derive(Clone)]
struct ReadinessQueue {
    inner: Arc<ReadinessQueueInner>,
}

unsafe impl Send for ReadinessQueue {}
unsafe impl Sync for ReadinessQueue {}

struct ReadinessQueueInner {
    // Used to wake up `Poll` when readiness is set in another thread.
    awakener: linux::Awakener,

    // Head of the MPSC queue used to signal readiness to `Poll::poll`.
    head_readiness: AtomicPtr<ReadinessNode>,

    // Tail of the readiness queue.
    //
    // Only accessed by Poll::poll. Coordination will be handled by the poll fn
    tail_readiness: UnsafeCell<*mut ReadinessNode>,

    // Fake readiness node used to punctuate the end of the readiness queue.
    // Before attempting to read from the queue, this node is inserted in order
    // to partition the queue between nodes that are "owned" by the dequeue end
    // and nodes that will be pushed on by producers.
    end_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but this node signals to producers that `Poll`
    // has gone to sleep and must be woken up.
    sleep_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but the node signals that the queue is closed.
    // This happens when `ReadinessQueue` is dropped and signals to producers
    // that the nodes should no longer be pushed into the queue.
    closed_marker: Box<ReadinessNode>,
}

/// Node shared by a `Registration` / `SetReadiness` pair as well as the node
/// queued into the MPSC channel.
struct ReadinessNode {
    // Node state, see struct docs for `ReadinessState`
    //
    // This variable is the primary point of coordination between all the
    // various threads concurrently accessing the node.
    state: AtomicState,

    // The registration token cannot fit into the `state` variable, so it is
    // broken out here. In order to atomically update both the state and token
    // we have to jump through a few hoops.
    //
    // First, `state` includes `token_read_pos` and `token_write_pos`. These can
    // either be 0, 1, or 2 which represent a token slot. `token_write_pos` is
    // the token slot that contains the most up to date registration token.
    // `token_read_pos` is the token slot that `poll` is currently reading from.
    //
    // When a call to `update` includes a different token than the one currently
    // associated with the registration (token_write_pos), first an unused token
    // slot is found. The unused slot is the one not represented by
    // `token_read_pos` OR `token_write_pos`. The new token is written to this
    // slot, then `state` is updated with the new `token_write_pos` value. This
    // requires that there is only a *single* concurrent call to `update`.
    //
    // When `poll` reads a node state, it checks that `token_read_pos` matches
    // `token_write_pos`. If they do not match, then it atomically updates
    // `state` such that `token_read_pos` is set to `token_write_pos`. It will
    // then read the token at the newly updated `token_read_pos`.
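    //
    // For example (a sketch of the protocol): with `token_read_pos == 0` and
    // `token_write_pos == 0`, an `update` carrying a new token writes it to
    // the unused slot 1 and then sets `token_write_pos = 1`. A `poll` racing
    // with the update may still read the old token from slot 0; the next
    // `poll` observes the mismatch, advances `token_read_pos` to 1, and reads
    // the new token.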
    token_0: UnsafeCell<Token>,
    token_1: UnsafeCell<Token>,
    token_2: UnsafeCell<Token>,

    // Used when the node is queued in the readiness linked list. Accessing
    // this field requires winning the "queue" lock
    next_readiness: AtomicPtr<ReadinessNode>,

    // Ensures that there is only one concurrent call to `update`.
    //
    // Each call to `update` will attempt to swap `update_lock` from `false` to
    // `true`. If the CAS succeeds, the thread has obtained the update lock. If
    // the CAS fails, then the `update` call returns immediately and the update
    // is discarded.
    update_lock: AtomicBool,

    // Pointer to Arc<ReadinessQueueInner>
    readiness_queue: AtomicPtr<()>,

    // Tracks the number of `ReadyRef` pointers
    ref_count: AtomicUsize,
}

/// Stores the ReadinessNode state in an AtomicUsize. This wrapper around the
/// atomic variable handles encoding / decoding `ReadinessState` values.
struct AtomicState {
    inner: AtomicUsize,
}

const MASK_2: usize = 4 - 1;
const MASK_4: usize = 16 - 1;
const QUEUED_MASK: usize = 1 << QUEUED_SHIFT;
const DROPPED_MASK: usize = 1 << DROPPED_SHIFT;

const READINESS_SHIFT: usize = 0;
const INTEREST_SHIFT: usize = 4;
const POLL_OPT_SHIFT: usize = 8;
const TOKEN_RD_SHIFT: usize = 12;
const TOKEN_WR_SHIFT: usize = 14;
const QUEUED_SHIFT: usize = 16;
const DROPPED_SHIFT: usize = 17;

/// Tracks all state for a single `ReadinessNode`. The state is packed into a
/// `usize` variable from low to high bit as follows:
///
/// 4 bits: Registration current readiness
/// 4 bits: Registration interest
/// 4 bits: Poll options
/// 2 bits: Token position currently being read from by `poll`
/// 2 bits: Token position last written to by `update`
/// 1 bit:  Queued flag, set when node is being pushed into MPSC queue.
/// 1 bit:  Dropped flag, set when all `Registration` handles have been dropped.
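///
/// For example, fields are decoded by shifting and masking against the
/// constants above (a sketch): readiness is
/// `(state >> READINESS_SHIFT) & MASK_4`, the read position is
/// `(state >> TOKEN_RD_SHIFT) & MASK_2`, and the queued flag is
/// `state & QUEUED_MASK != 0`.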
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct ReadinessState(usize);

/// Returned by `dequeue_node`. Represents the different states as described by
/// the queue documentation on 1024cores.net.
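/// `Inconsistent` corresponds to the brief window in which a producer has
/// swapped the queue's head pointer but has not yet linked its node's `next`
/// pointer; the consumer must retry (see [1] in the module comment).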
enum Dequeue {
    Data(*mut ReadinessNode),
    Empty,
    Inconsistent,
}

const AWAKEN: Token = Token(usize::MAX);
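// Cap the ref count well below `usize::MAX` (mirroring the `std::sync::Arc`
// convention) so that an overflowing count can be detected before it wraps.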
const MAX_REFCOUNT: usize = (isize::MAX) as usize;

/*
 *
 * ===== Poll =====
 *
 */

impl Poll {
    /// Return a new `Poll` handle.
    ///
    /// This function will make a syscall to the operating system to create the
    /// system selector. If this syscall fails, `Poll::new` will return with the
    /// error.
    ///
    /// See [struct] level docs for more details.
    ///
    /// [struct]: struct.Poll.html
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use futures_net::driver::sys::event::Events;
    /// use futures_net::driver::sys::Poll;
    /// use std::time::Duration;
    ///
    /// let poll = match Poll::new() {
    ///     Ok(poll) => poll,
    ///     Err(e) => panic!("failed to create Poll instance; err={:?}", e),
    /// };
    ///
    /// // Create a structure to receive polled events
    /// let mut events = Events::with_capacity(1024);
    ///
    /// // Wait for events, but none will be received because no `Evented`
    /// // handles have been registered with this `Poll` instance.
    /// let n = poll.poll(&mut events, Some(Duration::from_millis(500)))?;
    /// assert_eq!(n, 0);
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn new() -> io::Result<Poll> {
        is_send::<Poll>();
        is_sync::<Poll>();

        let poll = Poll {
            selector: linux::Selector::new()?,
            readiness_queue: ReadinessQueue::new()?,
            lock_state: AtomicUsize::new(0),
            lock: Mutex::new(()),
            condvar: Condvar::new(),
        };

        // Register the notification wakeup FD with the IO poller
        poll.readiness_queue.inner.awakener.register(
            &poll,
            AWAKEN,
            Ready::readable(),
            PollOpt::edge(),
        )?;

        Ok(poll)
    }

    /// Register an `Evented` handle with the `Poll` instance.
    ///
    /// Once registered, the `Poll` instance will monitor the `Evented` handle
    /// for readiness state changes. When it notices a state change, it will
    /// return a readiness event for the handle the next time [`poll`] is
    /// called.
    ///
    /// See the [`struct`] docs for a high level overview.
    ///
    /// # Arguments
    ///
    /// `handle: &E: Evented`: This is the handle that the `Poll` instance
    /// should monitor for readiness state changes.
    ///
    /// `token: Token`: The caller picks a token to associate with the socket.
    /// When [`poll`] returns an event for the handle, this token is included.
    /// This allows the caller to map the event to its handle. The token
    /// associated with the `Evented` handle can be changed at any time by
    /// calling [`reregister`].
    ///
    /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
    /// usage.
    ///
    /// See documentation on [`Token`] for an example showing how to pick
    /// [`Token`] values.
    ///
    /// `interest: Ready`: Specifies which operations `Poll` should monitor for
    /// readiness. `Poll` will only return readiness events for operations
    /// specified by this argument.
    ///
    /// If a socket is registered with readable interest and the socket becomes
    /// writable, no event will be returned from [`poll`].
    ///
    /// The readiness interest for an `Evented` handle can be changed at any
    /// time by calling [`reregister`].
    ///
    /// `opts: PollOpt`: Specifies the registration options. The most common
    /// options are [`level`] for level-triggered events, [`edge`] for
    /// edge-triggered events, and [`oneshot`].
    ///
    /// The registration options for an `Evented` handle can be changed at any
    /// time by calling [`reregister`].
    ///
    /// # Notes
    ///
    /// Unless otherwise specified, the caller should assume that once an
    /// `Evented` handle is registered with a `Poll` instance, it is bound to
    /// that `Poll` instance for the lifetime of the `Evented` handle. This
    /// remains true even if the `Evented` handle is deregistered from the poll
    /// instance using [`deregister`].
    ///
    /// This function is **thread safe**. It can be called concurrently from
    /// multiple threads.
    ///
    /// [`struct`]: #
    /// [`reregister`]: #method.reregister
    /// [`deregister`]: #method.deregister
    /// [`poll`]: #method.poll
    /// [`level`]: struct.PollOpt.html#method.level
    /// [`edge`]: struct.PollOpt.html#method.edge
    /// [`oneshot`]: struct.PollOpt.html#method.oneshot
    /// [`Token`]: struct.Token.html
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use futures_net::driver::sys::{Poll, Token};
    /// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
    /// use futures_net::driver::sys::net::TcpStream;
    /// use std::time::{Duration, Instant};
    ///
    /// let poll = Poll::new()?;
    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
    ///
    /// // Register the socket with `poll`
    /// poll.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// let start = Instant::now();
    /// let timeout = Duration::from_millis(500);
    ///
    /// loop {
    ///     let elapsed = start.elapsed();
    ///
    ///     if elapsed >= timeout {
    ///         // Connection timed out
    ///         return Ok(());
    ///     }
    ///
    ///     let remaining = timeout - elapsed;
    ///     poll.poll(&mut events, Some(remaining))?;
    ///
    ///     for event in &events {
    ///         if event.token() == Token(0) {
    ///             // Something (probably) happened on the socket.
    ///             return Ok(());
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn register<E: ?Sized>(
        &self,
        handle: &E,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()>
    where
        E: Evented,
    {
        validate_args(token)?;

        /*
         * Undefined behavior:
         * - Reusing a token with a different `Evented` without deregistering
         *   (or closing) the original `Evented`.
         */
        log::trace!("registering with poller");

        // Register interests for this socket
        handle.register(self, token, interest, opts)?;

        Ok(())
    }

    /// Re-register an `Evented` handle with the `Poll` instance.
    ///
    /// Re-registering an `Evented` handle allows changing the details of the
    /// registration. Specifically, it allows updating the associated `token`,
    /// `interest`, and `opts` specified in previous `register` and `reregister`
    /// calls.
    ///
    /// The `reregister` arguments fully override the previous values. In other
    /// words, if a socket is registered with [`readable`] interest and the call
    /// to `reregister` specifies [`writable`], then read interest is no longer
    /// requested for the handle.
    ///
    /// The `Evented` handle must have previously been registered with this
    /// instance of `Poll` otherwise the call to `reregister` will return with
    /// an error.
    ///
    /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
    /// usage.
    ///
    /// See the [`register`] documentation for details about the function
    /// arguments and see the [`struct`] docs for a high level overview of
    /// polling.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use futures_net::driver::sys::{Poll, Token};
    /// use futures_net::driver::sys::event::{Ready, PollOpt};
    /// use futures_net::driver::sys::net::TcpStream;
    ///
    /// let poll = Poll::new()?;
    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
    ///
    /// // Register the socket with `poll`, requesting readable
    /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
    ///
    /// // Reregister the socket specifying a different token and write interest
    /// // instead. `PollOpt::edge()` must be specified even though that value
    /// // is not being changed.
    /// poll.reregister(&socket, Token(2), Ready::writable(), PollOpt::edge())?;
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// [`struct`]: #
    /// [`register`]: #method.register
    /// [`readable`]: struct.Ready.html#method.readable
    /// [`writable`]: struct.Ready.html#method.writable
    pub fn reregister<E: ?Sized>(
        &self,
        handle: &E,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()>
    where
        E: Evented,
    {
        validate_args(token)?;

        log::trace!("registering with poller");

        // Register interests for this socket
        handle.reregister(self, token, interest, opts)?;

        Ok(())
    }

    /// Deregister an `Evented` handle with the `Poll` instance.
    ///
    /// When an `Evented` handle is deregistered, the `Poll` instance will
    /// no longer monitor it for readiness state changes. Unlike disabling
    /// handles with oneshot, deregistering clears up any internal resources
    /// needed to track the handle.
    ///
    /// A handle can be passed back to `register` after it has been
    /// deregistered; however, it must be passed back to the **same** `Poll`
    /// instance.
    ///
    /// `Evented` handles are automatically deregistered when they are dropped.
    /// It is common to never need to explicitly call `deregister`.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use futures_net::driver::sys::{Poll, Token};
    /// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
    /// use futures_net::driver::sys::net::TcpStream;
    /// use std::time::Duration;
    ///
    /// let poll = Poll::new()?;
    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
    ///
    /// // Register the socket with `poll`
    /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
    ///
    /// poll.deregister(&socket)?;
    ///
    /// let mut events = Events::with_capacity(1024);
    ///
    /// // Set a timeout because this poll should never receive any events.
    /// let n = poll.poll(&mut events, Some(Duration::from_secs(1)))?;
    /// assert_eq!(0, n);
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()>
    where
        E: Evented,
    {
        log::trace!("deregistering handle with poller");

        // Deregister interests for this socket
        handle.deregister(self)?;

        Ok(())
    }

    /// Wait for readiness events
    ///
    /// Blocks the current thread and waits for readiness events for any of the
    /// `Evented` handles that have been registered with this `Poll` instance.
    /// The function will block until either at least one readiness event has
    /// been received or `timeout` has elapsed. A `timeout` of `None` means that
    /// `poll` will block until a readiness event has been received.
    ///
    /// The supplied `events` will be cleared and newly received readiness events
    /// will be pushed onto the end. At most `events.capacity()` events will be
    /// returned. If there are further pending readiness events, they will be
    /// returned on the next call to `poll`.
    ///
    /// A single call to `poll` may result in multiple readiness events being
    /// returned for a single `Evented` handle. For example, if a TCP socket
    /// becomes both readable and writable, it may be possible for a single
    /// readiness event to be returned with both [`readable`] and [`writable`]
    /// readiness **OR** two separate events may be returned, one with
    /// [`readable`] set and one with [`writable`] set.
    ///
    /// Note that the `timeout` will be rounded up to the system clock
    /// granularity (usually 1ms), and kernel scheduling delays mean that
    /// the blocking interval may be overrun by a small amount.
    ///
    /// `poll` returns the number of readiness events that have been pushed into
    /// `events` or `Err` when an error has been encountered with the system
    /// selector. The value returned is deprecated and will be removed in 0.7.0.
    /// Accessing the events by index is also deprecated. Events can be
    /// inserted by other events triggering, thus making sequential access
    /// problematic. Use the iterator API instead. See [`iter`].
    ///
    /// See the [struct] level documentation for a higher level discussion of
    /// polling.
    ///
    /// [`readable`]: struct.Ready.html#method.readable
    /// [`writable`]: struct.Ready.html#method.writable
    /// [struct]: #
    /// [`iter`]: struct.Events.html#method.iter
    ///
    /// # Examples
    ///
    /// A basic example -- establishing a `TcpStream` connection.
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use futures_net::driver::sys::{Poll, Token};
    /// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
    /// use futures_net::driver::sys::net::TcpStream;
    ///
    /// use std::net::{TcpListener, SocketAddr};
    /// use std::thread;
    ///
    /// // Bind a server socket to connect to.
    /// let addr: SocketAddr = "127.0.0.1:0".parse()?;
    /// let server = TcpListener::bind(&addr)?;
    /// let addr = server.local_addr()?.clone();
    ///
    /// // Spawn a thread to accept the socket
    /// thread::spawn(move || {
    ///     let _ = server.accept();
    /// });
    ///
    /// // Construct a new `Poll` handle as well as the `Events` we'll store into
    /// let poll = Poll::new()?;
    /// let mut events = Events::with_capacity(1024);
    ///
    /// // Connect the stream
    /// let stream = TcpStream::connect(&addr)?;
    ///
    /// // Register the stream with `Poll`
    /// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
    ///
    /// // Wait for the socket to become ready. This has to happen in a loop to
    /// // handle spurious wakeups.
    /// loop {
    ///     poll.poll(&mut events, None)?;
    ///
    ///     for event in &events {
    ///         if event.token() == Token(0) && event.readiness().is_writable() {
    ///             // The socket connected (probably, it could still be a spurious
    ///             // wakeup)
    ///             return Ok(());
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// [struct]: #
    pub fn poll(
        &self,
        events: &mut Events,
        timeout: Option<Duration>,
    ) -> io::Result<usize> {
        self.poll1(events, timeout, false)
    }

    /// Like `poll`, but may be interrupted by a signal
    ///
    /// If `poll` is interrupted while blocking, it will transparently retry the syscall. If you
    /// want to handle signals yourself, however, use `poll_interruptible`.
    pub fn poll_interruptible(
        &self,
        events: &mut Events,
        timeout: Option<Duration>,
    ) -> io::Result<usize> {
        self.poll1(events, timeout, true)
    }

    fn poll1(
        &self,
        events: &mut Events,
        mut timeout: Option<Duration>,
        interruptible: bool,
    ) -> io::Result<usize> {
        let zero = Some(Duration::from_millis(0));

        // At a high level, the synchronization strategy is to acquire access to
        // the critical section by transitioning the atomic from unlocked ->
        // locked. If the attempt fails, the thread will wait on the condition
        // variable.
        //
        // # Some more detail
        //
        // The `lock_state` atomic usize combines:
        //
        // - locked flag, stored in the least significant bit
        // - number of waiting threads, stored in the rest of the bits.
        //
        // When a thread transitions the locked flag from 0 -> 1, it has
        // obtained access to the critical section.
        //
        // When entering `poll`, a compare-and-swap from 0 -> 1 is attempted.
        // This is a fast path for the case when there are no concurrent calls
        // to poll, which is very common.
        //
        // On failure, the mutex is locked, and the thread attempts to increment
        // the number of waiting threads component of `lock_state`. If this is
        // successfully done while the locked flag is set, then the thread can
        // wait on the condition variable.
        //
        // When a thread exits the critical section, it unsets the locked flag.
        // If there are any waiters, which is atomically determined while
        // unsetting the locked flag, then the condvar is notified.
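        //
        // For example (a sketch of the encoding): `lock_state == 0` is
        // unlocked with no waiters, `0b001` is locked with no waiters, and
        // `0b101` is locked with two threads waiting. A waiter registers
        // itself with `lock_state += 2`; the unlock path clears the low bit
        // with `fetch_and(!1)`.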

        let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);

        if 0 != curr {
            // Enter slower path
            let mut lock = self.lock.lock().unwrap();
            let mut inc = false;

            loop {
                if curr & 1 == 0 {
                    // The lock is currently free, attempt to grab it
                    let mut next = curr | 1;

                    if inc {
                        // The waiter count has previously been incremented, so
                        // decrement it here
                        next -= 2;
                    }

                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);

                    if actual != curr {
                        curr = actual;
                        continue;
                    }

                    // Lock acquired, break from the loop
                    break;
                }

                if timeout == zero {
                    if inc {
                        self.lock_state.fetch_sub(2, SeqCst);
                    }

                    return Ok(0);
                }

                // The lock is currently held, so wait for it to become
                // free. If the waiter count hasn't been incremented yet, do
                // so now
                if !inc {
                    let next = curr.checked_add(2).expect("overflow");
                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);

                    if actual != curr {
                        curr = actual;
                        continue;
                    }

                    // Track that the waiter count has been incremented for
                    // this thread and fall through to the condvar waiting
                    inc = true;
                }

                lock = match timeout {
                    Some(to) => {
                        let now = Instant::now();

                        // Wait to be notified
                        let (l, _) = self.condvar.wait_timeout(lock, to).unwrap();

                        // See how much time was elapsed in the wait
                        let elapsed = now.elapsed();

                        // Update `timeout` to reflect how much time is left to
                        // wait.
                        if elapsed >= to {
                            timeout = zero;
                        } else {
                            // Update the timeout
                            timeout = Some(to - elapsed);
                        }

                        l
                    }
                    None => self.condvar.wait(lock).unwrap(),
                };

                // Reload the state
                curr = self.lock_state.load(SeqCst);

                // Try to lock again...
            }
        }

        let ret = self.poll2(events, timeout, interruptible);

        // Release the lock
        if 1 != self.lock_state.fetch_and(!1, Release) {
            // Acquire the mutex
            let _lock = self.lock.lock().unwrap();

            // There is at least one waiting thread, so notify one
            self.condvar.notify_one();
        }

        ret
    }

    #[inline]
    #[cfg_attr(feature = "cargo-clippy", allow(clippy::if_same_then_else))]
    fn poll2(
        &self,
        events: &mut Events,
        mut timeout: Option<Duration>,
        interruptible: bool,
    ) -> io::Result<usize> {
        // Compute the timeout value passed to the system selector. If the
        // readiness queue has pending nodes, we still want to poll the system
        // selector for new events, but we don't want to block the thread to
        // wait for new events.
        if timeout == Some(Duration::from_millis(0)) {
            // If blocking is not requested, then there is no need to prepare
            // the queue for sleep
            //
            // The sleep_marker should be removed by readiness_queue.poll().
        } else if self.readiness_queue.prepare_for_sleep() {
            // The readiness queue is empty. The call to `prepare_for_sleep`
            // inserts `sleep_marker` into the queue. This signals to any
            // threads setting readiness that the `Poll::poll` is going to
            // sleep, so the awakener should be used.
        } else {
            // The readiness queue is not empty, so do not block the thread.
            timeout = Some(Duration::from_millis(0));
        }

        loop {
            let now = Instant::now();
            // First get selector events
            let res = self.selector.select(&mut events.inner, AWAKEN, timeout);
            match res {
                Ok(true) => {
                    // Some awakeners require reading from a FD.
                    self.readiness_queue.inner.awakener.cleanup();
                    break;
                }
                Ok(false) => break,
                Err(ref e)
                    if e.kind() == io::ErrorKind::Interrupted && !interruptible =>
                {
                    // Interrupted by a signal; update timeout if necessary and retry
                    if let Some(to) = timeout {
                        let elapsed = now.elapsed();
                        if elapsed >= to {
                            break;
                        } else {
                            timeout = Some(to - elapsed);
                        }
                    }
                }
                Err(e) => return Err(e),
            }
        }

        // Poll custom event queue
        self.readiness_queue.poll(&mut events.inner);

        // Return number of polled events
        Ok(events.inner.len())
    }
}

fn validate_args(token: Token) -> io::Result<()> {
    if token == AWAKEN {
        return Err(io::Error::new(io::ErrorKind::Other, "invalid token"));
    }

    Ok(())
}

impl fmt::Debug for Poll {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Poll").finish()
    }
}

impl AsRawFd for Poll {
    fn as_raw_fd(&self) -> RawFd {
        self.selector.as_raw_fd()
    }
}

/// A collection of readiness events.
///
/// `Events` is passed as an argument to [`Poll::poll`] and will be used to
/// receive any new readiness events received since the last poll. Usually, a
/// single `Events` instance is created at the same time as a [`Poll`] and
/// reused on each call to [`Poll::poll`].
///
/// See [`Poll`] for more documentation on polling.
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use futures_net::driver::sys::Poll;
/// use futures_net::driver::sys::event::Events;
/// use std::time::Duration;
///
/// let mut events = Events::with_capacity(1024);
/// let poll = Poll::new()?;
///
/// assert_eq!(0, events.len());
///
/// // Register `Evented` handles with `poll`
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// for event in &events {
///     println!("event={:?}", event);
/// }
/// # Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// [`Poll::poll`]: struct.Poll.html#method.poll
/// [`Poll`]: struct.Poll.html
pub struct Events {
    inner: linux::Events,
}

/// [`Events`] iterator.
///
/// This struct is created by the [`iter`] method on [`Events`].
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use futures_net::driver::sys::Poll;
/// use futures_net::driver::sys::event::Events;
/// use std::time::Duration;
///
/// let mut events = Events::with_capacity(1024);
/// let poll = Poll::new()?;
///
/// // Register handles with `poll`
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// for event in events.iter() {
///     println!("event={:?}", event);
/// }
/// # Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// [`Events`]: struct.Events.html
/// [`iter`]: struct.Events.html#method.iter
#[derive(Debug, Clone)]
pub struct Iter<'a> {
    inner: &'a Events,
    pos: usize,
}

/// Owned [`Events`] iterator.
///
/// This struct is created by the `into_iter` method on [`Events`].
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use futures_net::driver::sys::Poll;
/// use futures_net::driver::sys::event::Events;
/// use std::time::Duration;
///
/// let mut events = Events::with_capacity(1024);
/// let poll = Poll::new()?;
///
/// // Register handles with `poll`
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// for event in events {
///     println!("event={:?}", event);
/// }
/// # Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// [`Events`]: struct.Events.html
#[derive(Debug)]
pub struct IntoIter {
    inner: Events,
    pos: usize,
}

impl Events {
    /// Return a new `Events` capable of holding up to `capacity` events.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_net::driver::sys::event::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert_eq!(1024, events.capacity());
    /// ```
    pub fn with_capacity(capacity: usize) -> Events {
        Events {
            inner: linux::Events::with_capacity(capacity),
        }
    }

    #[deprecated(
        since = "0.6.10",
        note = "Index access removed in favor of iterator only API."
    )]
    #[doc(hidden)]
    pub fn get(&self, idx: usize) -> Option<Event> {
        self.inner.get(idx)
    }

    #[doc(hidden)]
    #[deprecated(
        since = "0.6.10",
        note = "Index access removed in favor of iterator only API."
    )]
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns the number of `Event` values that `self` can hold.
    ///
    /// ```
    /// use futures_net::driver::sys::event::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert_eq!(1024, events.capacity());
    /// ```
    pub fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    /// Returns `true` if `self` contains no `Event` values.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_net::driver::sys::event::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert!(events.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns an iterator over the `Event` values.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use futures_net::driver::sys::Poll;
    /// use futures_net::driver::sys::event::Events;
    /// use std::time::Duration;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// let poll = Poll::new()?;
    ///
    /// // Register handles with `poll`
    ///
    /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
    ///
    /// for event in events.iter() {
    ///     println!("event={:?}", event);
    /// }
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn iter(&self) -> Iter {
        Iter {
            inner: self,
            pos: 0,
        }
    }

    /// Clears all `Event` values from the container explicitly.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use futures_net::driver::sys::Poll;
    /// use futures_net::driver::sys::event::Events;
    /// use std::time::Duration;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// let poll = Poll::new()?;
    ///
    /// // Register handles with `poll`
    /// for _ in 0..2 {
    ///     events.clear();
    ///     poll.poll(&mut events, Some(Duration::from_millis(100)))?;
    ///
    ///     for event in events.iter() {
    ///         println!("event={:?}", event);
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn clear(&mut self) {
        self.inner.clear();
    }
}

impl<'a> IntoIterator for &'a Events {
    type Item = Event;
    type IntoIter = Iter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

impl<'a> Iterator for Iter<'a> {
    type Item = Event;

    fn next(&mut self) -> Option<Event> {
        let ret = self.inner.inner.get(self.pos);
        self.pos += 1;
        ret
    }
}

impl IntoIterator for Events {
    type Item = Event;
    type IntoIter = IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter {
            inner: self,
            pos: 0,
        }
    }
}

impl Iterator for IntoIter {
    type Item = Event;

    fn next(&mut self) -> Option<Event> {
        let ret = self.inner.inner.get(self.pos);
        self.pos += 1;
        ret
    }
}

impl fmt::Debug for Events {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Events")
            .field("capacity", &self.capacity())
            .finish()
    }
}

// ===== Accessors for internal usage =====

pub fn selector(poll: &Poll) -> &linux::Selector {
    &poll.selector
}

/*
 *
 * ===== Registration =====
 *
 */

// TODO: get rid of this, windows depends on it for now
#[allow(dead_code)]
pub fn new_registration(
    poll: &Poll,
    token: Token,
    ready: Ready,
    opt: PollOpt,
) -> (Registration, SetReadiness) {
    Registration::new_priv(poll, token, ready, opt)
}
1594
1595impl Registration {
1596 /// Create and return a new `Registration` and the associated
1597 /// `SetReadiness`.
1598 ///
1599 /// See [struct] documentation for more detail and [`Poll`]
1600 /// for high level documentation on polling.
1601 ///
1602 /// # Examples
1603 ///
1604 /// ```
1605 /// # use std::error::Error;
1606 /// # fn try_main() -> Result<(), Box<Error>> {
1607 /// use futures_net::driver::sys::{Poll, Registration, Token};
1608 /// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
1609 /// use std::thread;
1610 ///
1611 /// let (registration, set_readiness) = Registration::new2();
1612 ///
1613 /// thread::spawn(move || {
1614 /// use std::time::Duration;
1615 /// thread::sleep(Duration::from_millis(500));
1616 ///
1617 /// set_readiness.set_readiness(Ready::readable());
1618 /// });
1619 ///
1620 /// let poll = Poll::new()?;
1621 /// poll.register(®istration, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
1622 ///
1623 /// let mut events = Events::with_capacity(256);
1624 ///
1625 /// loop {
    ///     poll.poll(&mut events, None)?;
1627 ///
1628 /// for event in &events {
1629 /// if event.token() == Token(0) && event.readiness().is_readable() {
1630 /// return Ok(());
1631 /// }
1632 /// }
1633 /// }
1634 /// # Ok(())
1635 /// # }
1636 /// #
1637 /// # fn main() {
1638 /// # try_main().unwrap();
1639 /// # }
1640 /// ```
1641 /// [struct]: #
1642 /// [`Poll`]: struct.Poll.html
1643 pub fn new2() -> (Registration, SetReadiness) {
1644 // Allocate the registration node. The new node will have `ref_count`
1645 // set to 2: one SetReadiness, one Registration.
1646 let node = Box::into_raw(Box::new(ReadinessNode::new(
1647 ptr::null_mut(),
1648 Token(0),
1649 Ready::empty(),
1650 PollOpt::empty(),
1651 2,
1652 )));
1653
1654 let registration = Registration {
1655 inner: RegistrationInner { node },
1656 };
1657
1658 let set_readiness = SetReadiness {
1659 inner: RegistrationInner { node },
1660 };
1661
1662 (registration, set_readiness)
1663 }
1664
1665 // TODO: Get rid of this (windows depends on it for now)
1666 fn new_priv(
1667 poll: &Poll,
1668 token: Token,
1669 interest: Ready,
1670 opt: PollOpt,
1671 ) -> (Registration, SetReadiness) {
1672 is_send::<Registration>();
1673 is_sync::<Registration>();
1674 is_send::<SetReadiness>();
1675 is_sync::<SetReadiness>();
1676
1677 // Clone handle to the readiness queue, this bumps the ref count
1678 let queue = poll.readiness_queue.inner.clone();
1679
1680 // Convert to a *mut () pointer
1681 let queue: *mut () = unsafe { mem::transmute(queue) };
1682
1683 // Allocate the registration node. The new node will have `ref_count`
1684 // set to 3: one SetReadiness, one Registration, and one Poll handle.
1685 let node =
1686 Box::into_raw(Box::new(ReadinessNode::new(queue, token, interest, opt, 3)));
1687
1688 let registration = Registration {
1689 inner: RegistrationInner { node },
1690 };
1691
1692 let set_readiness = SetReadiness {
1693 inner: RegistrationInner { node },
1694 };
1695
1696 (registration, set_readiness)
1697 }
1698
1699 #[doc(hidden)]
1700 pub fn update(
1701 &self,
1702 poll: &Poll,
1703 token: Token,
1704 interest: Ready,
1705 opts: PollOpt,
1706 ) -> io::Result<()> {
1707 self.inner.update(poll, token, interest, opts)
1708 }
1709}
1710
1711impl Evented for Registration {
1712 fn register(
1713 &self,
1714 poll: &Poll,
1715 token: Token,
1716 interest: Ready,
1717 opts: PollOpt,
1718 ) -> io::Result<()> {
1719 self.inner.update(poll, token, interest, opts)
1720 }
1721
1722 fn reregister(
1723 &self,
1724 poll: &Poll,
1725 token: Token,
1726 interest: Ready,
1727 opts: PollOpt,
1728 ) -> io::Result<()> {
1729 self.inner.update(poll, token, interest, opts)
1730 }
1731
1732 fn deregister(&self, poll: &Poll) -> io::Result<()> {
1733 self.inner
1734 .update(poll, Token(0), Ready::empty(), PollOpt::empty())
1735 }
1736}
1737
1738impl Drop for Registration {
1739 fn drop(&mut self) {
1740 // `flag_as_dropped` toggles the `dropped` flag and notifies
1741 // `Poll::poll` to release its handle (which is just decrementing
1742 // the ref count).
1743 if self.inner.state.flag_as_dropped() {
1744 // Can't do anything if the queuing fails
1745 let _ = self.inner.enqueue_with_wakeup();
1746 }
1747 }
1748}
1749
1750impl fmt::Debug for Registration {
1751 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1752 fmt.debug_struct("Registration").finish()
1753 }
1754}
1755
1756impl SetReadiness {
1757 /// Returns the registration's current readiness.
1758 ///
1759 /// # Note
1760 ///
1761 /// There is no guarantee that `readiness` establishes any sort of memory
1762 /// ordering. Any concurrent data access must be synchronized using another
1763 /// strategy.
1764 ///
1765 /// # Examples
1766 ///
1767 /// ```
1768 /// # use std::error::Error;
1769 /// # fn try_main() -> Result<(), Box<Error>> {
1770 /// use futures_net::driver::sys::Registration;
1771 /// use futures_net::driver::sys::event::Ready;
1772 ///
1773 /// let (registration, set_readiness) = Registration::new2();
1774 ///
1775 /// assert!(set_readiness.readiness().is_empty());
1776 ///
1777 /// set_readiness.set_readiness(Ready::readable())?;
1778 /// assert!(set_readiness.readiness().is_readable());
1779 /// # Ok(())
1780 /// # }
1781 /// #
1782 /// # fn main() {
1783 /// # try_main().unwrap();
1784 /// # }
1785 /// ```
1786 pub fn readiness(&self) -> Ready {
1787 self.inner.readiness()
1788 }
1789
    /// Set the registration's readiness.
1791 ///
1792 /// If the associated `Registration` is registered with a [`Poll`] instance
1793 /// and has requested readiness events that include `ready`, then a future
1794 /// call to [`Poll::poll`] will receive a readiness event representing the
1795 /// readiness state change.
1796 ///
1797 /// # Note
    /// There is no guarantee that `set_readiness` establishes any sort of memory
1799 /// There is no guarantee that `readiness` establishes any sort of memory
1800 /// ordering. Any concurrent data access must be synchronized using another
1801 /// strategy.
1802 ///
1803 /// There is also no guarantee as to when the readiness event will be
1804 /// delivered to poll. A best attempt will be made to make the delivery in a
1805 /// "timely" fashion. For example, the following is **not** guaranteed to
1806 /// work:
1807 ///
1808 /// ```
1809 /// # use std::error::Error;
1810 /// # fn try_main() -> Result<(), Box<Error>> {
1811 /// use futures_net::driver::sys::{Poll, Registration, Token};
1812 /// use futures_net::driver::sys::event::{Events, Ready, PollOpt};
1813 ///
1814 /// let poll = Poll::new()?;
1815 /// let (registration, set_readiness) = Registration::new2();
1816 ///
    /// poll.register(&registration,
1818 /// Token(0),
1819 /// Ready::readable(),
1820 /// PollOpt::edge())?;
1821 ///
1822 /// // Set the readiness, then immediately poll to try to get the readiness
1823 /// // event
1824 /// set_readiness.set_readiness(Ready::readable())?;
1825 ///
1826 /// let mut events = Events::with_capacity(1024);
1827 /// poll.poll(&mut events, None)?;
1828 ///
1829 /// // There is NO guarantee that the following will work. It is possible
1830 /// // that the readiness event will be delivered at a later time.
1831 /// let event = events.get(0).unwrap();
1832 /// assert_eq!(event.token(), Token(0));
1833 /// assert!(event.readiness().is_readable());
1834 /// # Ok(())
1835 /// # }
1836 /// #
1837 /// # fn main() {
1838 /// # try_main().unwrap();
1839 /// # }
1840 /// ```
1841 ///
1842 /// # Examples
1843 ///
1844 /// A simple example, for a more elaborate example, see the [`Evented`]
1845 /// documentation.
1846 ///
1847 /// ```
1848 /// # use std::error::Error;
1849 /// # fn try_main() -> Result<(), Box<Error>> {
1850 /// use futures_net::driver::sys::Registration;
1851 /// use futures_net::driver::sys::event::Ready;
1852 ///
1853 /// let (registration, set_readiness) = Registration::new2();
1854 ///
1855 /// assert!(set_readiness.readiness().is_empty());
1856 ///
1857 /// set_readiness.set_readiness(Ready::readable())?;
1858 /// assert!(set_readiness.readiness().is_readable());
1859 /// # Ok(())
1860 /// # }
1861 /// #
1862 /// # fn main() {
1863 /// # try_main().unwrap();
1864 /// # }
1865 /// ```
1866 ///
1867 /// [`Registration`]: struct.Registration.html
1868 /// [`Evented`]: event/trait.Evented.html#examples
1869 /// [`Poll`]: struct.Poll.html
1870 /// [`Poll::poll`]: struct.Poll.html#method.poll
1871 pub fn set_readiness(&self, ready: Ready) -> io::Result<()> {
1872 self.inner.set_readiness(ready)
1873 }
1874}
1875
1876impl fmt::Debug for SetReadiness {
1877 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1878 f.debug_struct("SetReadiness").finish()
1879 }
1880}
1881
1882impl RegistrationInner {
1883 /// Get the registration's readiness.
1884 fn readiness(&self) -> Ready {
1885 self.state.load(Relaxed).readiness()
1886 }
1887
1888 /// Set the registration's readiness.
1889 ///
1890 /// This function can be called concurrently by an arbitrary number of
1891 /// SetReadiness handles.
1892 fn set_readiness(&self, ready: Ready) -> io::Result<()> {
1893 // Load the current atomic state.
1894 let mut state = self.state.load(Acquire);
1895 let mut next;
1896
1897 loop {
1898 next = state;
1899
1900 if state.is_dropped() {
1901 // Node is dropped, no more notifications
1902 return Ok(());
1903 }
1904
1905 // Update the readiness
1906 next.set_readiness(ready);
1907
1908 // If the readiness is not blank, try to obtain permission to
1909 // push the node into the readiness queue.
1910 if !next.effective_readiness().is_empty() {
1911 next.set_queued();
1912 }
1913
1914 let actual = self.state.compare_and_swap(state, next, AcqRel);
1915
1916 if state == actual {
1917 break;
1918 }
1919
1920 state = actual;
1921 }
1922
1923 if !state.is_queued() && next.is_queued() {
1924 // We toggled the queued flag, making us responsible for queuing the
1925 // node in the MPSC readiness queue.
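            //
            // For example (illustrative): if two `SetReadiness` handles race,
            // only the handle whose CAS transitions `queued` from unset to
            // set performs the enqueue; the other observes `queued` already
            // set in `state` and skips the push, so the node enters the queue
            // at most once per queued/dequeued cycle.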
1926 self.enqueue_with_wakeup()?;
1927 }
1928
1929 Ok(())
1930 }
1931
1932 /// Update the registration details associated with the node
1933 fn update(
1934 &self,
1935 poll: &Poll,
1936 token: Token,
1937 interest: Ready,
1938 opt: PollOpt,
1939 ) -> io::Result<()> {
1940 // First, ensure poll instances match
1941 //
1942 // Load the queue pointer, `Relaxed` is sufficient here as only the
1943 // pointer is being operated on. The actual memory is guaranteed to be
        // visible via the `poll: &Poll` ref passed as an argument to the function.
1945 let mut queue = self.readiness_queue.load(Relaxed);
1946 let other: &*mut () =
1947 unsafe { &*(&poll.readiness_queue.inner as *const _ as *const *mut ()) };
1948 let other = *other;
1949
1950 debug_assert!(
1951 mem::size_of::<Arc<ReadinessQueueInner>>() == mem::size_of::<*mut ()>()
1952 );
1953
1954 if queue.is_null() {
1955 // Attempt to set the queue pointer. `Release` ordering synchronizes
1956 // with `Acquire` in `ensure_with_wakeup`.
1957 let actual = self.readiness_queue.compare_and_swap(queue, other, Release);
1958
1959 if actual.is_null() {
1960 // The CAS succeeded, this means that the node's ref count
1961 // should be incremented to reflect that the `poll` function
1962 // effectively owns the node as well.
1963 //
1964 // `Relaxed` ordering used for the same reason as in
1965 // RegistrationInner::clone
1966 self.ref_count.fetch_add(1, Relaxed);
1967
1968 // Note that the `queue` reference stored in our
1969 // `readiness_queue` field is intended to be a strong reference,
1970 // so now that we've successfully claimed the reference we bump
1971 // the refcount here.
1972 //
1973 // Down below in `release_node` when we deallocate this
1974 // `RegistrationInner` is where we'll transmute this back to an
1975 // arc and decrement the reference count.
1976 mem::forget(poll.readiness_queue.clone());
1977 } else {
1978 // The CAS failed, another thread set the queue pointer, so ensure
1979 // that the pointer and `other` match
1980 if actual != other {
1981 return Err(io::Error::new(
1982 io::ErrorKind::Other,
1983 "registration handle associated with another `Poll` instance",
1984 ));
1985 }
1986 }
1987
1988 queue = other;
1989 } else if queue != other {
1990 return Err(io::Error::new(
1991 io::ErrorKind::Other,
1992 "registration handle associated with another `Poll` instance",
1993 ));
1994 }
1995
1996 unsafe {
1997 let actual = &poll.readiness_queue.inner as *const _ as *const usize;
1998 debug_assert_eq!(queue as usize, *actual);
1999 }
2000
2001 // The `update_lock` atomic is used as a flag ensuring only a single
2002 // thread concurrently enters the `update` critical section. Any
2003 // concurrent calls to update are discarded. If coordinated updates are
2004 // required, the Mio user is responsible for handling that.
2005 //
2006 // Acquire / Release ordering is used on `update_lock` to ensure that
2007 // data access to the `token_*` variables are scoped to the critical
2008 // section.
2009
2010 // Acquire the update lock.
2011 if self.update_lock.compare_and_swap(false, true, Acquire) {
2012 // The lock is already held. Discard the update
2013 return Ok(());
2014 }
2015
2016 // Relaxed ordering is acceptable here as the only memory that needs to
2017 // be visible as part of the update are the `token_*` variables, and
2018 // ordering has already been handled by the `update_lock` access.
2019 let mut state = self.state.load(Relaxed);
2020 let mut next;
2021
2022 // Read the current token, again this memory has been ordered by the
2023 // acquire on `update_lock`.
2024 let curr_token_pos = state.token_write_pos();
2025 let curr_token = unsafe { self::token(self, curr_token_pos) };
2026
2027 let mut next_token_pos = curr_token_pos;
2028
2029 // If the `update` call is changing the token, then compute the next
2030 // available token slot and write the token there.
2031 //
2032 // Note that this computation is happening *outside* of the
2033 // compare-and-swap loop. The update lock ensures that only a single
2034 // thread could be mutating the write_token_position, so the
2035 // `next_token_pos` will never need to be recomputed even if
2036 // `token_read_pos` concurrently changes. This is because
2037 // `token_read_pos` can ONLY concurrently change to the current value of
2038 // `token_write_pos`, so `next_token_pos` will always remain valid.
2039 if token != curr_token {
2040 next_token_pos = state.next_token_pos();
2041
2042 // Update the token
2043 match next_token_pos {
2044 0 => unsafe { *self.token_0.get() = token },
2045 1 => unsafe { *self.token_1.get() = token },
2046 2 => unsafe { *self.token_2.get() = token },
2047 _ => unreachable!(),
2048 }
2049 }
2050
2051 // Now enter the compare-and-swap loop
2052 loop {
2053 next = state;
2054
2055 // The node is only dropped once all `Registration` handles are
2056 // dropped. Only `Registration` can call `update`.
2057 debug_assert!(!state.is_dropped());
2058
2059 // Update the write token position, this will also release the token
2060 // to Poll::poll.
2061 next.set_token_write_pos(next_token_pos);
2062
2063 // Update readiness and poll opts
2064 next.set_interest(interest);
2065 next.set_poll_opt(opt);
2066
2067 // If there is effective readiness, the node will need to be queued
2068 // for processing. This exact behavior is still TBD, so we are
2069 // conservative for now and always fire.
2070 //
2071 // See https://github.com/carllerche/mio/issues/535.
2072 if !next.effective_readiness().is_empty() {
2073 next.set_queued();
2074 }
2075
2076 // compare-and-swap the state values. Only `Release` is needed here.
2077 // The `Release` ensures that `Poll::poll` will see the token
2078 // update and the update function doesn't care about any other
2079 // memory visibility.
2080 let actual = self.state.compare_and_swap(state, next, Release);
2081
2082 if actual == state {
2083 break;
2084 }
2085
2086 // CAS failed, but `curr_token_pos` should not have changed given
2087 // that we still hold the update lock.
2088 debug_assert_eq!(curr_token_pos, actual.token_write_pos());
2089
2090 state = actual;
2091 }
2092
2093 // Release the lock
2094 self.update_lock.store(false, Release);
2095
2096 if !state.is_queued() && next.is_queued() {
            // We are responsible for enqueueing the node.
2098 enqueue_with_wakeup(queue, self)?;
2099 }
2100
2101 Ok(())
2102 }
2103}
2104
2105impl ops::Deref for RegistrationInner {
2106 type Target = ReadinessNode;
2107
2108 fn deref(&self) -> &ReadinessNode {
2109 unsafe { &*self.node }
2110 }
2111}
2112
2113impl Clone for RegistrationInner {
2114 fn clone(&self) -> RegistrationInner {
2115 // Using a relaxed ordering is alright here, as knowledge of the
2116 // original reference prevents other threads from erroneously deleting
2117 // the object.
2118 //
2119 // As explained in the [Boost documentation][1], Increasing the
2120 // reference counter can always be done with memory_order_relaxed: New
2121 // references to an object can only be formed from an existing
2122 // reference, and passing an existing reference from one thread to
2123 // another must already provide any required synchronization.
2124 //
2125 // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
2126 let old_size = self.ref_count.fetch_add(1, Relaxed);
2127
2128 // However we need to guard against massive refcounts in case someone
2129 // is `mem::forget`ing Arcs. If we don't do this the count can overflow
        // and users will use-after-free. We racily saturate to `isize::MAX` on
2131 // the assumption that there aren't ~2 billion threads incrementing
2132 // the reference count at once. This branch will never be taken in
2133 // any realistic program.
2134 //
2135 // We abort because such a program is incredibly degenerate, and we
2136 // don't care to support it.
2137 if old_size & !MAX_REFCOUNT != 0 {
2138 process::abort();
2139 }
2140
2141 RegistrationInner { node: self.node }
2142 }
2143}
2144
2145impl Drop for RegistrationInner {
2146 fn drop(&mut self) {
2147 // Only handles releasing from `Registration` and `SetReadiness`
2148 // handles. Poll has to call this itself.
2149 release_node(self.node);
2150 }
2151}
2152
2153/*
2154 *
2155 * ===== ReadinessQueue =====
2156 *
2157 */
2158
2159impl ReadinessQueue {
2160 /// Create a new `ReadinessQueue`.
2161 fn new() -> io::Result<ReadinessQueue> {
2162 is_send::<Self>();
2163 is_sync::<Self>();
2164
2165 let end_marker = Box::new(ReadinessNode::marker());
2166 let sleep_marker = Box::new(ReadinessNode::marker());
2167 let closed_marker = Box::new(ReadinessNode::marker());
2168
2169 let ptr = &*end_marker as *const _ as *mut _;
2170
2171 Ok(ReadinessQueue {
2172 inner: Arc::new(ReadinessQueueInner {
2173 awakener: linux::Awakener::new()?,
2174 head_readiness: AtomicPtr::new(ptr),
2175 tail_readiness: UnsafeCell::new(ptr),
2176 end_marker,
2177 sleep_marker,
2178 closed_marker,
2179 }),
2180 })
2181 }
2182
2183 /// Poll the queue for new events
2184 fn poll(&self, dst: &mut linux::Events) {
        // `until` is set to the first node that gets re-enqueued because it
        // uses level-triggered notifications. This prevents an infinite loop
        // where `Poll::poll` keeps dequeuing nodes that it itself re-enqueues.
2188 let mut until = ptr::null_mut();
2189
2190 if dst.len() == dst.capacity() {
2191 // If `dst` is already full, the readiness queue won't be drained.
2192 // This might result in `sleep_marker` staying in the queue and
            // unnecessary pipe writes occurring.
2194 self.inner.clear_sleep_marker();
2195 }
2196
2197 'outer: while dst.len() < dst.capacity() {
2198 // Dequeue a node. If the queue is in an inconsistent state, then
2199 // stop polling. `Poll::poll` will be called again shortly and enter
2200 // a syscall, which should be enough to enable the other thread to
2201 // finish the queuing process.
2202 let ptr = match unsafe { self.inner.dequeue_node(until) } {
2203 Dequeue::Empty | Dequeue::Inconsistent => break,
2204 Dequeue::Data(ptr) => ptr,
2205 };
2206
2207 let node = unsafe { &*ptr };
2208
2209 // Read the node state with Acquire ordering. This allows reading
2210 // the token variables.
2211 let mut state = node.state.load(Acquire);
2212 let mut next;
2213 let mut readiness;
2214 let mut opt;
2215
2216 loop {
2217 // Build up any changes to the readiness node's state and
2218 // attempt the CAS at the end
2219 next = state;
2220
2221 // Given that the node was just read from the queue, the
2222 // `queued` flag should still be set.
2223 debug_assert!(state.is_queued());
2224
2225 // The dropped flag means we need to release the node and
2226 // perform no further processing on it.
2227 if state.is_dropped() {
2228 // Release the node and continue
2229 release_node(ptr);
2230 continue 'outer;
2231 }
2232
2233 // Process the node
2234 readiness = state.effective_readiness();
2235 opt = state.poll_opt();
2236
2237 if opt.is_edge() {
2238 // Mark the node as dequeued
2239 next.set_dequeued();
2240
2241 if opt.is_oneshot() && !readiness.is_empty() {
2242 next.disarm();
2243 }
2244 } else if readiness.is_empty() {
2245 next.set_dequeued();
2246 }
2247
2248 // Ensure `token_read_pos` is set to `token_write_pos` so that
2249 // we read the most up to date token value.
2250 next.update_token_read_pos();
2251
2252 if state == next {
2253 break;
2254 }
2255
2256 let actual = node.state.compare_and_swap(state, next, AcqRel);
2257
2258 if actual == state {
2259 break;
2260 }
2261
2262 state = actual;
2263 }
2264
2265 // If the queued flag is still set, then the node must be requeued.
2266 // This typically happens when using level-triggered notifications.
2267 if next.is_queued() {
2268 if until.is_null() {
                    // Record the first re-enqueued node so that
                    // `dequeue_node` stops before processing it again.
2270 until = ptr;
2271 }
2272
2273 // Requeue the node
2274 self.inner.enqueue_node(node);
2275 }
2276
2277 if !readiness.is_empty() {
2278 // Get the token
2279 let token = unsafe { token(node, next.token_read_pos()) };
2280
2281 // Push the event
2282 dst.push_event(Event::new(readiness, token));
2283 }
2284 }
2285 }
2286
2287 /// Prepare the queue for the `Poll::poll` thread to block in the system
2288 /// selector. This involves changing `head_readiness` to `sleep_marker`.
2289 /// Returns true if successful and `poll` can block.
2290 fn prepare_for_sleep(&self) -> bool {
2291 let end_marker = self.inner.end_marker();
2292 let sleep_marker = self.inner.sleep_marker();
2293
2294 let tail = unsafe { *self.inner.tail_readiness.get() };
2295
2296 // If the tail is currently set to the sleep_marker, then check if the
2297 // head is as well. If it is, then the queue is currently ready to
2298 // sleep. If it is not, then the queue is not empty and there should be
2299 // no sleeping.
2300 if tail == sleep_marker {
2301 return self.inner.head_readiness.load(Acquire) == sleep_marker;
2302 }
2303
2304 // If the tail is not currently set to `end_marker`, then the queue is
2305 // not empty.
2306 if tail != end_marker {
2307 return false;
2308 }
2309
2310 // The sleep marker is *not* currently in the readiness queue.
2311 //
2312 // The sleep marker is only inserted in this function. It is also only
2313 // inserted in the tail position. This is guaranteed by first checking
2314 // that the end marker is in the tail position, pushing the sleep marker
2315 // after the end marker, then removing the end marker.
2316 //
2317 // Before inserting a node into the queue, the next pointer has to be
2318 // set to null. Again, this is only safe to do when the node is not
2319 // currently in the queue, but we already have ensured this.
2320 self.inner
2321 .sleep_marker
2322 .next_readiness
2323 .store(ptr::null_mut(), Relaxed);
2324
2325 let actual =
2326 self.inner
2327 .head_readiness
2328 .compare_and_swap(end_marker, sleep_marker, AcqRel);
2329
2330 debug_assert!(actual != sleep_marker);
2331
2332 if actual != end_marker {
2333 // The readiness queue is not empty
2334 return false;
2335 }
2336
2337 // The current tail should be pointing to `end_marker`
2338 debug_assert!(unsafe { *self.inner.tail_readiness.get() == end_marker });
2339 // The `end_marker` next pointer should be null
2340 debug_assert!(self.inner.end_marker.next_readiness.load(Relaxed).is_null());
2341
2342 // Update tail pointer.
2343 unsafe {
2344 *self.inner.tail_readiness.get() = sleep_marker;
2345 }
2346 true
2347 }
2348}
2349
2350impl Drop for ReadinessQueue {
2351 fn drop(&mut self) {
2352 // Close the queue by enqueuing the closed node
2353 self.inner.enqueue_node(&*self.inner.closed_marker);
2354
2355 loop {
2356 // Free any nodes that happen to be left in the readiness queue
2357 let ptr = match unsafe { self.inner.dequeue_node(ptr::null_mut()) } {
2358 Dequeue::Empty => break,
2359 Dequeue::Inconsistent => {
2360 // This really shouldn't be possible as all other handles to
2361 // `ReadinessQueueInner` are dropped, but handle this by
                    // spinning until the concurrent enqueue completes.
2363 continue;
2364 }
2365 Dequeue::Data(ptr) => ptr,
2366 };
2367
2368 let node = unsafe { &*ptr };
2369
2370 let state = node.state.load(Acquire);
2371
2372 debug_assert!(state.is_queued());
2373
2374 release_node(ptr);
2375 }
2376 }
2377}
2378
2379impl ReadinessQueueInner {
2380 fn wakeup(&self) -> io::Result<()> {
2381 self.awakener.wakeup()
2382 }
2383
    /// Enqueue the given node and, if the consumer was sleeping (the
    /// `sleep_marker` was at the head of the queue), wake up `Poll::poll`.
2386 fn enqueue_node_with_wakeup(&self, node: &ReadinessNode) -> io::Result<()> {
2387 if self.enqueue_node(node) {
2388 self.wakeup()?;
2389 }
2390
2391 Ok(())
2392 }
2393
    /// Push the node into the readiness queue. Returns `true` if the
    /// previous head was the `sleep_marker`, i.e. `Poll::poll` must be woken.
2395 fn enqueue_node(&self, node: &ReadinessNode) -> bool {
2396 // This is the 1024cores.net intrusive MPSC queue [1] "push" function.
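        //
        // Push happens in two steps: (1) swap `head_readiness` to point at
        // the new node, then (2) link the previous head's `next_readiness` to
        // it. Between those steps the old head's `next` pointer is still null
        // even though it is no longer the most recent node; `dequeue_node`
        // observes that window as `Dequeue::Inconsistent`.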
2397 let node_ptr = node as *const _ as *mut _;
2398
2399 // Relaxed used as the ordering is "released" when swapping
2400 // `head_readiness`
2401 node.next_readiness.store(ptr::null_mut(), Relaxed);
2402
2403 unsafe {
2404 let mut prev = self.head_readiness.load(Acquire);
2405
2406 loop {
2407 if prev == self.closed_marker() {
2408 debug_assert!(node_ptr != self.closed_marker());
2409 // debug_assert!(node_ptr != self.end_marker());
2410 debug_assert!(node_ptr != self.sleep_marker());
2411
2412 if node_ptr != self.end_marker() {
2413 // The readiness queue is shutdown, but the enqueue flag was
2414 // set. This means that we are responsible for decrementing
2415 // the ready queue's ref count
2416 debug_assert!(node.ref_count.load(Relaxed) >= 2);
2417 release_node(node_ptr);
2418 }
2419
2420 return false;
2421 }
2422
2423 let act = self.head_readiness.compare_and_swap(prev, node_ptr, AcqRel);
2424
2425 if prev == act {
2426 break;
2427 }
2428
2429 prev = act;
2430 }
2431
2432 debug_assert!((*prev).next_readiness.load(Relaxed).is_null());
2433
2434 (*prev).next_readiness.store(node_ptr, Release);
2435
2436 prev == self.sleep_marker()
2437 }
2438 }
2439
2440 fn clear_sleep_marker(&self) {
2441 let end_marker = self.end_marker();
2442 let sleep_marker = self.sleep_marker();
2443
2444 unsafe {
2445 let tail = *self.tail_readiness.get();
2446
2447 if tail != self.sleep_marker() {
2448 return;
2449 }
2450
            // The end marker is *not* currently in the readiness queue
            // (since the sleep marker is).
2453 self.end_marker
2454 .next_readiness
2455 .store(ptr::null_mut(), Relaxed);
2456
2457 let actual =
2458 self.head_readiness
2459 .compare_and_swap(sleep_marker, end_marker, AcqRel);
2460
2461 debug_assert!(actual != end_marker);
2462
2463 if actual != sleep_marker {
2464 // The readiness queue is not empty, we cannot remove the sleep
                // marker.
2466 return;
2467 }
2468
2469 // Update the tail pointer.
2470 *self.tail_readiness.get() = end_marker;
2471 }
2472 }
2473
2474 /// Must only be called in `poll` or `drop`
2475 unsafe fn dequeue_node(&self, until: *mut ReadinessNode) -> Dequeue {
2476 // This is the 1024cores.net intrusive MPSC queue [1] "pop" function
2477 // with the modifications mentioned at the top of the file.
2478 let mut tail = *self.tail_readiness.get();
2479 let mut next = (*tail).next_readiness.load(Acquire);
2480
2481 if tail == self.end_marker()
2482 || tail == self.sleep_marker()
2483 || tail == self.closed_marker()
2484 {
2485 if next.is_null() {
2486 // Make sure the sleep marker is removed (as we are no longer
                // sleeping).
2488 self.clear_sleep_marker();
2489
2490 return Dequeue::Empty;
2491 }
2492
2493 *self.tail_readiness.get() = next;
2494 tail = next;
2495 next = (*next).next_readiness.load(Acquire);
2496 }
2497
2498 // Only need to check `until` at this point. `until` is either null,
2499 // which will never match tail OR it is a node that was pushed by
2500 // the current thread. This means that either:
2501 //
2502 // 1) The queue is inconsistent, which is handled explicitly
2503 // 2) We encounter `until` at this point in dequeue
        // 3) We will pop a different node
2505 if tail == until {
2506 return Dequeue::Empty;
2507 }
2508
2509 if !next.is_null() {
2510 *self.tail_readiness.get() = next;
2511 return Dequeue::Data(tail);
2512 }
2513
2514 if self.head_readiness.load(Acquire) != tail {
2515 return Dequeue::Inconsistent;
2516 }
2517
2518 // Push the stub node
2519 self.enqueue_node(&*self.end_marker);
2520
2521 next = (*tail).next_readiness.load(Acquire);
2522
2523 if !next.is_null() {
2524 *self.tail_readiness.get() = next;
2525 return Dequeue::Data(tail);
2526 }
2527
2528 Dequeue::Inconsistent
2529 }
2530
2531 fn end_marker(&self) -> *mut ReadinessNode {
2532 &*self.end_marker as *const ReadinessNode as *mut ReadinessNode
2533 }
2534
2535 fn sleep_marker(&self) -> *mut ReadinessNode {
2536 &*self.sleep_marker as *const ReadinessNode as *mut ReadinessNode
2537 }
2538
2539 fn closed_marker(&self) -> *mut ReadinessNode {
2540 &*self.closed_marker as *const ReadinessNode as *mut ReadinessNode
2541 }
2542}
2543
2544impl ReadinessNode {
    /// Return a new `ReadinessNode`, initialized with the provided `ref_count`.
2546 fn new(
2547 queue: *mut (),
2548 token: Token,
2549 interest: Ready,
2550 opt: PollOpt,
2551 ref_count: usize,
2552 ) -> ReadinessNode {
2553 ReadinessNode {
2554 state: AtomicState::new(interest, opt),
2555 // Only the first token is set, the others are initialized to 0
2556 token_0: UnsafeCell::new(token),
2557 token_1: UnsafeCell::new(Token(0)),
2558 token_2: UnsafeCell::new(Token(0)),
2559 next_readiness: AtomicPtr::new(ptr::null_mut()),
2560 update_lock: AtomicBool::new(false),
2561 readiness_queue: AtomicPtr::new(queue),
2562 ref_count: AtomicUsize::new(ref_count),
2563 }
2564 }
2565
2566 fn marker() -> ReadinessNode {
2567 ReadinessNode {
2568 state: AtomicState::new(Ready::empty(), PollOpt::empty()),
2569 token_0: UnsafeCell::new(Token(0)),
2570 token_1: UnsafeCell::new(Token(0)),
2571 token_2: UnsafeCell::new(Token(0)),
2572 next_readiness: AtomicPtr::new(ptr::null_mut()),
2573 update_lock: AtomicBool::new(false),
2574 readiness_queue: AtomicPtr::new(ptr::null_mut()),
2575 ref_count: AtomicUsize::new(0),
2576 }
2577 }
2578
2579 fn enqueue_with_wakeup(&self) -> io::Result<()> {
2580 let queue = self.readiness_queue.load(Acquire);
2581
2582 if queue.is_null() {
2583 // Not associated with a queue, nothing to do
2584 return Ok(());
2585 }
2586
2587 enqueue_with_wakeup(queue, self)
2588 }
2589}
2590
2591fn enqueue_with_wakeup(queue: *mut (), node: &ReadinessNode) -> io::Result<()> {
2592 debug_assert!(!queue.is_null());
2593 // This is ugly... but we don't want to bump the ref count.
2594 let queue: &Arc<ReadinessQueueInner> =
2595 unsafe { &*(&queue as *const *mut () as *const Arc<ReadinessQueueInner>) };
2596 queue.enqueue_node_with_wakeup(node)
2597}
2598
2599unsafe fn token(node: &ReadinessNode, pos: usize) -> Token {
2600 match pos {
2601 0 => *node.token_0.get(),
2602 1 => *node.token_1.get(),
2603 2 => *node.token_2.get(),
2604 _ => unreachable!(),
2605 }
2606}
2607
2608fn release_node(ptr: *mut ReadinessNode) {
2609 unsafe {
2610 // `AcqRel` synchronizes with other `release_node` functions and ensures
2611 // that the drop happens after any reads / writes on other threads.
2612 if (*ptr).ref_count.fetch_sub(1, AcqRel) != 1 {
2613 return;
2614 }
2615
2616 let node = Box::from_raw(ptr);
2617
2618 // Decrement the readiness_queue Arc
2619 let queue = node.readiness_queue.load(Acquire);
2620
2621 if queue.is_null() {
2622 return;
2623 }
2624
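        // Reconstitute the `Arc` that was converted to a raw pointer when the
        // node was associated with the queue (`mem::transmute` in `new_priv`,
        // or the `mem::forget` of a clone in `RegistrationInner::update`) and
        // let it drop, releasing the queue reference held by this node.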
2625 let _: Arc<ReadinessQueueInner> = mem::transmute(queue);
2626 }
2627}
2628
2629impl AtomicState {
2630 fn new(interest: Ready, opt: PollOpt) -> AtomicState {
2631 let state = ReadinessState::new(interest, opt);
2632
2633 AtomicState {
2634 inner: AtomicUsize::new(state.into()),
2635 }
2636 }
2637
2638 /// Loads the current `ReadinessState`
2639 fn load(&self, order: Ordering) -> ReadinessState {
2640 self.inner.load(order).into()
2641 }
2642
2643 /// Stores a state if the current state is the same as `current`.
2644 fn compare_and_swap(
2645 &self,
2646 current: ReadinessState,
2647 new: ReadinessState,
2648 order: Ordering,
2649 ) -> ReadinessState {
2650 self.inner
2651 .compare_and_swap(current.into(), new.into(), order)
2652 .into()
2653 }
2654
2655 // Returns `true` if the node should be queued
2656 fn flag_as_dropped(&self) -> bool {
2657 let prev: ReadinessState = self
2658 .inner
2659 .fetch_or(DROPPED_MASK | QUEUED_MASK, Release)
2660 .into();
2661 // The flag should not have been previously set
2662 debug_assert!(!prev.is_dropped());
2663
2664 !prev.is_queued()
2665 }
2666}
2667
2668impl ReadinessState {
2669 // Create a `ReadinessState` initialized with the provided arguments
2670 #[inline]
2671 fn new(interest: Ready, opt: PollOpt) -> ReadinessState {
2672 let interest = event::ready_as_usize(interest);
2673 let opt = event::opt_as_usize(opt);
2674
2675 debug_assert!(interest <= MASK_4);
2676 debug_assert!(opt <= MASK_4);
2677
2678 let mut val = interest << INTEREST_SHIFT;
2679 val |= opt << POLL_OPT_SHIFT;
2680
2681 ReadinessState(val)
2682 }
2683
2684 #[inline]
2685 fn get(self, mask: usize, shift: usize) -> usize {
2686 (self.0 >> shift) & mask
2687 }
2688
2689 #[inline]
2690 fn set(&mut self, val: usize, mask: usize, shift: usize) {
2691 self.0 = (self.0 & !(mask << shift)) | (val << shift)
2692 }
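
    // For example (illustrative, with `MASK_4` being the 4-bit mask 0b1111):
    // readiness, interest, and poll options each occupy a 4-bit field, so
    // `set(bits, MASK_4, INTEREST_SHIFT)` clears only the four interest bits
    // and ORs in the new value, leaving the other packed fields untouched.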
2693
2694 /// Get the readiness
2695 #[inline]
2696 fn readiness(self) -> Ready {
2697 let v = self.get(MASK_4, READINESS_SHIFT);
2698 event::ready_from_usize(v)
2699 }
2700
2701 #[inline]
2702 fn effective_readiness(self) -> Ready {
2703 self.readiness() & self.interest()
2704 }
2705
2706 /// Set the readiness
2707 #[inline]
2708 fn set_readiness(&mut self, v: Ready) {
2709 self.set(event::ready_as_usize(v), MASK_4, READINESS_SHIFT);
2710 }
2711
2712 /// Get the interest
2713 #[inline]
2714 fn interest(self) -> Ready {
2715 let v = self.get(MASK_4, INTEREST_SHIFT);
2716 event::ready_from_usize(v)
2717 }
2718
2719 /// Set the interest
2720 #[inline]
2721 fn set_interest(&mut self, v: Ready) {
2722 self.set(event::ready_as_usize(v), MASK_4, INTEREST_SHIFT);
2723 }
2724
2725 #[inline]
2726 fn disarm(&mut self) {
2727 self.set_interest(Ready::empty());
2728 }
2729
2730 /// Get the poll options
2731 #[inline]
2732 fn poll_opt(self) -> PollOpt {
2733 let v = self.get(MASK_4, POLL_OPT_SHIFT);
2734 event::opt_from_usize(v)
2735 }
2736
2737 /// Set the poll options
2738 #[inline]
2739 fn set_poll_opt(&mut self, v: PollOpt) {
2740 self.set(event::opt_as_usize(v), MASK_4, POLL_OPT_SHIFT);
2741 }
2742
2743 #[inline]
2744 fn is_queued(self) -> bool {
2745 self.0 & QUEUED_MASK == QUEUED_MASK
2746 }
2747
2748 /// Set the queued flag
2749 #[inline]
2750 fn set_queued(&mut self) {
2751 // Dropped nodes should never be queued
2752 debug_assert!(!self.is_dropped());
2753 self.0 |= QUEUED_MASK;
2754 }
2755
2756 #[inline]
2757 fn set_dequeued(&mut self) {
2758 debug_assert!(self.is_queued());
2759 self.0 &= !QUEUED_MASK
2760 }
2761
2762 #[inline]
2763 fn is_dropped(self) -> bool {
2764 self.0 & DROPPED_MASK == DROPPED_MASK
2765 }
2766
2767 #[inline]
2768 fn token_read_pos(self) -> usize {
2769 self.get(MASK_2, TOKEN_RD_SHIFT)
2770 }
2771
2772 #[inline]
2773 fn token_write_pos(self) -> usize {
2774 self.get(MASK_2, TOKEN_WR_SHIFT)
2775 }
2776
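    // `next_token_pos` picks the slot that is neither the consumer's read
    // position nor the current write position. For example (illustrative):
    // with wr == 0 and rd == 1, slot 2 is the only slot `Poll::poll` cannot
    // currently be reading, so the new token can be written there without
    // racing the concurrent `token(node, token_read_pos)` load.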
2777 #[inline]
2778 fn next_token_pos(self) -> usize {
2779 let rd = self.token_read_pos();
2780 let wr = self.token_write_pos();
2781
2782 match wr {
2783 0 => match rd {
2784 1 => 2,
2785 2 => 1,
2786 0 => 1,
2787 _ => unreachable!(),
2788 },
2789 1 => match rd {
2790 0 => 2,
2791 2 => 0,
2792 1 => 2,
2793 _ => unreachable!(),
2794 },
2795 2 => match rd {
2796 0 => 1,
2797 1 => 0,
2798 2 => 0,
2799 _ => unreachable!(),
2800 },
2801 _ => unreachable!(),
2802 }
2803 }
2804
2805 #[inline]
2806 fn set_token_write_pos(&mut self, val: usize) {
2807 self.set(val, MASK_2, TOKEN_WR_SHIFT);
2808 }
2809
2810 #[inline]
2811 fn update_token_read_pos(&mut self) {
2812 let val = self.token_write_pos();
2813 self.set(val, MASK_2, TOKEN_RD_SHIFT);
2814 }
2815}
2816
2817impl From<ReadinessState> for usize {
2818 fn from(src: ReadinessState) -> usize {
2819 src.0
2820 }
2821}
2822
2823impl From<usize> for ReadinessState {
2824 fn from(src: usize) -> ReadinessState {
2825 ReadinessState(src)
2826 }
2827}
2828
2829fn is_send<T: Send>() {}
2830fn is_sync<T: Sync>() {}
2831
2832impl SelectorId {
2833 pub fn new() -> SelectorId {
2834 SelectorId {
2835 id: AtomicUsize::new(0),
2836 }
2837 }
2838
2839 pub fn associate_selector(&self, poll: &Poll) -> io::Result<()> {
2840 let selector_id = self.id.load(Ordering::SeqCst);
2841
2842 if selector_id != 0 && selector_id != poll.selector.id() {
2843 Err(io::Error::new(
2844 io::ErrorKind::Other,
2845 "socket already registered",
2846 ))
2847 } else {
2848 self.id.store(poll.selector.id(), Ordering::SeqCst);
2849 Ok(())
2850 }
2851 }
2852}
2853
2854impl Clone for SelectorId {
2855 fn clone(&self) -> SelectorId {
2856 SelectorId {
2857 id: AtomicUsize::new(self.id.load(Ordering::SeqCst)),
2858 }
2859 }
2860}
2861
2862#[test]
2863pub fn as_raw_fd() {
2864 let poll = Poll::new().unwrap();
2865 assert!(poll.as_raw_fd() > 0);
2866}
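
// A minimal sketch exercising the user-space readiness flow documented on
// `Registration::new2` and `SetReadiness::set_readiness`. Event delivery is
// eventually consistent, so the test polls in a loop instead of asserting on
// the first wakeup.
#[test]
pub fn user_space_readiness_is_delivered() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(8);
    let (registration, set_readiness) = Registration::new2();

    poll.register(&registration, Token(7), Ready::readable(), PollOpt::edge())
        .unwrap();

    set_readiness.set_readiness(Ready::readable()).unwrap();

    // Poll until the readiness event for our token shows up.
    loop {
        poll.poll(&mut events, None).unwrap();

        if events
            .iter()
            .any(|e| e.token() == Token(7) && e.readiness().is_readable())
        {
            return;
        }
    }
}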