retty_io/poll.rs
1use event_imp::{self as event, Event, Evented, PollOpt, Ready};
2use std::cell::UnsafeCell;
3#[cfg(all(unix, not(target_os = "fuchsia")))]
4use std::os::unix::io::AsRawFd;
5#[cfg(all(unix, not(target_os = "fuchsia")))]
6use std::os::unix::io::RawFd;
7use std::process;
8use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release, SeqCst};
9use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
10use std::sync::{Arc, Condvar, Mutex};
11use std::time::{Duration, Instant};
12use std::{fmt, io, ptr, usize};
13use std::{isize, mem, ops};
14use {sys, Token};
15
16// Poll is backed by two readiness queues. The first is a system readiness queue
17// represented by `sys::Selector`. The system readiness queue handles events
18// provided by the system, such as TCP and UDP. The second readiness queue is
19// implemented in user space by `ReadinessQueue`. It provides a way to implement
20// purely user space `Evented` types.
21//
22// `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked
23// list nodes. This significantly reduces the number of required allocations.
24// Each `Registration` / `SetReadiness` pair allocates a single readiness node
25// that is used for the lifetime of the registration.
26//
27// The readiness node also includes a single atomic variable, `state` that
28// tracks most of the state associated with the registration. This includes the
29// current readiness, interest, poll options, and internal state. When the node
30// state is mutated, it is queued in the MPSC channel. A call to
31// `ReadinessQueue::poll` will dequeue and process nodes. The node state can
32// still be mutated while it is queued in the channel for processing.
33// Intermediate state values do not matter as long as the final state is
34// included in the call to `poll`. This is the eventually consistent nature of
35// the readiness queue.
36//
37// The readiness node is ref counted using the `ref_count` field. On creation,
38// the ref_count is initialized to 3: one `Registration` handle, one
39// `SetReadiness` handle, and one for the readiness queue. Since the readiness queue
40// doesn't *always* hold a handle to the node, we don't use the Arc type for
41// managing ref counts (this is to avoid constantly incrementing and
42// decrementing the ref count when pushing & popping from the queue). When the
43// `Registration` handle is dropped, the `dropped` flag is set on the node, then
// the node is pushed into the readiness queue. When `Poll::poll` pops the
// node, it sees the drop flag is set, and decrements its ref count.
46//
47// The MPSC queue is a modified version of the intrusive MPSC node based queue
48// described by 1024cores [1].
49//
50// The first modification is that two markers are used instead of a single
51// `stub`. The second marker is a `sleep_marker` which is used to signal to
52// producers that the consumer is going to sleep. This sleep_marker is only used
53// when the queue is empty, implying that the only node in the queue is
54// `end_marker`.
55//
56// The second modification is an `until` argument passed to the dequeue
57// function. When `poll` encounters a level-triggered node, the node will be
58// immediately pushed back into the queue. In order to avoid an infinite loop,
// `poll` saves off the node's pointer before pushing it and then passes that
// pointer as the `until` argument. If the next node to pop is `until`, then
61// `Dequeue::Empty` is returned.
62//
63// [1] http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
64
65/// Polls for readiness events on all registered values.
66///
67/// `Poll` allows a program to monitor a large number of `Evented` types,
68/// waiting until one or more become "ready" for some class of operations; e.g.
69/// reading and writing. An `Evented` type is considered ready if it is possible
70/// to immediately perform a corresponding operation; e.g. [`read`] or
71/// [`write`].
72///
73/// To use `Poll`, an `Evented` type must first be registered with the `Poll`
74/// instance using the [`register`] method, supplying readiness interest. The
75/// readiness interest tells `Poll` which specific operations on the handle to
76/// monitor for readiness. A `Token` is also passed to the [`register`]
77/// function. When `Poll` returns a readiness event, it will include this token.
78/// This associates the event with the `Evented` handle that generated the
79/// event.
80///
81/// [`read`]: tcp/struct.TcpStream.html#method.read
82/// [`write`]: tcp/struct.TcpStream.html#method.write
83/// [`register`]: #method.register
84///
85/// # Examples
86///
87/// A basic example -- establishing a `TcpStream` connection.
88///
89/// ```
90/// # use std::error::Error;
91/// # fn try_main() -> Result<(), Box<Error>> {
92/// use retty_io::{Events, Poll, Ready, PollOpt, Token};
93/// use retty_io::net::TcpStream;
94///
95/// use std::net::{TcpListener, SocketAddr};
96///
97/// // Bind a server socket to connect to.
98/// let addr: SocketAddr = "127.0.0.1:0".parse()?;
99/// let server = TcpListener::bind(&addr)?;
100///
101/// // Construct a new `Poll` handle as well as the `Events` we'll store into
102/// let poll = Poll::new()?;
103/// let mut events = Events::with_capacity(1024);
104///
105/// // Connect the stream
106/// let stream = TcpStream::connect(&server.local_addr()?)?;
107///
108/// // Register the stream with `Poll`
109/// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
110///
/// // Wait for the socket to become ready. This has to happen in a loop to
112/// // handle spurious wakeups.
113/// loop {
114/// poll.poll(&mut events, None)?;
115///
116/// for event in &events {
117/// if event.token() == Token(0) && event.readiness().is_writable() {
118/// // The socket connected (probably, it could still be a spurious
119/// // wakeup)
120/// return Ok(());
121/// }
122/// }
123/// }
124/// # Ok(())
125/// # }
126/// #
127/// # fn main() {
128/// # try_main().unwrap();
129/// # }
130/// ```
131///
132/// # Edge-triggered and level-triggered
133///
134/// An [`Evented`] registration may request edge-triggered events or
135/// level-triggered events. This is done by setting `register`'s
136/// [`PollOpt`] argument to either [`edge`] or [`level`].
137///
/// The difference between the two can be described as follows. Suppose that
139/// this scenario happens:
140///
141/// 1. A [`TcpStream`] is registered with `Poll`.
142/// 2. The socket receives 2kb of data.
143/// 3. A call to [`Poll::poll`] returns the token associated with the socket
144/// indicating readable readiness.
145/// 4. 1kb is read from the socket.
146/// 5. Another call to [`Poll::poll`] is made.
147///
148/// If when the socket was registered with `Poll`, edge triggered events were
149/// requested, then the call to [`Poll::poll`] done in step **5** will
150/// (probably) hang despite there being another 1kb still present in the socket
151/// read buffer. The reason for this is that edge-triggered mode delivers events
/// only when changes occur on the monitored [`Evented`]. So, in step **5** the
153/// caller might end up waiting for some data that is already present inside the
154/// socket buffer.
155///
156/// With edge-triggered events, operations **must** be performed on the
157/// `Evented` type until [`WouldBlock`] is returned. In other words, after
158/// receiving an event indicating readiness for a certain operation, one should
159/// assume that [`Poll::poll`] may never return another event for the same token
160/// and readiness until the operation returns [`WouldBlock`].
161///
/// By contrast, when level-triggered notifications were requested, each call to
163/// [`Poll::poll`] will return an event for the socket as long as data remains
164/// in the socket buffer. Generally, level-triggered events should be avoided if
165/// high performance is a concern.
166///
167/// Since even with edge-triggered events, multiple events can be generated upon
168/// receipt of multiple chunks of data, the caller has the option to set the
169/// [`oneshot`] flag. This tells `Poll` to disable the associated [`Evented`]
170/// after the event is returned from [`Poll::poll`]. The subsequent calls to
171/// [`Poll::poll`] will no longer include events for [`Evented`] handles that
172/// are disabled even if the readiness state changes. The handle can be
173/// re-enabled by calling [`reregister`]. When handles are disabled, internal
174/// resources used to monitor the handle are maintained until the handle is
175/// dropped or deregistered. This makes re-registering the handle a fast
176/// operation.
177///
178/// For example, in the following scenario:
179///
180/// 1. A [`TcpStream`] is registered with `Poll`.
181/// 2. The socket receives 2kb of data.
182/// 3. A call to [`Poll::poll`] returns the token associated with the socket
183/// indicating readable readiness.
184/// 4. 2kb is read from the socket.
185/// 5. Another call to read is issued and [`WouldBlock`] is returned
186/// 6. The socket receives another 2kb of data.
187/// 7. Another call to [`Poll::poll`] is made.
188///
189/// Assuming the socket was registered with `Poll` with the [`edge`] and
190/// [`oneshot`] options, then the call to [`Poll::poll`] in step 7 would block. This
191/// is because, [`oneshot`] tells `Poll` to disable events for the socket after
192/// returning an event.
193///
194/// In order to receive the event for the data received in step 6, the socket
195/// would need to be reregistered using [`reregister`].
196///
197/// [`PollOpt`]: struct.PollOpt.html
198/// [`edge`]: struct.PollOpt.html#method.edge
199/// [`level`]: struct.PollOpt.html#method.level
200/// [`Poll::poll`]: struct.Poll.html#method.poll
201/// [`WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.WouldBlock
202/// [`Evented`]: event/trait.Evented.html
203/// [`TcpStream`]: tcp/struct.TcpStream.html
204/// [`reregister`]: #method.reregister
205/// [`oneshot`]: struct.PollOpt.html#method.oneshot
206///
207/// # Portability
208///
209/// Using `Poll` provides a portable interface across supported platforms as
210/// long as the caller takes the following into consideration:
211///
212/// ### Spurious events
213///
214/// [`Poll::poll`] may return readiness events even if the associated
215/// [`Evented`] handle is not actually ready. Given the same code, this may
216/// happen more on some platforms than others. It is important to never assume
217/// that, just because a readiness notification was received, that the
218/// associated operation will succeed as well.
219///
/// If the operation fails with [`WouldBlock`], then the caller should not treat
221/// this as an error, but instead should wait until another readiness event is
222/// received.
223///
224/// ### Draining readiness
225///
226/// When using edge-triggered mode, once a readiness event is received, the
227/// corresponding operation must be performed repeatedly until it returns
228/// [`WouldBlock`]. Unless this is done, there is no guarantee that another
229/// readiness event will be delivered, even if further data is received for the
230/// [`Evented`] handle.
231///
232/// For example, in the first scenario described above, after step 5, even if
233/// the socket receives more data there is no guarantee that another readiness
234/// event will be delivered.
235///
236/// ### Readiness operations
237///
238/// The only readiness operations that are guaranteed to be present on all
239/// supported platforms are [`readable`] and [`writable`]. All other readiness
240/// operations may have false negatives and as such should be considered
241/// **hints**. This means that if a socket is registered with [`readable`],
242/// [`error`], and [`hup`] interest, and either an error or hup is received, a
243/// readiness event will be generated for the socket, but it **may** only
244/// include `readable` readiness. Also note that, given the potential for
245/// spurious events, receiving a readiness event with `hup` or `error` doesn't
246/// actually mean that a `read` on the socket will return a result matching the
247/// readiness event.
248///
249/// In other words, portable programs that explicitly check for [`hup`] or
250/// [`error`] readiness should be doing so as an **optimization** and always be
251/// able to handle an error or HUP situation when performing the actual read
252/// operation.
253///
254/// [`readable`]: struct.Ready.html#method.readable
255/// [`writable`]: struct.Ready.html#method.writable
256/// [`error`]: unix/struct.UnixReady.html#method.error
257/// [`hup`]: unix/struct.UnixReady.html#method.hup
258///
259/// ### Registering handles
260///
261/// Unless otherwise noted, it should be assumed that types implementing
262/// [`Evented`] will never become ready unless they are registered with `Poll`.
263///
264/// For example:
265///
266/// ```
267/// # use std::error::Error;
268/// # fn try_main() -> Result<(), Box<Error>> {
269/// use retty_io::{Poll, Ready, PollOpt, Token};
270/// use retty_io::net::TcpStream;
271/// use std::time::Duration;
272/// use std::thread;
273///
274/// let sock = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
275///
276/// thread::sleep(Duration::from_secs(1));
277///
278/// let poll = Poll::new()?;
279///
280/// // The connect is not guaranteed to have started until it is registered at
281/// // this point
282/// poll.register(&sock, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
283/// # Ok(())
284/// # }
285/// #
286/// # fn main() {
287/// # try_main().unwrap();
288/// # }
289/// ```
290///
291/// # Implementation notes
292///
293/// `Poll` is backed by the selector provided by the operating system.
294///
295/// | OS | Selector |
296/// |------------|-----------|
297/// | Linux | [epoll] |
298/// | OS X, iOS | [kqueue] |
299/// | Windows | [IOCP] |
300/// | FreeBSD | [kqueue] |
301/// | Android | [epoll] |
302///
303/// On all supported platforms, socket operations are handled by using the
304/// system selector. Platform specific extensions (e.g. [`EventedFd`]) allow
305/// accessing other features provided by individual system selectors. For
306/// example, Linux's [`signalfd`] feature can be used by registering the FD with
307/// `Poll` via [`EventedFd`].
308///
309/// On all platforms except windows, a call to [`Poll::poll`] is mostly just a
310/// direct call to the system selector. However, [IOCP] uses a completion model
311/// instead of a readiness model. In this case, `Poll` must adapt the completion
/// model to retty-io's API. While non-trivial, the bridge layer is still quite
/// efficient. The most expensive part is that calls to `read` and `write`
/// require data to be copied into an intermediate buffer before it is passed to
/// the kernel.
316///
317/// Notifications generated by [`SetReadiness`] are handled by an internal
318/// readiness queue. A single call to [`Poll::poll`] will collect events from
/// both the system selector and the internal readiness queue.
320///
321/// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
322/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
323/// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
324/// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html
325/// [`EventedFd`]: unix/struct.EventedFd.html
326/// [`SetReadiness`]: struct.SetReadiness.html
327/// [`Poll::poll`]: struct.Poll.html#method.poll
pub struct Poll {
    // Platform specific IO selector. This is the "system readiness queue"
    // described in the comment at the top of this file.
    selector: sys::Selector,

    // Custom readiness queue. This is the user space queue that backs
    // `Registration` / `SetReadiness` pairs.
    readiness_queue: ReadinessQueue,

    // Use an atomic to first check if a full lock will be required. This is a
    // fast-path check for single threaded cases avoiding the extra syscall
    lock_state: AtomicUsize,

    // Sequences concurrent calls to `Poll::poll`
    lock: Mutex<()>,

    // Wakeup the next waiter
    condvar: Condvar,
}
345
346/// Handle to a user space `Poll` registration.
347///
348/// `Registration` allows implementing [`Evented`] for types that cannot work
349/// with the [system selector]. A `Registration` is always paired with a
350/// `SetReadiness`, which allows updating the registration's readiness state.
351/// When [`set_readiness`] is called and the `Registration` is associated with a
352/// [`Poll`] instance, a readiness event will be created and eventually returned
353/// by [`poll`].
354///
355/// A `Registration` / `SetReadiness` pair is created by calling
356/// [`Registration::new2`]. At this point, the registration is not being
357/// monitored by a [`Poll`] instance, so calls to `set_readiness` will not
358/// result in any readiness notifications.
359///
360/// `Registration` implements [`Evented`], so it can be used with [`Poll`] using
361/// the same [`register`], [`reregister`], and [`deregister`] functions used
362/// with TCP, UDP, etc... types. Once registered with [`Poll`], readiness state
363/// changes result in readiness events being dispatched to the [`Poll`] instance
364/// with which `Registration` is registered.
365///
366/// **Note**, before using `Registration` be sure to read the
367/// [`set_readiness`] documentation and the [portability] notes. The
368/// guarantees offered by `Registration` may be weaker than expected.
369///
370/// For high level documentation, see [`Poll`].
371///
372/// # Examples
373///
374/// ```
375/// use retty_io::{Ready, Registration, Poll, PollOpt, Token};
376/// use retty_io::event::Evented;
377///
378/// use std::io;
379/// use std::time::Instant;
380/// use std::thread;
381///
382/// pub struct Deadline {
383/// when: Instant,
384/// registration: Registration,
385/// }
386///
387/// impl Deadline {
388/// pub fn new(when: Instant) -> Deadline {
389/// let (registration, set_readiness) = Registration::new2();
390///
391/// thread::spawn(move || {
392/// let now = Instant::now();
393///
394/// if now < when {
395/// thread::sleep(when - now);
396/// }
397///
398/// set_readiness.set_readiness(Ready::readable());
399/// });
400///
401/// Deadline {
402/// when: when,
403/// registration: registration,
404/// }
405/// }
406///
407/// pub fn is_elapsed(&self) -> bool {
408/// Instant::now() >= self.when
409/// }
410/// }
411///
412/// impl Evented for Deadline {
413/// fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
414/// -> io::Result<()>
415/// {
416/// self.registration.register(poll, token, interest, opts)
417/// }
418///
419/// fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
420/// -> io::Result<()>
421/// {
422/// self.registration.reregister(poll, token, interest, opts)
423/// }
424///
425/// fn deregister(&self, poll: &Poll) -> io::Result<()> {
426/// poll.deregister(&self.registration)
427/// }
428/// }
429/// ```
430///
431/// [system selector]: struct.Poll.html#implementation-notes
432/// [`Poll`]: struct.Poll.html
433/// [`Registration::new2`]: struct.Registration.html#method.new2
434/// [`Evented`]: event/trait.Evented.html
435/// [`set_readiness`]: struct.SetReadiness.html#method.set_readiness
436/// [`register`]: struct.Poll.html#method.register
437/// [`reregister`]: struct.Poll.html#method.reregister
438/// [`deregister`]: struct.Poll.html#method.deregister
439/// [portability]: struct.Poll.html#portability
pub struct Registration {
    // Handle to the readiness node; `SetReadiness` wraps the same
    // `RegistrationInner` type.
    inner: RegistrationInner,
}

// NOTE(review): `RegistrationInner` holds a raw `*mut ReadinessNode`, so
// `Send` / `Sync` are not derived automatically and are asserted by hand here.
// Presumably sound because the node is coordinated through its atomic fields
// -- confirm against the `RegistrationInner` implementation.
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
446
/// Updates the readiness state of the associated `Registration`.
///
/// See [`Registration`] for more documentation on using `SetReadiness` and
/// [`Poll`] for high level polling documentation.
///
/// [`Poll`]: struct.Poll.html
/// [`Registration`]: struct.Registration.html
#[derive(Clone)]
pub struct SetReadiness {
    // Handle to the readiness node; `Registration` wraps the same
    // `RegistrationInner` type.
    inner: RegistrationInner,
}

// NOTE(review): `RegistrationInner` holds a raw `*mut ReadinessNode`, so
// `Send` / `Sync` are asserted by hand. Presumably sound because the node is
// coordinated through its atomic fields -- confirm against the
// `RegistrationInner` implementation.
unsafe impl Send for SetReadiness {}
unsafe impl Sync for SetReadiness {}
461
/// Used to associate an IO type with a Selector
#[derive(Debug)]
pub struct SelectorId {
    // NOTE(review): presumably the identifier of the selector this IO type is
    // bound to, stored atomically so it can be set through `&self` -- confirm
    // against the `SelectorId` impl, which is outside this part of the file.
    id: AtomicUsize,
}
467
/// Shared inner handle wrapped by both `Registration` and `SetReadiness`.
struct RegistrationInner {
    // Unsafe pointer to the registration's node. The node is ref counted. This
    // cannot "simply" be tracked by an Arc because `Poll::poll` has an implicit
    // handle though it isn't stored anywhere. In other words, `Poll::poll`
    // needs to decrement the ref count before the node is freed.
    node: *mut ReadinessNode,
}
475
/// Cloneable handle to the user space readiness queue. Every clone shares the
/// same `ReadinessQueueInner` through the `Arc`.
#[derive(Clone)]
struct ReadinessQueue {
    inner: Arc<ReadinessQueueInner>,
}

// NOTE(review): `ReadinessQueueInner` contains an
// `UnsafeCell<*mut ReadinessNode>`, so `Send` / `Sync` are asserted by hand.
// The field comment on `tail_readiness` states that it is only accessed by
// `Poll::poll`, which is responsible for the coordination -- confirm that
// invariant when touching the queue.
unsafe impl Send for ReadinessQueue {}
unsafe impl Sync for ReadinessQueue {}
483
/// State of the user space readiness queue: the MPSC queue endpoints, the
/// marker nodes, and the awakener used to wake up `Poll`.
struct ReadinessQueueInner {
    // Used to wake up `Poll` when readiness is set in another thread.
    awakener: sys::Awakener,

    // Head of the MPSC queue used to signal readiness to `Poll::poll`.
    head_readiness: AtomicPtr<ReadinessNode>,

    // Tail of the readiness queue.
    //
    // Only accessed by Poll::poll. Coordination will be handled by the poll fn
    tail_readiness: UnsafeCell<*mut ReadinessNode>,

    // Fake readiness node used to punctuate the end of the readiness queue.
    // Before attempting to read from the queue, this node is inserted in order
    // to partition the queue between nodes that are "owned" by the dequeue end
    // and nodes that will be pushed on by producers.
    end_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but this node signals to producers that `Poll`
    // has gone to sleep and must be woken up.
    sleep_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but the node signals that the queue is closed.
    // This happens when the `ReadinessQueue` is dropped and signals to
    // producers that the nodes should no longer be pushed into the queue.
    closed_marker: Box<ReadinessNode>,
}
511
/// Node shared by a `Registration` / `SetReadiness` pair as well as the node
/// queued into the MPSC channel.
struct ReadinessNode {
    // Node state, see the docs on the `ReadinessState` struct for the layout.
    //
    // This variable is the primary point of coordination between all the
    // various threads concurrently accessing the node.
    state: AtomicState,

    // The registration token cannot fit into the `state` variable, so it is
    // broken out here. In order to atomically update both the state and token
    // we have to jump through a few hoops.
    //
    // First, `state` includes `token_read_pos` and `token_write_pos`. These can
    // either be 0, 1, or 2 which represent a token slot. `token_write_pos` is
    // the token slot that contains the most up to date registration token.
    // `token_read_pos` is the token slot that `poll` is currently reading from.
    //
    // When a call to `update` includes a different token than the one currently
    // associated with the registration (token_write_pos), first an unused token
    // slot is found. The unused slot is the one not represented by
    // `token_read_pos` OR `token_write_pos`. The new token is written to this
    // slot, then `state` is updated with the new `token_write_pos` value. This
    // requires that there is only a *single* concurrent call to `update`.
    //
    // When `poll` reads a node state, it checks that `token_read_pos` matches
    // `token_write_pos`. If they do not match, then it atomically updates
    // `state` such that `token_read_pos` is set to `token_write_pos`. It will
    // then read the token at the newly updated `token_read_pos`.
    token_0: UnsafeCell<Token>,
    token_1: UnsafeCell<Token>,
    token_2: UnsafeCell<Token>,

    // Used when the node is queued in the readiness linked list. Accessing
    // this field requires winning the "queue" lock
    next_readiness: AtomicPtr<ReadinessNode>,

    // Ensures that there is only one concurrent call to `update`.
    //
    // Each call to `update` will attempt to swap `update_lock` from `false` to
    // `true`. If the CAS succeeds, the thread has obtained the update lock. If
    // the CAS fails, then the `update` call returns immediately and the update
    // is discarded.
    update_lock: AtomicBool,

    // Pointer to Arc<ReadinessQueueInner>, stored type-erased as `*mut ()`.
    readiness_queue: AtomicPtr<()>,

    // Tracks the number of outstanding references to this node. The comment at
    // the top of this file describes the count as covering the `Registration`
    // handle, the `SetReadiness` handle, and the readiness queue. (This comment
    // previously named them `ReadyRef` pointers; no such type is visible in
    // this part of the file.)
    ref_count: AtomicUsize,
}
563
/// Stores the ReadinessNode state in an AtomicUsize. This wrapper around the
/// atomic variable handles encoding / decoding `ReadinessState` values.
struct AtomicState {
    // The packed `ReadinessState` bits.
    inner: AtomicUsize,
}
569
// Bit offsets of each field packed into the node state word, from low to high.
const READINESS_SHIFT: usize = 0;
const INTEREST_SHIFT: usize = 4;
const POLL_OPT_SHIFT: usize = 8;
const TOKEN_RD_SHIFT: usize = 12;
const TOKEN_WR_SHIFT: usize = 14;
const QUEUED_SHIFT: usize = 16;
const DROPPED_SHIFT: usize = 17;

// Field masks: a 2 bit wide field, a 4 bit wide field, and the two single bit
// flags already shifted into position.
const MASK_2: usize = 0b11;
const MASK_4: usize = 0b1111;
const QUEUED_MASK: usize = 1 << QUEUED_SHIFT;
const DROPPED_MASK: usize = 1 << DROPPED_SHIFT;
582
/// Tracks all state for a single `ReadinessNode`. The state is packed into a
/// `usize` variable from low to high bit as follows (the offsets are given by
/// the `*_SHIFT` constants):
///
/// - 4 bits: Registration current readiness (`READINESS_SHIFT`)
/// - 4 bits: Registration interest (`INTEREST_SHIFT`)
/// - 4 bits: Poll options (`POLL_OPT_SHIFT`)
/// - 2 bits: Token position currently being read from by `poll`
///   (`TOKEN_RD_SHIFT`)
/// - 2 bits: Token position last written to by `update` (`TOKEN_WR_SHIFT`)
/// - 1 bit: Queued flag, set when node is being pushed into MPSC queue
///   (`QUEUED_SHIFT`)
/// - 1 bit: Dropped flag, set when all `Registration` handles have been
///   dropped (`DROPPED_SHIFT`)
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct ReadinessState(usize);
595
/// Returned by `dequeue_node`. Represents the different states as described by
/// the queue documentation on 1024cores.net.
enum Dequeue {
    // A node was popped from the queue.
    Data(*mut ReadinessNode),
    // Nothing to pop. Per the comment at the top of this file, this is also
    // returned when the next node to pop is the `until` node.
    Empty,
    // NOTE(review): presumably the transient state from the 1024cores
    // description where a producer has started but not finished a push --
    // confirm against `dequeue_node`.
    Inconsistent,
}
603
// Token under which `Poll::new` registers the readiness queue's awakener with
// the selector. `Token(usize::MAX)` is documented on `register` as reserved
// for internal usage.
const AWAKEN: Token = Token(usize::MAX);
// NOTE(review): presumably the upper bound enforced on
// `ReadinessNode::ref_count` -- confirm at the use site, which is outside
// this part of the file.
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
606
607/*
608 *
609 * ===== Poll =====
610 *
611 */
612
613impl Poll {
614 /// Return a new `Poll` handle.
615 ///
616 /// This function will make a syscall to the operating system to create the
617 /// system selector. If this syscall fails, `Poll::new` will return with the
618 /// error.
619 ///
620 /// See [struct] level docs for more details.
621 ///
622 /// [struct]: struct.Poll.html
623 ///
624 /// # Examples
625 ///
626 /// ```
627 /// # use std::error::Error;
628 /// # fn try_main() -> Result<(), Box<Error>> {
629 /// use retty_io::{Poll, Events};
630 /// use std::time::Duration;
631 ///
632 /// let poll = match Poll::new() {
633 /// Ok(poll) => poll,
634 /// Err(e) => panic!("failed to create Poll instance; err={:?}", e),
635 /// };
636 ///
637 /// // Create a structure to receive polled events
638 /// let mut events = Events::with_capacity(1024);
639 ///
640 /// // Wait for events, but none will be received because no `Evented`
641 /// // handles have been registered with this `Poll` instance.
642 /// let n = poll.poll(&mut events, Some(Duration::from_millis(500)))?;
643 /// assert_eq!(n, 0);
644 /// # Ok(())
645 /// # }
646 /// #
647 /// # fn main() {
648 /// # try_main().unwrap();
649 /// # }
650 /// ```
651 pub fn new() -> io::Result<Poll> {
652 is_send::<Poll>();
653 is_sync::<Poll>();
654
655 let poll = Poll {
656 selector: sys::Selector::new()?,
657 readiness_queue: ReadinessQueue::new()?,
658 lock_state: AtomicUsize::new(0),
659 lock: Mutex::new(()),
660 condvar: Condvar::new(),
661 };
662
663 // Register the notification wakeup FD with the IO poller
664 poll.readiness_queue.inner.awakener.register(
665 &poll,
666 AWAKEN,
667 Ready::readable(),
668 PollOpt::edge(),
669 )?;
670
671 Ok(poll)
672 }
673
674 /// Register an `Evented` handle with the `Poll` instance.
675 ///
676 /// Once registered, the `Poll` instance will monitor the `Evented` handle
677 /// for readiness state changes. When it notices a state change, it will
678 /// return a readiness event for the handle the next time [`poll`] is
679 /// called.
680 ///
681 /// See the [`struct`] docs for a high level overview.
682 ///
683 /// # Arguments
684 ///
685 /// `handle: &E: Evented`: This is the handle that the `Poll` instance
686 /// should monitor for readiness state changes.
687 ///
688 /// `token: Token`: The caller picks a token to associate with the socket.
689 /// When [`poll`] returns an event for the handle, this token is included.
690 /// This allows the caller to map the event to its handle. The token
691 /// associated with the `Evented` handle can be changed at any time by
692 /// calling [`reregister`].
693 ///
694 /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
695 /// usage.
696 ///
697 /// See documentation on [`Token`] for an example showing how to pick
698 /// [`Token`] values.
699 ///
700 /// `interest: Ready`: Specifies which operations `Poll` should monitor for
701 /// readiness. `Poll` will only return readiness events for operations
702 /// specified by this argument.
703 ///
704 /// If a socket is registered with readable interest and the socket becomes
705 /// writable, no event will be returned from [`poll`].
706 ///
707 /// The readiness interest for an `Evented` handle can be changed at any
708 /// time by calling [`reregister`].
709 ///
710 /// `opts: PollOpt`: Specifies the registration options. The most common
711 /// options being [`level`] for level-triggered events, [`edge`] for
712 /// edge-triggered events, and [`oneshot`].
713 ///
714 /// The registration options for an `Evented` handle can be changed at any
715 /// time by calling [`reregister`].
716 ///
717 /// # Notes
718 ///
719 /// Unless otherwise specified, the caller should assume that once an
720 /// `Evented` handle is registered with a `Poll` instance, it is bound to
721 /// that `Poll` instance for the lifetime of the `Evented` handle. This
722 /// remains true even if the `Evented` handle is deregistered from the poll
723 /// instance using [`deregister`].
724 ///
725 /// This function is **thread safe**. It can be called concurrently from
726 /// multiple threads.
727 ///
728 /// [`struct`]: #
729 /// [`reregister`]: #method.reregister
730 /// [`deregister`]: #method.deregister
731 /// [`poll`]: #method.poll
732 /// [`level`]: struct.PollOpt.html#method.level
733 /// [`edge`]: struct.PollOpt.html#method.edge
734 /// [`oneshot`]: struct.PollOpt.html#method.oneshot
735 /// [`Token`]: struct.Token.html
736 ///
737 /// # Examples
738 ///
739 /// ```
740 /// # use std::error::Error;
741 /// # fn try_main() -> Result<(), Box<Error>> {
742 /// use retty_io::{Events, Poll, Ready, PollOpt, Token};
743 /// use retty_io::net::TcpStream;
744 /// use std::time::{Duration, Instant};
745 ///
746 /// let poll = Poll::new()?;
747 /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
748 ///
749 /// // Register the socket with `poll`
750 /// poll.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
751 ///
752 /// let mut events = Events::with_capacity(1024);
753 /// let start = Instant::now();
754 /// let timeout = Duration::from_millis(500);
755 ///
756 /// loop {
757 /// let elapsed = start.elapsed();
758 ///
759 /// if elapsed >= timeout {
760 /// // Connection timed out
761 /// return Ok(());
762 /// }
763 ///
764 /// let remaining = timeout - elapsed;
765 /// poll.poll(&mut events, Some(remaining))?;
766 ///
767 /// for event in &events {
768 /// if event.token() == Token(0) {
769 /// // Something (probably) happened on the socket.
770 /// return Ok(());
771 /// }
772 /// }
773 /// }
774 /// # Ok(())
775 /// # }
776 /// #
777 /// # fn main() {
778 /// # try_main().unwrap();
779 /// # }
780 /// ```
781 pub fn register<E: ?Sized>(
782 &self,
783 handle: &E,
784 token: Token,
785 interest: Ready,
786 opts: PollOpt,
787 ) -> io::Result<()>
788 where
789 E: Evented,
790 {
791 validate_args(token)?;
792
793 /*
794 * Undefined behavior:
795 * - Reusing a token with a different `Evented` without deregistering
796 * (or closing) the original `Evented`.
797 */
798 trace!("registering with poller");
799
800 // Register interests for this socket
801 handle.register(self, token, interest, opts)?;
802
803 Ok(())
804 }
805
806 /// Re-register an `Evented` handle with the `Poll` instance.
807 ///
808 /// Re-registering an `Evented` handle allows changing the details of the
809 /// registration. Specifically, it allows updating the associated `token`,
810 /// `interest`, and `opts` specified in previous `register` and `reregister`
811 /// calls.
812 ///
813 /// The `reregister` arguments fully override the previous values. In other
814 /// words, if a socket is registered with [`readable`] interest and the call
815 /// to `reregister` specifies [`writable`], then read interest is no longer
816 /// requested for the handle.
817 ///
818 /// The `Evented` handle must have previously been registered with this
819 /// instance of `Poll` otherwise the call to `reregister` will return with
820 /// an error.
821 ///
822 /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
823 /// usage.
824 ///
825 /// See the [`register`] documentation for details about the function
826 /// arguments and see the [`struct`] docs for a high level overview of
827 /// polling.
828 ///
829 /// # Examples
830 ///
831 /// ```
832 /// # use std::error::Error;
833 /// # fn try_main() -> Result<(), Box<Error>> {
834 /// use retty_io::{Poll, Ready, PollOpt, Token};
835 /// use retty_io::net::TcpStream;
836 ///
837 /// let poll = Poll::new()?;
838 /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
839 ///
840 /// // Register the socket with `poll`, requesting readable
841 /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
842 ///
843 /// // Reregister the socket specifying a different token and write interest
844 /// // instead. `PollOpt::edge()` must be specified even though that value
845 /// // is not being changed.
846 /// poll.reregister(&socket, Token(2), Ready::writable(), PollOpt::edge())?;
847 /// # Ok(())
848 /// # }
849 /// #
850 /// # fn main() {
851 /// # try_main().unwrap();
852 /// # }
853 /// ```
854 ///
855 /// [`struct`]: #
856 /// [`register`]: #method.register
857 /// [`readable`]: struct.Ready.html#method.readable
858 /// [`writable`]: struct.Ready.html#method.writable
859 pub fn reregister<E: ?Sized>(
860 &self,
861 handle: &E,
862 token: Token,
863 interest: Ready,
864 opts: PollOpt,
865 ) -> io::Result<()>
866 where
867 E: Evented,
868 {
869 validate_args(token)?;
870
871 trace!("registering with poller");
872
873 // Register interests for this socket
874 handle.reregister(self, token, interest, opts)?;
875
876 Ok(())
877 }
878
879 /// Deregister an `Evented` handle with the `Poll` instance.
880 ///
881 /// When an `Evented` handle is deregistered, the `Poll` instance will
882 /// no longer monitor it for readiness state changes. Unlike disabling
883 /// handles with oneshot, deregistering clears up any internal resources
884 /// needed to track the handle.
885 ///
886 /// A handle can be passed back to `register` after it has been
887 /// deregistered; however, it must be passed back to the **same** `Poll`
888 /// instance.
889 ///
890 /// `Evented` handles are automatically deregistered when they are dropped.
891 /// It is common to never need to explicitly call `deregister`.
892 ///
893 /// # Examples
894 ///
895 /// ```
896 /// # use std::error::Error;
897 /// # fn try_main() -> Result<(), Box<Error>> {
898 /// use retty_io::{Events, Poll, Ready, PollOpt, Token};
899 /// use retty_io::net::TcpStream;
900 /// use std::time::Duration;
901 ///
902 /// let poll = Poll::new()?;
903 /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
904 ///
905 /// // Register the socket with `poll`
906 /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
907 ///
908 /// poll.deregister(&socket)?;
909 ///
910 /// let mut events = Events::with_capacity(1024);
911 ///
912 /// // Set a timeout because this poll should never receive any events.
913 /// let n = poll.poll(&mut events, Some(Duration::from_secs(1)))?;
914 /// assert_eq!(0, n);
915 /// # Ok(())
916 /// # }
917 /// #
918 /// # fn main() {
919 /// # try_main().unwrap();
920 /// # }
921 /// ```
922 pub fn deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()>
923 where
924 E: Evented,
925 {
926 trace!("deregistering handle with poller");
927
928 // Deregister interests for this socket
929 handle.deregister(self)?;
930
931 Ok(())
932 }
933
934 /// Wait for readiness events
935 ///
936 /// Blocks the current thread and waits for readiness events for any of the
937 /// `Evented` handles that have been registered with this `Poll` instance.
938 /// The function will block until either at least one readiness event has
939 /// been received or `timeout` has elapsed. A `timeout` of `None` means that
940 /// `poll` will block until a readiness event has been received.
941 ///
942 /// The supplied `events` will be cleared and newly received readiness events
943 /// will be pushed onto the end. At most `events.capacity()` events will be
944 /// returned. If there are further pending readiness events, they will be
945 /// returned on the next call to `poll`.
946 ///
947 /// A single call to `poll` may result in multiple readiness events being
948 /// returned for a single `Evented` handle. For example, if a TCP socket
949 /// becomes both readable and writable, it may be possible for a single
950 /// readiness event to be returned with both [`readable`] and [`writable`]
951 /// readiness **OR** two separate events may be returned, one with
952 /// [`readable`] set and one with [`writable`] set.
953 ///
954 /// Note that the `timeout` will be rounded up to the system clock
955 /// granularity (usually 1ms), and kernel scheduling delays mean that
956 /// the blocking interval may be overrun by a small amount.
957 ///
958 /// `poll` returns the number of readiness events that have been pushed into
959 /// `events` or `Err` when an error has been encountered with the system
960 /// selector. The value returned is deprecated and will be removed in 0.7.0.
961 /// Accessing the events by index is also deprecated. Events can be
962 /// inserted by other events triggering, thus making sequential access
963 /// problematic. Use the iterator API instead. See [`iter`].
964 ///
965 /// See the [struct] level documentation for a higher level discussion of
966 /// polling.
967 ///
968 /// [`readable`]: struct.Ready.html#method.readable
969 /// [`writable`]: struct.Ready.html#method.writable
970 /// [struct]: #
971 /// [`iter`]: struct.Events.html#method.iter
972 ///
973 /// # Examples
974 ///
975 /// A basic example -- establishing a `TcpStream` connection.
976 ///
977 /// ```
978 /// # use std::error::Error;
979 /// # fn try_main() -> Result<(), Box<Error>> {
980 /// use retty_io::{Events, Poll, Ready, PollOpt, Token};
981 /// use retty_io::net::TcpStream;
982 ///
983 /// use std::net::{TcpListener, SocketAddr};
984 /// use std::thread;
985 ///
986 /// // Bind a server socket to connect to.
987 /// let addr: SocketAddr = "127.0.0.1:0".parse()?;
988 /// let server = TcpListener::bind(&addr)?;
989 /// let addr = server.local_addr()?.clone();
990 ///
991 /// // Spawn a thread to accept the socket
992 /// thread::spawn(move || {
993 /// let _ = server.accept();
994 /// });
995 ///
996 /// // Construct a new `Poll` handle as well as the `Events` we'll store into
997 /// let poll = Poll::new()?;
998 /// let mut events = Events::with_capacity(1024);
999 ///
1000 /// // Connect the stream
1001 /// let stream = TcpStream::connect(&addr)?;
1002 ///
1003 /// // Register the stream with `Poll`
1004 /// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
1005 ///
1006 /// // Wait for the socket to become ready. This has to happens in a loop to
1007 /// // handle spurious wakeups.
1008 /// loop {
1009 /// poll.poll(&mut events, None)?;
1010 ///
1011 /// for event in &events {
1012 /// if event.token() == Token(0) && event.readiness().is_writable() {
1013 /// // The socket connected (probably, it could still be a spurious
1014 /// // wakeup)
1015 /// return Ok(());
1016 /// }
1017 /// }
1018 /// }
1019 /// # Ok(())
1020 /// # }
1021 /// #
1022 /// # fn main() {
1023 /// # try_main().unwrap();
1024 /// # }
1025 /// ```
1026 ///
1027 /// [struct]: #
1028 pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
1029 self.poll1(events, timeout, false)
1030 }
1031
1032 /// Like `poll`, but may be interrupted by a signal
1033 ///
1034 /// If `poll` is inturrupted while blocking, it will transparently retry the syscall. If you
1035 /// want to handle signals yourself, however, use `poll_interruptible`.
1036 pub fn poll_interruptible(
1037 &self,
1038 events: &mut Events,
1039 timeout: Option<Duration>,
1040 ) -> io::Result<usize> {
1041 self.poll1(events, timeout, true)
1042 }
1043
1044 fn poll1(
1045 &self,
1046 events: &mut Events,
1047 mut timeout: Option<Duration>,
1048 interruptible: bool,
1049 ) -> io::Result<usize> {
1050 let zero = Some(Duration::from_millis(0));
1051
1052 // At a high level, the synchronization strategy is to acquire access to
1053 // the critical section by transitioning the atomic from unlocked ->
1054 // locked. If the attempt fails, the thread will wait on the condition
1055 // variable.
1056 //
1057 // # Some more detail
1058 //
1059 // The `lock_state` atomic usize combines:
1060 //
1061 // - locked flag, stored in the least significant bit
1062 // - number of waiting threads, stored in the rest of the bits.
1063 //
1064 // When a thread transitions the locked flag from 0 -> 1, it has
1065 // obtained access to the critical section.
1066 //
1067 // When entering `poll`, a compare-and-swap from 0 -> 1 is attempted.
1068 // This is a fast path for the case when there are no concurrent calls
1069 // to poll, which is very common.
1070 //
1071 // On failure, the mutex is locked, and the thread attempts to increment
1072 // the number of waiting threads component of `lock_state`. If this is
1073 // successfully done while the locked flag is set, then the thread can
1074 // wait on the condition variable.
1075 //
1076 // When a thread exits the critical section, it unsets the locked flag.
1077 // If there are any waiters, which is atomically determined while
1078 // unsetting the locked flag, then the condvar is notified.
1079
1080 let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);
1081
1082 if 0 != curr {
1083 // Enter slower path
1084 let mut lock = self.lock.lock().unwrap();
1085 let mut inc = false;
1086
1087 loop {
1088 if curr & 1 == 0 {
1089 // The lock is currently free, attempt to grab it
1090 let mut next = curr | 1;
1091
1092 if inc {
1093 // The waiter count has previously been incremented, so
1094 // decrement it here
1095 next -= 2;
1096 }
1097
1098 let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
1099
1100 if actual != curr {
1101 curr = actual;
1102 continue;
1103 }
1104
1105 // Lock acquired, break from the loop
1106 break;
1107 }
1108
1109 if timeout == zero {
1110 if inc {
1111 self.lock_state.fetch_sub(2, SeqCst);
1112 }
1113
1114 return Ok(0);
1115 }
1116
1117 // The lock is currently held, so wait for it to become
1118 // free. If the waiter count hasn't been incremented yet, do
1119 // so now
1120 if !inc {
1121 let next = curr.checked_add(2).expect("overflow");
1122 let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
1123
1124 if actual != curr {
1125 curr = actual;
1126 continue;
1127 }
1128
1129 // Track that the waiter count has been incremented for
1130 // this thread and fall through to the condvar waiting
1131 inc = true;
1132 }
1133
1134 lock = match timeout {
1135 Some(to) => {
1136 let now = Instant::now();
1137
1138 // Wait to be notified
1139 let (l, _) = self.condvar.wait_timeout(lock, to).unwrap();
1140
1141 // See how much time was elapsed in the wait
1142 let elapsed = now.elapsed();
1143
1144 // Update `timeout` to reflect how much time is left to
1145 // wait.
1146 if elapsed >= to {
1147 timeout = zero;
1148 } else {
1149 // Update the timeout
1150 timeout = Some(to - elapsed);
1151 }
1152
1153 l
1154 }
1155 None => self.condvar.wait(lock).unwrap(),
1156 };
1157
1158 // Reload the state
1159 curr = self.lock_state.load(SeqCst);
1160
1161 // Try to lock again...
1162 }
1163 }
1164
1165 let ret = self.poll2(events, timeout, interruptible);
1166
1167 // Release the lock
1168 if 1 != self.lock_state.fetch_and(!1, Release) {
1169 // Acquire the mutex
1170 let _lock = self.lock.lock().unwrap();
1171
1172 // There is at least one waiting thread, so notify one
1173 self.condvar.notify_one();
1174 }
1175
1176 ret
1177 }
1178
1179 #[inline]
1180 #[cfg_attr(feature = "cargo-clippy", allow(clippy::if_same_then_else))]
1181 fn poll2(
1182 &self,
1183 events: &mut Events,
1184 mut timeout: Option<Duration>,
1185 interruptible: bool,
1186 ) -> io::Result<usize> {
1187 // Compute the timeout value passed to the system selector. If the
1188 // readiness queue has pending nodes, we still want to poll the system
1189 // selector for new events, but we don't want to block the thread to
1190 // wait for new events.
1191 if timeout == Some(Duration::from_millis(0)) {
1192 // If blocking is not requested, then there is no need to prepare
1193 // the queue for sleep
1194 //
1195 // The sleep_marker should be removed by readiness_queue.poll().
1196 } else if self.readiness_queue.prepare_for_sleep() {
1197 // The readiness queue is empty. The call to `prepare_for_sleep`
1198 // inserts `sleep_marker` into the queue. This signals to any
1199 // threads setting readiness that the `Poll::poll` is going to
1200 // sleep, so the awakener should be used.
1201 } else {
1202 // The readiness queue is not empty, so do not block the thread.
1203 timeout = Some(Duration::from_millis(0));
1204 }
1205
1206 loop {
1207 let now = Instant::now();
1208 // First get selector events
1209 let res = self.selector.select(&mut events.inner, AWAKEN, timeout);
1210 match res {
1211 Ok(true) => {
1212 // Some awakeners require reading from a FD.
1213 self.readiness_queue.inner.awakener.cleanup();
1214 break;
1215 }
1216 Ok(false) => break,
1217 Err(ref e) if e.kind() == io::ErrorKind::Interrupted && !interruptible => {
1218 // Interrupted by a signal; update timeout if necessary and retry
1219 if let Some(to) = timeout {
1220 let elapsed = now.elapsed();
1221 if elapsed >= to {
1222 break;
1223 } else {
1224 timeout = Some(to - elapsed);
1225 }
1226 }
1227 }
1228 Err(e) => return Err(e),
1229 }
1230 }
1231
1232 // Poll custom event queue
1233 self.readiness_queue.poll(&mut events.inner);
1234
1235 // Return number of polled events
1236 Ok(events.inner.len())
1237 }
1238}
1239
1240fn validate_args(token: Token) -> io::Result<()> {
1241 if token == AWAKEN {
1242 return Err(io::Error::new(io::ErrorKind::Other, "invalid token"));
1243 }
1244
1245 Ok(())
1246}
1247
1248impl fmt::Debug for Poll {
1249 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1250 fmt.debug_struct("Poll").finish()
1251 }
1252}
1253
1254#[cfg(all(unix, not(target_os = "fuchsia")))]
1255impl AsRawFd for Poll {
1256 fn as_raw_fd(&self) -> RawFd {
1257 self.selector.as_raw_fd()
1258 }
1259}
1260
1261/// A collection of readiness events.
1262///
1263/// `Events` is passed as an argument to [`Poll::poll`] and will be used to
1264/// receive any new readiness events received since the last poll. Usually, a
1265/// single `Events` instance is created at the same time as a [`Poll`] and
1266/// reused on each call to [`Poll::poll`].
1267///
1268/// See [`Poll`] for more documentation on polling.
1269///
1270/// # Examples
1271///
1272/// ```
1273/// # use std::error::Error;
1274/// # fn try_main() -> Result<(), Box<Error>> {
1275/// use retty_io::{Events, Poll};
1276/// use std::time::Duration;
1277///
1278/// let mut events = Events::with_capacity(1024);
1279/// let poll = Poll::new()?;
1280///
1281/// assert_eq!(0, events.len());
1282///
1283/// // Register `Evented` handles with `poll`
1284///
1285/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1286///
1287/// for event in &events {
1288/// println!("event={:?}", event);
1289/// }
1290/// # Ok(())
1291/// # }
1292/// #
1293/// # fn main() {
1294/// # try_main().unwrap();
1295/// # }
1296/// ```
1297///
1298/// [`Poll::poll`]: struct.Poll.html#method.poll
1299/// [`Poll`]: struct.Poll.html
1300pub struct Events {
1301 inner: sys::Events,
1302}
1303
1304/// [`Events`] iterator.
1305///
1306/// This struct is created by the [`iter`] method on [`Events`].
1307///
1308/// # Examples
1309///
1310/// ```
1311/// # use std::error::Error;
1312/// # fn try_main() -> Result<(), Box<Error>> {
1313/// use retty_io::{Events, Poll};
1314/// use std::time::Duration;
1315///
1316/// let mut events = Events::with_capacity(1024);
1317/// let poll = Poll::new()?;
1318///
1319/// // Register handles with `poll`
1320///
1321/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1322///
1323/// for event in events.iter() {
1324/// println!("event={:?}", event);
1325/// }
1326/// # Ok(())
1327/// # }
1328/// #
1329/// # fn main() {
1330/// # try_main().unwrap();
1331/// # }
1332/// ```
1333///
1334/// [`Events`]: struct.Events.html
1335/// [`iter`]: struct.Events.html#method.iter
1336#[derive(Debug, Clone)]
1337pub struct Iter<'a> {
1338 inner: &'a Events,
1339 pos: usize,
1340}
1341
1342/// Owned [`Events`] iterator.
1343///
1344/// This struct is created by the `into_iter` method on [`Events`].
1345///
1346/// # Examples
1347///
1348/// ```
1349/// # use std::error::Error;
1350/// # fn try_main() -> Result<(), Box<Error>> {
1351/// use retty_io::{Events, Poll};
1352/// use std::time::Duration;
1353///
1354/// let mut events = Events::with_capacity(1024);
1355/// let poll = Poll::new()?;
1356///
1357/// // Register handles with `poll`
1358///
1359/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1360///
1361/// for event in events {
1362/// println!("event={:?}", event);
1363/// }
1364/// # Ok(())
1365/// # }
1366/// #
1367/// # fn main() {
1368/// # try_main().unwrap();
1369/// # }
1370/// ```
1371/// [`Events`]: struct.Events.html
1372#[derive(Debug)]
1373pub struct IntoIter {
1374 inner: Events,
1375 pos: usize,
1376}
1377
1378impl Events {
1379 /// Return a new `Events` capable of holding up to `capacity` events.
1380 ///
1381 /// # Examples
1382 ///
1383 /// ```
1384 /// use retty_io::Events;
1385 ///
1386 /// let events = Events::with_capacity(1024);
1387 ///
1388 /// assert_eq!(1024, events.capacity());
1389 /// ```
1390 pub fn with_capacity(capacity: usize) -> Events {
1391 Events {
1392 inner: sys::Events::with_capacity(capacity),
1393 }
1394 }
1395
1396 #[deprecated(
1397 since = "0.6.10",
1398 note = "Index access removed in favor of iterator only API."
1399 )]
1400 #[doc(hidden)]
1401 pub fn get(&self, idx: usize) -> Option<Event> {
1402 self.inner.get(idx)
1403 }
1404
1405 #[doc(hidden)]
1406 #[deprecated(
1407 since = "0.6.10",
1408 note = "Index access removed in favor of iterator only API."
1409 )]
1410 pub fn len(&self) -> usize {
1411 self.inner.len()
1412 }
1413
1414 /// Returns the number of `Event` values that `self` can hold.
1415 ///
1416 /// ```
1417 /// use retty_io::Events;
1418 ///
1419 /// let events = Events::with_capacity(1024);
1420 ///
1421 /// assert_eq!(1024, events.capacity());
1422 /// ```
1423 pub fn capacity(&self) -> usize {
1424 self.inner.capacity()
1425 }
1426
1427 /// Returns `true` if `self` contains no `Event` values.
1428 ///
1429 /// # Examples
1430 ///
1431 /// ```
1432 /// use retty_io::Events;
1433 ///
1434 /// let events = Events::with_capacity(1024);
1435 ///
1436 /// assert!(events.is_empty());
1437 /// ```
1438 pub fn is_empty(&self) -> bool {
1439 self.inner.is_empty()
1440 }
1441
1442 /// Returns an iterator over the `Event` values.
1443 ///
1444 /// # Examples
1445 ///
1446 /// ```
1447 /// # use std::error::Error;
1448 /// # fn try_main() -> Result<(), Box<Error>> {
1449 /// use retty_io::{Events, Poll};
1450 /// use std::time::Duration;
1451 ///
1452 /// let mut events = Events::with_capacity(1024);
1453 /// let poll = Poll::new()?;
1454 ///
1455 /// // Register handles with `poll`
1456 ///
1457 /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1458 ///
1459 /// for event in events.iter() {
1460 /// println!("event={:?}", event);
1461 /// }
1462 /// # Ok(())
1463 /// # }
1464 /// #
1465 /// # fn main() {
1466 /// # try_main().unwrap();
1467 /// # }
1468 /// ```
1469 pub fn iter(&self) -> Iter {
1470 Iter {
1471 inner: self,
1472 pos: 0,
1473 }
1474 }
1475
1476 /// Clearing all `Event` values from container explicitly.
1477 ///
1478 /// # Examples
1479 ///
1480 /// ```
1481 /// # use std::error::Error;
1482 /// # fn try_main() -> Result<(), Box<Error>> {
1483 /// use retty_io::{Events, Poll};
1484 /// use std::time::Duration;
1485 ///
1486 /// let mut events = Events::with_capacity(1024);
1487 /// let poll = Poll::new()?;
1488 ///
1489 /// // Register handles with `poll`
1490 /// for _ in 0..2 {
1491 /// events.clear();
1492 /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1493 ///
1494 /// for event in events.iter() {
1495 /// println!("event={:?}", event);
1496 /// }
1497 /// }
1498 /// # Ok(())
1499 /// # }
1500 /// #
1501 /// # fn main() {
1502 /// # try_main().unwrap();
1503 /// # }
1504 /// ```
1505 pub fn clear(&mut self) {
1506 self.inner.clear();
1507 }
1508}
1509
1510impl<'a> IntoIterator for &'a Events {
1511 type Item = Event;
1512 type IntoIter = Iter<'a>;
1513
1514 fn into_iter(self) -> Self::IntoIter {
1515 self.iter()
1516 }
1517}
1518
1519impl<'a> Iterator for Iter<'a> {
1520 type Item = Event;
1521
1522 fn next(&mut self) -> Option<Event> {
1523 let ret = self.inner.inner.get(self.pos);
1524 self.pos += 1;
1525 ret
1526 }
1527}
1528
1529impl IntoIterator for Events {
1530 type Item = Event;
1531 type IntoIter = IntoIter;
1532
1533 fn into_iter(self) -> Self::IntoIter {
1534 IntoIter {
1535 inner: self,
1536 pos: 0,
1537 }
1538 }
1539}
1540
1541impl Iterator for IntoIter {
1542 type Item = Event;
1543
1544 fn next(&mut self) -> Option<Event> {
1545 let ret = self.inner.inner.get(self.pos);
1546 self.pos += 1;
1547 ret
1548 }
1549}
1550
1551impl fmt::Debug for Events {
1552 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1553 f.debug_struct("Events")
1554 .field("capacity", &self.capacity())
1555 .finish()
1556 }
1557}
1558
1559// ===== Accessors for internal usage =====
1560
1561pub fn selector(poll: &Poll) -> &sys::Selector {
1562 &poll.selector
1563}
1564
1565/*
1566 *
1567 * ===== Registration =====
1568 *
1569 */
1570
1571// TODO: get rid of this, windows depends on it for now
1572#[allow(dead_code)]
1573pub fn new_registration(
1574 poll: &Poll,
1575 token: Token,
1576 ready: Ready,
1577 opt: PollOpt,
1578) -> (Registration, SetReadiness) {
1579 Registration::new_priv(poll, token, ready, opt)
1580}
1581
1582impl Registration {
1583 /// Create and return a new `Registration` and the associated
1584 /// `SetReadiness`.
1585 ///
1586 /// See [struct] documentation for more detail and [`Poll`]
1587 /// for high level documentation on polling.
1588 ///
1589 /// # Examples
1590 ///
1591 /// ```
1592 /// # use std::error::Error;
1593 /// # fn try_main() -> Result<(), Box<Error>> {
1594 /// use retty_io::{Events, Ready, Registration, Poll, PollOpt, Token};
1595 /// use std::thread;
1596 ///
1597 /// let (registration, set_readiness) = Registration::new2();
1598 ///
1599 /// thread::spawn(move || {
1600 /// use std::time::Duration;
1601 /// thread::sleep(Duration::from_millis(500));
1602 ///
1603 /// set_readiness.set_readiness(Ready::readable());
1604 /// });
1605 ///
1606 /// let poll = Poll::new()?;
1607 /// poll.register(®istration, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
1608 ///
1609 /// let mut events = Events::with_capacity(256);
1610 ///
1611 /// loop {
1612 /// poll.poll(&mut events, None);
1613 ///
1614 /// for event in &events {
1615 /// if event.token() == Token(0) && event.readiness().is_readable() {
1616 /// return Ok(());
1617 /// }
1618 /// }
1619 /// }
1620 /// # Ok(())
1621 /// # }
1622 /// #
1623 /// # fn main() {
1624 /// # try_main().unwrap();
1625 /// # }
1626 /// ```
1627 /// [struct]: #
1628 /// [`Poll`]: struct.Poll.html
1629 pub fn new2() -> (Registration, SetReadiness) {
1630 // Allocate the registration node. The new node will have `ref_count`
1631 // set to 2: one SetReadiness, one Registration.
1632 let node = Box::into_raw(Box::new(ReadinessNode::new(
1633 ptr::null_mut(),
1634 Token(0),
1635 Ready::empty(),
1636 PollOpt::empty(),
1637 2,
1638 )));
1639
1640 let registration = Registration {
1641 inner: RegistrationInner { node },
1642 };
1643
1644 let set_readiness = SetReadiness {
1645 inner: RegistrationInner { node },
1646 };
1647
1648 (registration, set_readiness)
1649 }
1650
1651 #[deprecated(since = "0.6.5", note = "use `new2` instead")]
1652 #[cfg(feature = "with-deprecated")]
1653 #[doc(hidden)]
1654 pub fn new(
1655 poll: &Poll,
1656 token: Token,
1657 interest: Ready,
1658 opt: PollOpt,
1659 ) -> (Registration, SetReadiness) {
1660 Registration::new_priv(poll, token, interest, opt)
1661 }
1662
1663 // TODO: Get rid of this (windows depends on it for now)
1664 fn new_priv(
1665 poll: &Poll,
1666 token: Token,
1667 interest: Ready,
1668 opt: PollOpt,
1669 ) -> (Registration, SetReadiness) {
1670 is_send::<Registration>();
1671 is_sync::<Registration>();
1672 is_send::<SetReadiness>();
1673 is_sync::<SetReadiness>();
1674
1675 // Clone handle to the readiness queue, this bumps the ref count
1676 let queue = poll.readiness_queue.inner.clone();
1677
1678 // Convert to a *mut () pointer
1679 let queue: *mut () = unsafe { mem::transmute(queue) };
1680
1681 // Allocate the registration node. The new node will have `ref_count`
1682 // set to 3: one SetReadiness, one Registration, and one Poll handle.
1683 let node = Box::into_raw(Box::new(ReadinessNode::new(queue, token, interest, opt, 3)));
1684
1685 let registration = Registration {
1686 inner: RegistrationInner { node },
1687 };
1688
1689 let set_readiness = SetReadiness {
1690 inner: RegistrationInner { node },
1691 };
1692
1693 (registration, set_readiness)
1694 }
1695
1696 #[deprecated(since = "0.6.5", note = "use `Evented` impl")]
1697 #[cfg(feature = "with-deprecated")]
1698 #[doc(hidden)]
1699 pub fn update(
1700 &self,
1701 poll: &Poll,
1702 token: Token,
1703 interest: Ready,
1704 opts: PollOpt,
1705 ) -> io::Result<()> {
1706 self.inner.update(poll, token, interest, opts)
1707 }
1708
1709 #[deprecated(since = "0.6.5", note = "use `Poll::deregister` instead")]
1710 #[cfg(feature = "with-deprecated")]
1711 #[doc(hidden)]
1712 pub fn deregister(&self, poll: &Poll) -> io::Result<()> {
1713 self.inner
1714 .update(poll, Token(0), Ready::empty(), PollOpt::empty())
1715 }
1716}
1717
1718impl Evented for Registration {
1719 fn register(
1720 &self,
1721 poll: &Poll,
1722 token: Token,
1723 interest: Ready,
1724 opts: PollOpt,
1725 ) -> io::Result<()> {
1726 self.inner.update(poll, token, interest, opts)
1727 }
1728
1729 fn reregister(
1730 &self,
1731 poll: &Poll,
1732 token: Token,
1733 interest: Ready,
1734 opts: PollOpt,
1735 ) -> io::Result<()> {
1736 self.inner.update(poll, token, interest, opts)
1737 }
1738
1739 fn deregister(&self, poll: &Poll) -> io::Result<()> {
1740 self.inner
1741 .update(poll, Token(0), Ready::empty(), PollOpt::empty())
1742 }
1743}
1744
1745impl Drop for Registration {
1746 fn drop(&mut self) {
1747 // `flag_as_dropped` toggles the `dropped` flag and notifies
1748 // `Poll::poll` to release its handle (which is just decrementing
1749 // the ref count).
1750 if self.inner.state.flag_as_dropped() {
1751 // Can't do anything if the queuing fails
1752 let _ = self.inner.enqueue_with_wakeup();
1753 }
1754 }
1755}
1756
1757impl fmt::Debug for Registration {
1758 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1759 fmt.debug_struct("Registration").finish()
1760 }
1761}
1762
1763impl SetReadiness {
1764 /// Returns the registration's current readiness.
1765 ///
1766 /// # Note
1767 ///
1768 /// There is no guarantee that `readiness` establishes any sort of memory
1769 /// ordering. Any concurrent data access must be synchronized using another
1770 /// strategy.
1771 ///
1772 /// # Examples
1773 ///
1774 /// ```
1775 /// # use std::error::Error;
1776 /// # fn try_main() -> Result<(), Box<Error>> {
1777 /// use retty_io::{Registration, Ready};
1778 ///
1779 /// let (registration, set_readiness) = Registration::new2();
1780 ///
1781 /// assert!(set_readiness.readiness().is_empty());
1782 ///
1783 /// set_readiness.set_readiness(Ready::readable())?;
1784 /// assert!(set_readiness.readiness().is_readable());
1785 /// # Ok(())
1786 /// # }
1787 /// #
1788 /// # fn main() {
1789 /// # try_main().unwrap();
1790 /// # }
1791 /// ```
1792 pub fn readiness(&self) -> Ready {
1793 self.inner.readiness()
1794 }
1795
1796 /// Set the registration's readiness
1797 ///
1798 /// If the associated `Registration` is registered with a [`Poll`] instance
1799 /// and has requested readiness events that include `ready`, then a future
1800 /// call to [`Poll::poll`] will receive a readiness event representing the
1801 /// readiness state change.
1802 ///
1803 /// # Note
1804 ///
1805 /// There is no guarantee that `readiness` establishes any sort of memory
1806 /// ordering. Any concurrent data access must be synchronized using another
1807 /// strategy.
1808 ///
1809 /// There is also no guarantee as to when the readiness event will be
1810 /// delivered to poll. A best attempt will be made to make the delivery in a
1811 /// "timely" fashion. For example, the following is **not** guaranteed to
1812 /// work:
1813 ///
1814 /// ```
1815 /// # use std::error::Error;
1816 /// # fn try_main() -> Result<(), Box<Error>> {
1817 /// use retty_io::{Events, Registration, Ready, Poll, PollOpt, Token};
1818 ///
1819 /// let poll = Poll::new()?;
1820 /// let (registration, set_readiness) = Registration::new2();
1821 ///
1822 /// poll.register(®istration,
1823 /// Token(0),
1824 /// Ready::readable(),
1825 /// PollOpt::edge())?;
1826 ///
1827 /// // Set the readiness, then immediately poll to try to get the readiness
1828 /// // event
1829 /// set_readiness.set_readiness(Ready::readable())?;
1830 ///
1831 /// let mut events = Events::with_capacity(1024);
1832 /// poll.poll(&mut events, None)?;
1833 ///
1834 /// // There is NO guarantee that the following will work. It is possible
1835 /// // that the readiness event will be delivered at a later time.
1836 /// let event = events.get(0).unwrap();
1837 /// assert_eq!(event.token(), Token(0));
1838 /// assert!(event.readiness().is_readable());
1839 /// # Ok(())
1840 /// # }
1841 /// #
1842 /// # fn main() {
1843 /// # try_main().unwrap();
1844 /// # }
1845 /// ```
1846 ///
1847 /// # Examples
1848 ///
1849 /// A simple example, for a more elaborate example, see the [`Evented`]
1850 /// documentation.
1851 ///
1852 /// ```
1853 /// # use std::error::Error;
1854 /// # fn try_main() -> Result<(), Box<Error>> {
1855 /// use retty_io::{Registration, Ready};
1856 ///
1857 /// let (registration, set_readiness) = Registration::new2();
1858 ///
1859 /// assert!(set_readiness.readiness().is_empty());
1860 ///
1861 /// set_readiness.set_readiness(Ready::readable())?;
1862 /// assert!(set_readiness.readiness().is_readable());
1863 /// # Ok(())
1864 /// # }
1865 /// #
1866 /// # fn main() {
1867 /// # try_main().unwrap();
1868 /// # }
1869 /// ```
1870 ///
1871 /// [`Registration`]: struct.Registration.html
1872 /// [`Evented`]: event/trait.Evented.html#examples
1873 /// [`Poll`]: struct.Poll.html
1874 /// [`Poll::poll`]: struct.Poll.html#method.poll
1875 pub fn set_readiness(&self, ready: Ready) -> io::Result<()> {
1876 self.inner.set_readiness(ready)
1877 }
1878}
1879
1880impl fmt::Debug for SetReadiness {
1881 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1882 f.debug_struct("SetReadiness").finish()
1883 }
1884}
1885
1886impl RegistrationInner {
1887 /// Get the registration's readiness.
1888 fn readiness(&self) -> Ready {
1889 self.state.load(Relaxed).readiness()
1890 }
1891
1892 /// Set the registration's readiness.
1893 ///
1894 /// This function can be called concurrently by an arbitrary number of
1895 /// SetReadiness handles.
1896 fn set_readiness(&self, ready: Ready) -> io::Result<()> {
1897 // Load the current atomic state.
1898 let mut state = self.state.load(Acquire);
1899 let mut next;
1900
1901 loop {
1902 next = state;
1903
1904 if state.is_dropped() {
1905 // Node is dropped, no more notifications
1906 return Ok(());
1907 }
1908
1909 // Update the readiness
1910 next.set_readiness(ready);
1911
1912 // If the readiness is not blank, try to obtain permission to
1913 // push the node into the readiness queue.
1914 if !next.effective_readiness().is_empty() {
1915 next.set_queued();
1916 }
1917
1918 let actual = self.state.compare_and_swap(state, next, AcqRel);
1919
1920 if state == actual {
1921 break;
1922 }
1923
1924 state = actual;
1925 }
1926
1927 if !state.is_queued() && next.is_queued() {
1928 // We toggled the queued flag, making us responsible for queuing the
1929 // node in the MPSC readiness queue.
1930 self.enqueue_with_wakeup()?;
1931 }
1932
1933 Ok(())
1934 }
1935
1936 /// Update the registration details associated with the node
1937 fn update(&self, poll: &Poll, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> {
1938 // First, ensure poll instances match
1939 //
1940 // Load the queue pointer, `Relaxed` is sufficient here as only the
1941 // pointer is being operated on. The actual memory is guaranteed to be
1942 // visible the `poll: &Poll` ref passed as an argument to the function.
1943 let mut queue = self.readiness_queue.load(Relaxed);
1944 let other: &*mut () =
1945 unsafe { &*(&poll.readiness_queue.inner as *const _ as *const *mut ()) };
1946 let other = *other;
1947
1948 debug_assert!(mem::size_of::<Arc<ReadinessQueueInner>>() == mem::size_of::<*mut ()>());
1949
1950 if queue.is_null() {
1951 // Attempt to set the queue pointer. `Release` ordering synchronizes
1952 // with `Acquire` in `ensure_with_wakeup`.
1953 let actual = self.readiness_queue.compare_and_swap(queue, other, Release);
1954
1955 if actual.is_null() {
1956 // The CAS succeeded, this means that the node's ref count
1957 // should be incremented to reflect that the `poll` function
1958 // effectively owns the node as well.
1959 //
1960 // `Relaxed` ordering used for the same reason as in
1961 // RegistrationInner::clone
1962 self.ref_count.fetch_add(1, Relaxed);
1963
1964 // Note that the `queue` reference stored in our
1965 // `readiness_queue` field is intended to be a strong reference,
1966 // so now that we've successfully claimed the reference we bump
1967 // the refcount here.
1968 //
1969 // Down below in `release_node` when we deallocate this
1970 // `RegistrationInner` is where we'll transmute this back to an
1971 // arc and decrement the reference count.
1972 mem::forget(poll.readiness_queue.clone());
1973 } else {
1974 // The CAS failed, another thread set the queue pointer, so ensure
1975 // that the pointer and `other` match
1976 if actual != other {
1977 return Err(io::Error::new(
1978 io::ErrorKind::Other,
1979 "registration handle associated with another `Poll` instance",
1980 ));
1981 }
1982 }
1983
1984 queue = other;
1985 } else if queue != other {
1986 return Err(io::Error::new(
1987 io::ErrorKind::Other,
1988 "registration handle associated with another `Poll` instance",
1989 ));
1990 }
1991
1992 unsafe {
1993 let actual = &poll.readiness_queue.inner as *const _ as *const usize;
1994 debug_assert_eq!(queue as usize, *actual);
1995 }
1996
1997 // The `update_lock` atomic is used as a flag ensuring only a single
1998 // thread concurrently enters the `update` critical section. Any
1999 // concurrent calls to update are discarded. If coordinated updates are
2000 // required, the retty-io user is responsible for handling that.
2001 //
2002 // Acquire / Release ordering is used on `update_lock` to ensure that
2003 // data access to the `token_*` variables are scoped to the critical
2004 // section.
2005
2006 // Acquire the update lock.
2007 if self.update_lock.compare_and_swap(false, true, Acquire) {
2008 // The lock is already held. Discard the update
2009 return Ok(());
2010 }
2011
2012 // Relaxed ordering is acceptable here as the only memory that needs to
2013 // be visible as part of the update are the `token_*` variables, and
2014 // ordering has already been handled by the `update_lock` access.
2015 let mut state = self.state.load(Relaxed);
2016 let mut next;
2017
2018 // Read the current token, again this memory has been ordered by the
2019 // acquire on `update_lock`.
2020 let curr_token_pos = state.token_write_pos();
2021 let curr_token = unsafe { self::token(self, curr_token_pos) };
2022
2023 let mut next_token_pos = curr_token_pos;
2024
2025 // If the `update` call is changing the token, then compute the next
2026 // available token slot and write the token there.
2027 //
2028 // Note that this computation is happening *outside* of the
2029 // compare-and-swap loop. The update lock ensures that only a single
2030 // thread could be mutating the write_token_position, so the
2031 // `next_token_pos` will never need to be recomputed even if
2032 // `token_read_pos` concurrently changes. This is because
2033 // `token_read_pos` can ONLY concurrently change to the current value of
2034 // `token_write_pos`, so `next_token_pos` will always remain valid.
2035 if token != curr_token {
2036 next_token_pos = state.next_token_pos();
2037
2038 // Update the token
2039 match next_token_pos {
2040 0 => unsafe { *self.token_0.get() = token },
2041 1 => unsafe { *self.token_1.get() = token },
2042 2 => unsafe { *self.token_2.get() = token },
2043 _ => unreachable!(),
2044 }
2045 }
2046
2047 // Now enter the compare-and-swap loop
2048 loop {
2049 next = state;
2050
2051 // The node is only dropped once all `Registration` handles are
2052 // dropped. Only `Registration` can call `update`.
2053 debug_assert!(!state.is_dropped());
2054
2055 // Update the write token position, this will also release the token
2056 // to Poll::poll.
2057 next.set_token_write_pos(next_token_pos);
2058
2059 // Update readiness and poll opts
2060 next.set_interest(interest);
2061 next.set_poll_opt(opt);
2062
2063 // If there is effective readiness, the node will need to be queued
2064 // for processing. This exact behavior is still TBD, so we are
2065 // conservative for now and always fire.
2066 //
2067 // See https://github.com/carllerche/retty-io/issues/535.
2068 if !next.effective_readiness().is_empty() {
2069 next.set_queued();
2070 }
2071
2072 // compare-and-swap the state values. Only `Release` is needed here.
2073 // The `Release` ensures that `Poll::poll` will see the token
2074 // update and the update function doesn't care about any other
2075 // memory visibility.
2076 let actual = self.state.compare_and_swap(state, next, Release);
2077
2078 if actual == state {
2079 break;
2080 }
2081
2082 // CAS failed, but `curr_token_pos` should not have changed given
2083 // that we still hold the update lock.
2084 debug_assert_eq!(curr_token_pos, actual.token_write_pos());
2085
2086 state = actual;
2087 }
2088
2089 // Release the lock
2090 self.update_lock.store(false, Release);
2091
2092 if !state.is_queued() && next.is_queued() {
2093 // We are responsible for enqueing the node.
2094 enqueue_with_wakeup(queue, self)?;
2095 }
2096
2097 Ok(())
2098 }
2099}
2100
2101impl ops::Deref for RegistrationInner {
2102 type Target = ReadinessNode;
2103
2104 fn deref(&self) -> &ReadinessNode {
2105 unsafe { &*self.node }
2106 }
2107}
2108
2109impl Clone for RegistrationInner {
2110 fn clone(&self) -> RegistrationInner {
2111 // Using a relaxed ordering is alright here, as knowledge of the
2112 // original reference prevents other threads from erroneously deleting
2113 // the object.
2114 //
2115 // As explained in the [Boost documentation][1], Increasing the
2116 // reference counter can always be done with memory_order_relaxed: New
2117 // references to an object can only be formed from an existing
2118 // reference, and passing an existing reference from one thread to
2119 // another must already provide any required synchronization.
2120 //
2121 // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
2122 let old_size = self.ref_count.fetch_add(1, Relaxed);
2123
2124 // However we need to guard against massive refcounts in case someone
2125 // is `mem::forget`ing Arcs. If we don't do this the count can overflow
2126 // and users will use-after free. We racily saturate to `isize::MAX` on
2127 // the assumption that there aren't ~2 billion threads incrementing
2128 // the reference count at once. This branch will never be taken in
2129 // any realistic program.
2130 //
2131 // We abort because such a program is incredibly degenerate, and we
2132 // don't care to support it.
2133 if old_size & !MAX_REFCOUNT != 0 {
2134 process::abort();
2135 }
2136
2137 RegistrationInner { node: self.node }
2138 }
2139}
2140
2141impl Drop for RegistrationInner {
2142 fn drop(&mut self) {
2143 // Only handles releasing from `Registration` and `SetReadiness`
2144 // handles. Poll has to call this itself.
2145 release_node(self.node);
2146 }
2147}
2148
2149/*
2150 *
2151 * ===== ReadinessQueue =====
2152 *
2153 */
2154
2155impl ReadinessQueue {
2156 /// Create a new `ReadinessQueue`.
2157 fn new() -> io::Result<ReadinessQueue> {
2158 is_send::<Self>();
2159 is_sync::<Self>();
2160
2161 let end_marker = Box::new(ReadinessNode::marker());
2162 let sleep_marker = Box::new(ReadinessNode::marker());
2163 let closed_marker = Box::new(ReadinessNode::marker());
2164
2165 let ptr = &*end_marker as *const _ as *mut _;
2166
2167 Ok(ReadinessQueue {
2168 inner: Arc::new(ReadinessQueueInner {
2169 awakener: sys::Awakener::new()?,
2170 head_readiness: AtomicPtr::new(ptr),
2171 tail_readiness: UnsafeCell::new(ptr),
2172 end_marker,
2173 sleep_marker,
2174 closed_marker,
2175 }),
2176 })
2177 }
2178
2179 /// Poll the queue for new events
2180 fn poll(&self, dst: &mut sys::Events) {
2181 // `until` is set with the first node that gets re-enqueued due to being
2182 // set to have level-triggered notifications. This prevents an infinite
2183 // loop where `Poll::poll` will keep dequeuing nodes it enqueues.
2184 let mut until = ptr::null_mut();
2185
2186 if dst.len() == dst.capacity() {
2187 // If `dst` is already full, the readiness queue won't be drained.
2188 // This might result in `sleep_marker` staying in the queue and
2189 // unecessary pipe writes occuring.
2190 self.inner.clear_sleep_marker();
2191 }
2192
2193 'outer: while dst.len() < dst.capacity() {
2194 // Dequeue a node. If the queue is in an inconsistent state, then
2195 // stop polling. `Poll::poll` will be called again shortly and enter
2196 // a syscall, which should be enough to enable the other thread to
2197 // finish the queuing process.
2198 let ptr = match unsafe { self.inner.dequeue_node(until) } {
2199 Dequeue::Empty | Dequeue::Inconsistent => break,
2200 Dequeue::Data(ptr) => ptr,
2201 };
2202
2203 let node = unsafe { &*ptr };
2204
2205 // Read the node state with Acquire ordering. This allows reading
2206 // the token variables.
2207 let mut state = node.state.load(Acquire);
2208 let mut next;
2209 let mut readiness;
2210 let mut opt;
2211
2212 loop {
2213 // Build up any changes to the readiness node's state and
2214 // attempt the CAS at the end
2215 next = state;
2216
2217 // Given that the node was just read from the queue, the
2218 // `queued` flag should still be set.
2219 debug_assert!(state.is_queued());
2220
2221 // The dropped flag means we need to release the node and
2222 // perform no further processing on it.
2223 if state.is_dropped() {
2224 // Release the node and continue
2225 release_node(ptr);
2226 continue 'outer;
2227 }
2228
2229 // Process the node
2230 readiness = state.effective_readiness();
2231 opt = state.poll_opt();
2232
2233 if opt.is_edge() {
2234 // Mark the node as dequeued
2235 next.set_dequeued();
2236
2237 if opt.is_oneshot() && !readiness.is_empty() {
2238 next.disarm();
2239 }
2240 } else if readiness.is_empty() {
2241 next.set_dequeued();
2242 }
2243
2244 // Ensure `token_read_pos` is set to `token_write_pos` so that
2245 // we read the most up to date token value.
2246 next.update_token_read_pos();
2247
2248 if state == next {
2249 break;
2250 }
2251
2252 let actual = node.state.compare_and_swap(state, next, AcqRel);
2253
2254 if actual == state {
2255 break;
2256 }
2257
2258 state = actual;
2259 }
2260
2261 // If the queued flag is still set, then the node must be requeued.
2262 // This typically happens when using level-triggered notifications.
2263 if next.is_queued() {
2264 if until.is_null() {
2265 // We never want to see the node again
2266 until = ptr;
2267 }
2268
2269 // Requeue the node
2270 self.inner.enqueue_node(node);
2271 }
2272
2273 if !readiness.is_empty() {
2274 // Get the token
2275 let token = unsafe { token(node, next.token_read_pos()) };
2276
2277 // Push the event
2278 dst.push_event(Event::new(readiness, token));
2279 }
2280 }
2281 }
2282
2283 /// Prepare the queue for the `Poll::poll` thread to block in the system
2284 /// selector. This involves changing `head_readiness` to `sleep_marker`.
2285 /// Returns true if successful and `poll` can block.
2286 fn prepare_for_sleep(&self) -> bool {
2287 let end_marker = self.inner.end_marker();
2288 let sleep_marker = self.inner.sleep_marker();
2289
2290 let tail = unsafe { *self.inner.tail_readiness.get() };
2291
2292 // If the tail is currently set to the sleep_marker, then check if the
2293 // head is as well. If it is, then the queue is currently ready to
2294 // sleep. If it is not, then the queue is not empty and there should be
2295 // no sleeping.
2296 if tail == sleep_marker {
2297 return self.inner.head_readiness.load(Acquire) == sleep_marker;
2298 }
2299
2300 // If the tail is not currently set to `end_marker`, then the queue is
2301 // not empty.
2302 if tail != end_marker {
2303 return false;
2304 }
2305
2306 // The sleep marker is *not* currently in the readiness queue.
2307 //
2308 // The sleep marker is only inserted in this function. It is also only
2309 // inserted in the tail position. This is guaranteed by first checking
2310 // that the end marker is in the tail position, pushing the sleep marker
2311 // after the end marker, then removing the end marker.
2312 //
2313 // Before inserting a node into the queue, the next pointer has to be
2314 // set to null. Again, this is only safe to do when the node is not
2315 // currently in the queue, but we already have ensured this.
2316 self.inner
2317 .sleep_marker
2318 .next_readiness
2319 .store(ptr::null_mut(), Relaxed);
2320
2321 let actual = self
2322 .inner
2323 .head_readiness
2324 .compare_and_swap(end_marker, sleep_marker, AcqRel);
2325
2326 debug_assert!(actual != sleep_marker);
2327
2328 if actual != end_marker {
2329 // The readiness queue is not empty
2330 return false;
2331 }
2332
2333 // The current tail should be pointing to `end_marker`
2334 debug_assert!(unsafe { *self.inner.tail_readiness.get() == end_marker });
2335 // The `end_marker` next pointer should be null
2336 debug_assert!(self.inner.end_marker.next_readiness.load(Relaxed).is_null());
2337
2338 // Update tail pointer.
2339 unsafe {
2340 *self.inner.tail_readiness.get() = sleep_marker;
2341 }
2342 true
2343 }
2344}
2345
2346impl Drop for ReadinessQueue {
2347 fn drop(&mut self) {
2348 // Close the queue by enqueuing the closed node
2349 self.inner.enqueue_node(&self.inner.closed_marker);
2350
2351 loop {
2352 // Free any nodes that happen to be left in the readiness queue
2353 let ptr = match unsafe { self.inner.dequeue_node(ptr::null_mut()) } {
2354 Dequeue::Empty => break,
2355 Dequeue::Inconsistent => {
2356 // This really shouldn't be possible as all other handles to
2357 // `ReadinessQueueInner` are dropped, but handle this by
2358 // spinning I guess?
2359 continue;
2360 }
2361 Dequeue::Data(ptr) => ptr,
2362 };
2363
2364 let node = unsafe { &*ptr };
2365
2366 let state = node.state.load(Acquire);
2367
2368 debug_assert!(state.is_queued());
2369
2370 release_node(ptr);
2371 }
2372 }
2373}
2374
2375impl ReadinessQueueInner {
2376 fn wakeup(&self) -> io::Result<()> {
2377 self.awakener.wakeup()
2378 }
2379
2380 /// Prepend the given node to the head of the readiness queue. This is done
2381 /// with relaxed ordering. Returns true if `Poll` needs to be woken up.
2382 fn enqueue_node_with_wakeup(&self, node: &ReadinessNode) -> io::Result<()> {
2383 if self.enqueue_node(node) {
2384 self.wakeup()?;
2385 }
2386
2387 Ok(())
2388 }
2389
2390 /// Push the node into the readiness queue
2391 fn enqueue_node(&self, node: &ReadinessNode) -> bool {
2392 // This is the 1024cores.net intrusive MPSC queue [1] "push" function.
2393 let node_ptr = node as *const _ as *mut _;
2394
2395 // Relaxed used as the ordering is "released" when swapping
2396 // `head_readiness`
2397 node.next_readiness.store(ptr::null_mut(), Relaxed);
2398
2399 unsafe {
2400 let mut prev = self.head_readiness.load(Acquire);
2401
2402 loop {
2403 if prev == self.closed_marker() {
2404 debug_assert!(node_ptr != self.closed_marker());
2405 // debug_assert!(node_ptr != self.end_marker());
2406 debug_assert!(node_ptr != self.sleep_marker());
2407
2408 if node_ptr != self.end_marker() {
2409 // The readiness queue is shutdown, but the enqueue flag was
2410 // set. This means that we are responsible for decrementing
2411 // the ready queue's ref count
2412 debug_assert!(node.ref_count.load(Relaxed) >= 2);
2413 release_node(node_ptr);
2414 }
2415
2416 return false;
2417 }
2418
2419 let act = self.head_readiness.compare_and_swap(prev, node_ptr, AcqRel);
2420
2421 if prev == act {
2422 break;
2423 }
2424
2425 prev = act;
2426 }
2427
2428 debug_assert!((*prev).next_readiness.load(Relaxed).is_null());
2429
2430 (*prev).next_readiness.store(node_ptr, Release);
2431
2432 prev == self.sleep_marker()
2433 }
2434 }
2435
2436 fn clear_sleep_marker(&self) {
2437 let end_marker = self.end_marker();
2438 let sleep_marker = self.sleep_marker();
2439
2440 unsafe {
2441 let tail = *self.tail_readiness.get();
2442
2443 if tail != self.sleep_marker() {
2444 return;
2445 }
2446
2447 // The empty markeer is *not* currently in the readiness queue
2448 // (since the sleep markeris).
2449 self.end_marker
2450 .next_readiness
2451 .store(ptr::null_mut(), Relaxed);
2452
2453 let actual = self
2454 .head_readiness
2455 .compare_and_swap(sleep_marker, end_marker, AcqRel);
2456
2457 debug_assert!(actual != end_marker);
2458
2459 if actual != sleep_marker {
2460 // The readiness queue is not empty, we cannot remove the sleep
2461 // markeer
2462 return;
2463 }
2464
2465 // Update the tail pointer.
2466 *self.tail_readiness.get() = end_marker;
2467 }
2468 }
2469
2470 /// Must only be called in `poll` or `drop`
2471 unsafe fn dequeue_node(&self, until: *mut ReadinessNode) -> Dequeue {
2472 // This is the 1024cores.net intrusive MPSC queue [1] "pop" function
2473 // with the modifications mentioned at the top of the file.
2474 let mut tail = *self.tail_readiness.get();
2475 let mut next = (*tail).next_readiness.load(Acquire);
2476
2477 if tail == self.end_marker() || tail == self.sleep_marker() || tail == self.closed_marker()
2478 {
2479 if next.is_null() {
2480 // Make sure the sleep marker is removed (as we are no longer
2481 // sleeping
2482 self.clear_sleep_marker();
2483
2484 return Dequeue::Empty;
2485 }
2486
2487 *self.tail_readiness.get() = next;
2488 tail = next;
2489 next = (*next).next_readiness.load(Acquire);
2490 }
2491
2492 // Only need to check `until` at this point. `until` is either null,
2493 // which will never match tail OR it is a node that was pushed by
2494 // the current thread. This means that either:
2495 //
2496 // 1) The queue is inconsistent, which is handled explicitly
2497 // 2) We encounter `until` at this point in dequeue
2498 // 3) we will pop a different node
2499 if tail == until {
2500 return Dequeue::Empty;
2501 }
2502
2503 if !next.is_null() {
2504 *self.tail_readiness.get() = next;
2505 return Dequeue::Data(tail);
2506 }
2507
2508 if self.head_readiness.load(Acquire) != tail {
2509 return Dequeue::Inconsistent;
2510 }
2511
2512 // Push the stub node
2513 self.enqueue_node(&self.end_marker);
2514
2515 next = (*tail).next_readiness.load(Acquire);
2516
2517 if !next.is_null() {
2518 *self.tail_readiness.get() = next;
2519 return Dequeue::Data(tail);
2520 }
2521
2522 Dequeue::Inconsistent
2523 }
2524
2525 fn end_marker(&self) -> *mut ReadinessNode {
2526 &*self.end_marker as *const ReadinessNode as *mut ReadinessNode
2527 }
2528
2529 fn sleep_marker(&self) -> *mut ReadinessNode {
2530 &*self.sleep_marker as *const ReadinessNode as *mut ReadinessNode
2531 }
2532
2533 fn closed_marker(&self) -> *mut ReadinessNode {
2534 &*self.closed_marker as *const ReadinessNode as *mut ReadinessNode
2535 }
2536}
2537
2538impl ReadinessNode {
2539 /// Return a new `ReadinessNode`, initialized with a ref_count of 3.
2540 fn new(
2541 queue: *mut (),
2542 token: Token,
2543 interest: Ready,
2544 opt: PollOpt,
2545 ref_count: usize,
2546 ) -> ReadinessNode {
2547 ReadinessNode {
2548 state: AtomicState::new(interest, opt),
2549 // Only the first token is set, the others are initialized to 0
2550 token_0: UnsafeCell::new(token),
2551 token_1: UnsafeCell::new(Token(0)),
2552 token_2: UnsafeCell::new(Token(0)),
2553 next_readiness: AtomicPtr::new(ptr::null_mut()),
2554 update_lock: AtomicBool::new(false),
2555 readiness_queue: AtomicPtr::new(queue),
2556 ref_count: AtomicUsize::new(ref_count),
2557 }
2558 }
2559
2560 fn marker() -> ReadinessNode {
2561 ReadinessNode {
2562 state: AtomicState::new(Ready::empty(), PollOpt::empty()),
2563 token_0: UnsafeCell::new(Token(0)),
2564 token_1: UnsafeCell::new(Token(0)),
2565 token_2: UnsafeCell::new(Token(0)),
2566 next_readiness: AtomicPtr::new(ptr::null_mut()),
2567 update_lock: AtomicBool::new(false),
2568 readiness_queue: AtomicPtr::new(ptr::null_mut()),
2569 ref_count: AtomicUsize::new(0),
2570 }
2571 }
2572
2573 fn enqueue_with_wakeup(&self) -> io::Result<()> {
2574 let queue = self.readiness_queue.load(Acquire);
2575
2576 if queue.is_null() {
2577 // Not associated with a queue, nothing to do
2578 return Ok(());
2579 }
2580
2581 enqueue_with_wakeup(queue, self)
2582 }
2583}
2584
2585fn enqueue_with_wakeup(queue: *mut (), node: &ReadinessNode) -> io::Result<()> {
2586 debug_assert!(!queue.is_null());
2587 // This is ugly... but we don't want to bump the ref count.
2588 let queue: &Arc<ReadinessQueueInner> =
2589 unsafe { &*(&queue as *const *mut () as *const Arc<ReadinessQueueInner>) };
2590 queue.enqueue_node_with_wakeup(node)
2591}
2592
2593unsafe fn token(node: &ReadinessNode, pos: usize) -> Token {
2594 match pos {
2595 0 => *node.token_0.get(),
2596 1 => *node.token_1.get(),
2597 2 => *node.token_2.get(),
2598 _ => unreachable!(),
2599 }
2600}
2601
2602fn release_node(ptr: *mut ReadinessNode) {
2603 unsafe {
2604 // `AcqRel` synchronizes with other `release_node` functions and ensures
2605 // that the drop happens after any reads / writes on other threads.
2606 if (*ptr).ref_count.fetch_sub(1, AcqRel) != 1 {
2607 return;
2608 }
2609
2610 let node = Box::from_raw(ptr);
2611
2612 // Decrement the readiness_queue Arc
2613 let queue = node.readiness_queue.load(Acquire);
2614
2615 if queue.is_null() {
2616 return;
2617 }
2618
2619 let _: Arc<ReadinessQueueInner> = mem::transmute(queue);
2620 }
2621}
2622
2623impl AtomicState {
2624 fn new(interest: Ready, opt: PollOpt) -> AtomicState {
2625 let state = ReadinessState::new(interest, opt);
2626
2627 AtomicState {
2628 inner: AtomicUsize::new(state.into()),
2629 }
2630 }
2631
2632 /// Loads the current `ReadinessState`
2633 fn load(&self, order: Ordering) -> ReadinessState {
2634 self.inner.load(order).into()
2635 }
2636
2637 /// Stores a state if the current state is the same as `current`.
2638 fn compare_and_swap(
2639 &self,
2640 current: ReadinessState,
2641 new: ReadinessState,
2642 order: Ordering,
2643 ) -> ReadinessState {
2644 self.inner
2645 .compare_and_swap(current.into(), new.into(), order)
2646 .into()
2647 }
2648
2649 // Returns `true` if the node should be queued
2650 fn flag_as_dropped(&self) -> bool {
2651 let prev: ReadinessState = self
2652 .inner
2653 .fetch_or(DROPPED_MASK | QUEUED_MASK, Release)
2654 .into();
2655 // The flag should not have been previously set
2656 debug_assert!(!prev.is_dropped());
2657
2658 !prev.is_queued()
2659 }
2660}
2661
2662impl ReadinessState {
2663 // Create a `ReadinessState` initialized with the provided arguments
2664 #[inline]
2665 fn new(interest: Ready, opt: PollOpt) -> ReadinessState {
2666 let interest = event::ready_as_usize(interest);
2667 let opt = event::opt_as_usize(opt);
2668
2669 debug_assert!(interest <= MASK_4);
2670 debug_assert!(opt <= MASK_4);
2671
2672 let mut val = interest << INTEREST_SHIFT;
2673 val |= opt << POLL_OPT_SHIFT;
2674
2675 ReadinessState(val)
2676 }
2677
2678 #[inline]
2679 fn get(self, mask: usize, shift: usize) -> usize {
2680 (self.0 >> shift) & mask
2681 }
2682
2683 #[inline]
2684 fn set(&mut self, val: usize, mask: usize, shift: usize) {
2685 self.0 = (self.0 & !(mask << shift)) | (val << shift)
2686 }
2687
2688 /// Get the readiness
2689 #[inline]
2690 fn readiness(self) -> Ready {
2691 let v = self.get(MASK_4, READINESS_SHIFT);
2692 event::ready_from_usize(v)
2693 }
2694
2695 #[inline]
2696 fn effective_readiness(self) -> Ready {
2697 self.readiness() & self.interest()
2698 }
2699
2700 /// Set the readiness
2701 #[inline]
2702 fn set_readiness(&mut self, v: Ready) {
2703 self.set(event::ready_as_usize(v), MASK_4, READINESS_SHIFT);
2704 }
2705
2706 /// Get the interest
2707 #[inline]
2708 fn interest(self) -> Ready {
2709 let v = self.get(MASK_4, INTEREST_SHIFT);
2710 event::ready_from_usize(v)
2711 }
2712
2713 /// Set the interest
2714 #[inline]
2715 fn set_interest(&mut self, v: Ready) {
2716 self.set(event::ready_as_usize(v), MASK_4, INTEREST_SHIFT);
2717 }
2718
2719 #[inline]
2720 fn disarm(&mut self) {
2721 self.set_interest(Ready::empty());
2722 }
2723
2724 /// Get the poll options
2725 #[inline]
2726 fn poll_opt(self) -> PollOpt {
2727 let v = self.get(MASK_4, POLL_OPT_SHIFT);
2728 event::opt_from_usize(v)
2729 }
2730
2731 /// Set the poll options
2732 #[inline]
2733 fn set_poll_opt(&mut self, v: PollOpt) {
2734 self.set(event::opt_as_usize(v), MASK_4, POLL_OPT_SHIFT);
2735 }
2736
2737 #[inline]
2738 fn is_queued(self) -> bool {
2739 self.0 & QUEUED_MASK == QUEUED_MASK
2740 }
2741
2742 /// Set the queued flag
2743 #[inline]
2744 fn set_queued(&mut self) {
2745 // Dropped nodes should never be queued
2746 debug_assert!(!self.is_dropped());
2747 self.0 |= QUEUED_MASK;
2748 }
2749
2750 #[inline]
2751 fn set_dequeued(&mut self) {
2752 debug_assert!(self.is_queued());
2753 self.0 &= !QUEUED_MASK
2754 }
2755
2756 #[inline]
2757 fn is_dropped(self) -> bool {
2758 self.0 & DROPPED_MASK == DROPPED_MASK
2759 }
2760
2761 #[inline]
2762 fn token_read_pos(self) -> usize {
2763 self.get(MASK_2, TOKEN_RD_SHIFT)
2764 }
2765
2766 #[inline]
2767 fn token_write_pos(self) -> usize {
2768 self.get(MASK_2, TOKEN_WR_SHIFT)
2769 }
2770
2771 #[inline]
2772 fn next_token_pos(self) -> usize {
2773 let rd = self.token_read_pos();
2774 let wr = self.token_write_pos();
2775
2776 match wr {
2777 0 => match rd {
2778 1 => 2,
2779 2 => 1,
2780 0 => 1,
2781 _ => unreachable!(),
2782 },
2783 1 => match rd {
2784 0 => 2,
2785 2 => 0,
2786 1 => 2,
2787 _ => unreachable!(),
2788 },
2789 2 => match rd {
2790 0 => 1,
2791 1 => 0,
2792 2 => 0,
2793 _ => unreachable!(),
2794 },
2795 _ => unreachable!(),
2796 }
2797 }
2798
2799 #[inline]
2800 fn set_token_write_pos(&mut self, val: usize) {
2801 self.set(val, MASK_2, TOKEN_WR_SHIFT);
2802 }
2803
2804 #[inline]
2805 fn update_token_read_pos(&mut self) {
2806 let val = self.token_write_pos();
2807 self.set(val, MASK_2, TOKEN_RD_SHIFT);
2808 }
2809}
2810
2811impl From<ReadinessState> for usize {
2812 fn from(src: ReadinessState) -> usize {
2813 src.0
2814 }
2815}
2816
2817impl From<usize> for ReadinessState {
2818 fn from(src: usize) -> ReadinessState {
2819 ReadinessState(src)
2820 }
2821}
2822
/// Compile-time assertion that `T` is `Send`; does nothing at runtime.
fn is_send<T>()
where
    T: Send,
{
}
/// Compile-time assertion that `T` is `Sync`; does nothing at runtime.
fn is_sync<T>()
where
    T: Sync,
{
}
2825
2826impl SelectorId {
2827 pub fn new() -> SelectorId {
2828 SelectorId {
2829 id: AtomicUsize::new(0),
2830 }
2831 }
2832
2833 pub fn associate_selector(&self, poll: &Poll) -> io::Result<()> {
2834 let selector_id = self.id.load(Ordering::SeqCst);
2835
2836 if selector_id != 0 && selector_id != poll.selector.id() {
2837 Err(io::Error::new(
2838 io::ErrorKind::Other,
2839 "socket already registered",
2840 ))
2841 } else {
2842 self.id.store(poll.selector.id(), Ordering::SeqCst);
2843 Ok(())
2844 }
2845 }
2846}
2847
2848impl Clone for SelectorId {
2849 fn clone(&self) -> SelectorId {
2850 SelectorId {
2851 id: AtomicUsize::new(self.id.load(Ordering::SeqCst)),
2852 }
2853 }
2854}
2855
#[test]
#[cfg(all(unix, not(target_os = "fuchsia")))]
pub fn as_raw_fd() {
    // Checks that `Poll` exposes a valid raw descriptor. Every non-negative
    // value is valid; 0 can be returned when the process has no stdin open,
    // so only negative values are rejected.
    let poll = Poll::new().unwrap();
    assert!(poll.as_raw_fd() >= 0);
}