gstthreadshare/runtime/executor/reactor.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2// This is based on https://github.com/smol-rs/async-io
3// with adaptations by:
4//
5// Copyright (C) 2021-2022 François Laignel <fengalin@free.fr>
6
7use concurrent_queue::ConcurrentQueue;
8use futures::ready;
9use polling::{Event, Events, Poller};
10use slab::Slab;
11
12use std::borrow::Borrow;
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::fmt;
16use std::future::Future;
17use std::io;
18use std::marker::PhantomData;
19use std::mem;
20use std::panic;
21use std::pin::Pin;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::{Arc, Mutex};
24use std::task::{Context, Poll, Waker};
25use std::time::{Duration, Instant};
26
// Choose the proper implementation of `Registration` based on the target platform.
cfg_if::cfg_if! {
    if #[cfg(windows)] {
        // Windows: wepoll-style polling.
        mod windows;
        pub use windows::Registration;
    } else if #[cfg(any(
        target_vendor = "apple",
        target_os = "freebsd",
        target_os = "netbsd",
        target_os = "openbsd",
        target_os = "dragonfly",
    ))] {
        // BSD-family systems: kqueue-based polling.
        mod kqueue;
        pub use kqueue::Registration;
    } else if #[cfg(unix)] {
        // Remaining Unix systems (e.g. Linux): handled by the generic unix module.
        mod unix;
        pub use unix::Registration;
    } else {
        compile_error!("unsupported platform");
    }
}
48
49use crate::runtime::{Async, RUNTIME_CAT};
50
/// Index of the read direction in a source's `state` array.
const READ: usize = 0;
/// Index of the write direction in a source's `state` array.
const WRITE: usize = 1;

thread_local! {
    // One `Reactor` per thread; `None` until `Reactor::init` is called.
    static CURRENT_REACTOR: RefCell<Option<Reactor>> = const { RefCell::new(None) };
}
57
/// Thread-local reactor driving registered I/O sources and timers.
#[derive(Debug)]
pub(super) struct Reactor {
    /// Portable bindings to epoll/kqueue/event ports/wepoll.
    ///
    /// This is where I/O is polled, producing I/O events.
    poller: Poller,

    /// Ticker bumped before polling.
    ///
    /// This is useful for checking what is the current "round" of `ReactorLock::react()` when
    /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those
    /// methods must make sure they don't receive stale I/O events - they only accept events from a
    /// fresh "round" of `ReactorLock::react()`.
    ticker: AtomicUsize,

    /// Time when timers have been checked in current time slice.
    timers_check_instant: Instant,

    /// Time limit when timers are being fired in current time slice.
    time_slice_end: Instant,

    /// Half max throttling duration, needed to fire timers.
    half_max_throttling: Duration,

    /// List of wakers to wake when reacting.
    wakers: Vec<Waker>,

    /// Registered sources.
    sources: Slab<Arc<Source>>,

    /// Temporary storage for I/O events when polling the reactor.
    ///
    /// Holding a lock on this event list implies the exclusive right to poll I/O.
    events: Events,

    /// An ordered map of registered regular timers.
    ///
    /// Timers are in the order in which they fire. The `RegularTimerId` distinguishes
    /// timers that fire at the same time. The `Waker` represents the task awaiting the
    /// timer.
    timers: BTreeMap<(Instant, RegularTimerId), Waker>,

    /// An ordered map of registered after timers.
    ///
    /// These timers are guaranteed to fire no sooner than their expected time.
    ///
    /// Timers are in the order in which they fire. The `AfterTimerId` distinguishes
    /// timers that fire at the same time. The `Waker` represents the task awaiting the
    /// timer.
    after_timers: BTreeMap<(Instant, AfterTimerId), Waker>,

    /// A queue of timer operations (insert and remove).
    ///
    /// When inserting or removing a timer, we don't process it immediately - we just push it into
    /// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
    timer_ops: ConcurrentQueue<TimerOp>,
}
115
impl Reactor {
    /// Creates a new `Reactor` configured for the given max throttling duration.
    ///
    /// # Panics
    ///
    /// Panics if the OS-level poller can't be initialized.
    fn new(max_throttling: Duration) -> Self {
        Reactor {
            poller: Poller::new().expect("cannot initialize I/O event notification"),
            ticker: AtomicUsize::new(0),
            timers_check_instant: Instant::now(),
            time_slice_end: Instant::now(),
            // Half of `max_throttling`; used to compute the end of the current
            // time slice in `process_timers`.
            half_max_throttling: max_throttling / 2,
            wakers: Vec::new(),
            sources: Slab::new(),
            events: Events::new(),
            timers: BTreeMap::new(),
            after_timers: BTreeMap::new(),
            // Bounded so `process_timer_ops` can't loop forever.
            timer_ops: ConcurrentQueue::bounded(1000),
        }
    }
132
133    /// Initializes the reactor for current thread.
134    pub fn init(max_throttling: Duration) {
135        CURRENT_REACTOR.with(|cur| {
136            let mut cur = cur.borrow_mut();
137            if cur.is_none() {
138                *cur = Some(Reactor::new(max_throttling));
139            }
140        })
141    }
142
143    /// Clears the `Reactor`.
144    ///
145    /// It will be ready for reuse on current thread without reallocating.
146    pub fn clear() {
147        let _ = CURRENT_REACTOR.try_with(|cur_reactor| {
148            cur_reactor.borrow_mut().as_mut().map(|reactor| {
149                reactor.ticker = AtomicUsize::new(0);
150                reactor.wakers.clear();
151                reactor.sources.clear();
152                reactor.events.clear();
153                reactor.timers.clear();
154                reactor.after_timers.clear();
155                while !reactor.timer_ops.is_empty() {
156                    let _ = reactor.timer_ops.pop();
157                }
158            })
159        });
160    }
161
162    /// Executes the function with current thread's reactor as ref.
163    ///
164    /// # Panics
165    ///
166    /// Panics if:
167    ///
168    /// - The Reactor is not initialized, i.e. if
169    ///   current thread is not a [`Context`] thread.
170    /// - The Reactor is already mutably borrowed.
171    ///
172    /// Use [`Context::enter`] to register i/o sources
173    /// or timers from a different thread.
174    #[track_caller]
175    pub fn with<F, R>(f: F) -> R
176    where
177        F: FnOnce(&Reactor) -> R,
178    {
179        CURRENT_REACTOR.with(|reactor| {
180            f(reactor
181                .borrow()
182                .as_ref()
183                .expect("Reactor initialized at this point"))
184        })
185    }
186
187    /// Executes the function with current thread's reactor as mutable.
188    ///
189    /// # Panics
190    ///
191    /// Panics if:
192    ///
193    /// - The Reactor is not initialized, i.e. if
194    ///   current thread is not a [`Context`] thread.
195    /// - The Reactor is already mutably borrowed.
196    ///
197    /// Use [`Context::enter`] to register i/o sources
198    /// or timers from a different thread.
199    #[track_caller]
200    pub fn with_mut<F, R>(f: F) -> R
201    where
202        F: FnOnce(&mut Reactor) -> R,
203    {
204        CURRENT_REACTOR.with(|reactor| {
205            f(reactor
206                .borrow_mut()
207                .as_mut()
208                .expect("Reactor initialized at this point"))
209        })
210    }
211
    /// Returns the current ticker.
    pub fn ticker(&self) -> usize {
        self.ticker.load(Ordering::SeqCst)
    }

    /// Returns half of the configured max throttling duration.
    pub fn half_max_throttling(&self) -> Duration {
        self.half_max_throttling
    }

    /// Returns the time when timers were last checked.
    pub fn timers_check_instant(&self) -> Instant {
        self.timers_check_instant
    }

    /// Returns the end of the current time slice.
    pub fn time_slice_end(&self) -> Instant {
        self.time_slice_end
    }
228
229    /// Registers an I/O source in the reactor.
230    pub fn insert_io(&mut self, raw: Registration) -> io::Result<Arc<Source>> {
231        // Create an I/O source for this file descriptor.
232        let source = {
233            let key = self.sources.vacant_entry().key();
234            let source = Arc::new(Source {
235                registration: raw,
236                key,
237                state: Default::default(),
238            });
239            self.sources.insert(source.clone());
240            source
241        };
242
243        // Register the file descriptor.
244        if let Err(err) = source.registration.add(&self.poller, source.key) {
245            gst::error!(
246                crate::runtime::RUNTIME_CAT,
247                "Failed to register fd {:?}: {}",
248                source.registration,
249                err,
250            );
251            self.sources.remove(source.key);
252            return Err(err);
253        }
254
255        Ok(source)
256    }
257
    /// Deregisters an I/O source from the reactor.
    ///
    /// The source is removed from the slab even if deleting it
    /// from the poller fails; the poller error is returned.
    pub fn remove_io(&mut self, source: &Source) -> io::Result<()> {
        self.sources.remove(source.key);
        source.registration.delete(&self.poller)
    }
263
264    /// Registers a regular timer in the reactor.
265    ///
266    /// Returns the inserted timer's ID.
267    pub fn insert_regular_timer(&mut self, when: Instant, waker: &Waker) -> RegularTimerId {
268        // Generate a new timer ID.
269        static REGULAR_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
270        let id = RegularTimerId(REGULAR_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));
271
272        // Push an insert operation.
273        while self
274            .timer_ops
275            .push(TimerOp::Insert(when, id.into(), waker.clone()))
276            .is_err()
277        {
278            // If the queue is full, drain it and try again.
279            gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
280            self.process_timer_ops();
281        }
282
283        id
284    }
285
286    /// Registers an after timer in the reactor.
287    ///
288    /// Returns the inserted timer's ID.
289    pub fn insert_after_timer(&mut self, when: Instant, waker: &Waker) -> AfterTimerId {
290        // Generate a new timer ID.
291        static AFTER_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
292        let id = AfterTimerId(AFTER_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));
293
294        // Push an insert operation.
295        while self
296            .timer_ops
297            .push(TimerOp::Insert(when, id.into(), waker.clone()))
298            .is_err()
299        {
300            // If the queue is full, drain it and try again.
301            gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
302            self.process_timer_ops();
303        }
304
305        id
306    }
307
308    /// Deregisters a timer from the reactor.
309    pub fn remove_timer(&mut self, when: Instant, id: impl Into<TimerId>) {
310        // Push a remove operation.
311        let id = id.into();
312        while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
313            gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
314            // If the queue is full, drain it and try again.
315            self.process_timer_ops();
316        }
317    }
318
    /// Processes ready timers and extends the list of wakers to wake.
    ///
    /// Updates `timers_check_instant` to `now` and `time_slice_end` to
    /// `now + half_max_throttling`, then moves every expired timer's waker
    /// into `self.wakers`.
    fn process_timers(&mut self, now: Instant) {
        // Apply any queued insert/remove operations first.
        self.process_timer_ops();

        self.timers_check_instant = now;
        self.time_slice_end = now + self.half_max_throttling;

        // Split regular timers into ready and pending timers.
        //
        // Careful to split just *after* current time slice end,
        // so that a timer set to fire in current time slice is
        // considered ready. `RegularTimerId::NONE` (0) sorts before any
        // generated ID, making it a valid lower bound for the split key.
        let pending = self
            .timers
            .split_off(&(self.time_slice_end, RegularTimerId::NONE));
        let ready = mem::replace(&mut self.timers, pending);

        // Add wakers to the list.
        if !ready.is_empty() {
            gst::trace!(
                RUNTIME_CAT,
                "process_timers (regular): {} ready wakers",
                ready.len()
            );

            for (_, waker) in ready {
                self.wakers.push(waker);
            }
        }

        // Split "at least" timers into ready and pending timers.
        //
        // Careful to split just *after* `now`,
        // so that a timer set for exactly `now` is considered ready.
        let pending = self
            .after_timers
            .split_off(&(self.timers_check_instant, AfterTimerId::NONE));
        let ready = mem::replace(&mut self.after_timers, pending);

        // Add wakers to the list.
        if !ready.is_empty() {
            gst::trace!(
                RUNTIME_CAT,
                "process_timers (after): {} ready wakers",
                ready.len()
            );

            for (_, waker) in ready {
                self.wakers.push(waker);
            }
        }
    }
371
372    /// Processes queued timer operations.
373    fn process_timer_ops(&mut self) {
374        // Process only as much as fits into the queue, or else this loop could in theory run
375        // forever.
376        for _ in 0..self.timer_ops.capacity().unwrap() {
377            match self.timer_ops.pop() {
378                Ok(TimerOp::Insert(when, TimerId::Regular(id), waker)) => {
379                    self.timers.insert((when, id), waker);
380                }
381                Ok(TimerOp::Insert(when, TimerId::After(id), waker)) => {
382                    self.after_timers.insert((when, id), waker);
383                }
384                Ok(TimerOp::Remove(when, TimerId::Regular(id))) => {
385                    self.timers.remove(&(when, id));
386                }
387                Ok(TimerOp::Remove(when, TimerId::After(id))) => {
388                    self.after_timers.remove(&(when, id));
389                }
390                Err(_) => break,
391            }
392        }
393    }
394
    /// Processes new events.
    ///
    /// Fires ready timers, performs a non-blocking poll of I/O events,
    /// then wakes every collected waker.
    pub fn react(&mut self, now: Instant) -> io::Result<()> {
        debug_assert!(self.wakers.is_empty());

        // Process ready timers.
        self.process_timers(now);

        // Bump the ticker before polling I/O.
        let tick = self.ticker.fetch_add(1, Ordering::SeqCst).wrapping_add(1);

        self.events.clear();

        // Poll I/O events without blocking (zero timeout).
        let res = match self.poller.wait(&mut self.events, Some(Duration::ZERO)) {
            // No I/O events occurred.
            Ok(0) => Ok(()),
            // At least one I/O event occurred.
            Ok(_) => {
                for ev in self.events.iter() {
                    // Check if there is a source in the table with this key.
                    if let Some(source) = self.sources.get(ev.key) {
                        let mut state = source.state.lock().unwrap();

                        // Collect wakers if any event was emitted.
                        for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
                            if emitted {
                                // Record the tick so pollers can detect fresh events.
                                state[dir].tick = tick;
                                state[dir].drain_into(&mut self.wakers);
                            }
                        }

                        // Re-register if there are still writers or readers. This can happen if
                        // e.g. we were previously interested in both readability and writability,
                        // but only one of them was emitted.
                        if !state[READ].is_empty() || !state[WRITE].is_empty() {
                            // Create the event that we are interested in.
                            let event = {
                                let mut event = Event::none(source.key);
                                event.readable = !state[READ].is_empty();
                                event.writable = !state[WRITE].is_empty();
                                event
                            };

                            // Register interest in this event.
                            source.registration.modify(&self.poller, event)?;
                        }
                    }
                }

                Ok(())
            }

            // The syscall was interrupted.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),

            // An actual error occurred.
            Err(err) => Err(err),
        };

        // Wake up ready tasks.
        if !self.wakers.is_empty() {
            gst::trace!(RUNTIME_CAT, "react: {} ready wakers", self.wakers.len());

            for waker in self.wakers.drain(..) {
                // Don't let a panicking waker blow everything up.
                panic::catch_unwind(|| waker.wake()).ok();
            }
        }

        res
    }
}
467
/// Timer will fire in its time slice.
/// This can happen before or after the expected time.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct RegularTimerId(usize);
impl RegularTimerId {
    /// Sentinel ID (0) sorting before any generated ID (generation starts at 1);
    /// used as a lower bound when splitting the timer map.
    const NONE: RegularTimerId = RegularTimerId(0);
}
475
/// Timer is guaranteed to fire after the expected time.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AfterTimerId(usize);
impl AfterTimerId {
    /// Sentinel ID (0) sorting before any generated ID (generation starts at 1);
    /// used as a lower bound when splitting the timer map.
    const NONE: AfterTimerId = AfterTimerId(0);
}
482
/// Any Timer Ids.
#[derive(Copy, Clone, Debug)]
pub(crate) enum TimerId {
    /// ID of a timer stored in `Reactor::timers`.
    Regular(RegularTimerId),
    /// ID of a timer stored in `Reactor::after_timers`.
    After(AfterTimerId),
}
489
490impl From<RegularTimerId> for TimerId {
491    fn from(id: RegularTimerId) -> Self {
492        TimerId::Regular(id)
493    }
494}
495
496impl From<AfterTimerId> for TimerId {
497    fn from(id: AfterTimerId) -> Self {
498        TimerId::After(id)
499    }
500}
501
/// A single timer operation.
enum TimerOp {
    /// Register a timer firing at the `Instant`, waking the `Waker`.
    Insert(Instant, TimerId, Waker),
    /// Deregister the timer previously inserted with this `Instant` and ID.
    Remove(Instant, TimerId),
}
507
/// A registered source of I/O events.
#[derive(Debug)]
pub(super) struct Source {
    /// This source's registration into the reactor.
    pub(super) registration: Registration,

    /// The key of this source obtained during registration.
    key: usize,

    /// Inner state with registered wakers.
    ///
    /// Indexed by the `READ` / `WRITE` constants.
    state: Mutex<[Direction; 2]>,
}
520
/// A read or write direction.
#[derive(Debug, Default)]
struct Direction {
    /// Last reactor tick that delivered an event.
    tick: usize,

    /// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`.
    ticks: Option<(usize, usize)>,

    /// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`.
    waker: Option<Waker>,

    /// Wakers of tasks waiting for the next event.
    ///
    /// Registered by `Async::readable()` and `Async::writable()`.
    wakers: Slab<Option<Waker>>,
}
538
539impl Direction {
540    /// Returns `true` if there are no wakers interested in this direction.
541    fn is_empty(&self) -> bool {
542        self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
543    }
544
545    /// Moves all wakers into a `Vec`.
546    fn drain_into(&mut self, dst: &mut Vec<Waker>) {
547        if let Some(w) = self.waker.take() {
548            dst.push(w);
549        }
550        for (_, opt) in self.wakers.iter_mut() {
551            if let Some(w) = opt.take() {
552                dst.push(w);
553            }
554        }
555    }
556}
557
impl Source {
    /// Polls the I/O source for readability.
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(READ, cx)
    }

    /// Polls the I/O source for writability.
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(WRITE, cx)
    }
568
    /// Registers a waker from `poll_readable()` or `poll_writable()`.
    ///
    /// If a different waker is already registered, it gets replaced and woken.
    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut state = self.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = state[dir].ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[dir].tick != a && state[dir].tick != b {
                state[dir].ticks = None;
                return Poll::Ready(Ok(()));
            }
        }

        // Remember interest state before registering this waker, so we only
        // (re-)register interest with the poller on the transition from empty.
        let was_empty = state[dir].is_empty();

        // Register the current task's waker.
        if let Some(w) = state[dir].waker.take() {
            if w.will_wake(cx.waker()) {
                // Same task polling again: keep the existing waker.
                state[dir].waker = Some(w);
                return Poll::Pending;
            }
            // Wake the previous waker because it's going to get replaced.
            panic::catch_unwind(|| w.wake()).ok();
        }

        // NOTE: the `state` mutex is held for the duration of this closure;
        // `Reactor::with` borrows the thread-local reactor immutably.
        Reactor::with(|reactor| {
            state[dir].waker = Some(cx.waker().clone());
            state[dir].ticks = Some((reactor.ticker(), state[dir].tick));

            // Update interest in this I/O handle.
            if was_empty {
                let event = {
                    let mut event = Event::none(self.key);
                    event.readable = !state[READ].is_empty();
                    event.writable = !state[WRITE].is_empty();
                    event
                };

                // Register interest in it.
                self.registration.modify(&reactor.poller, event)?;
            }

            Poll::Pending
        })
    }
617
    /// Waits until the I/O source is readable.
    pub fn readable<T: Send + 'static>(handle: &Async<T>) -> Readable<'_, T> {
        Readable(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is readable.
    ///
    /// Owned variant: keeps the `Async` handle alive via `Arc`.
    pub fn readable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> ReadableOwned<T> {
        ReadableOwned(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is writable.
    pub fn writable<T: Send + 'static>(handle: &Async<T>) -> Writable<'_, T> {
        Writable(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is writable.
    ///
    /// Owned variant: keeps the `Async` handle alive via `Arc`.
    pub fn writable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> WritableOwned<T> {
        WritableOwned(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is readable or writable.
    fn ready<H: Borrow<Async<T>> + Clone, T: Send + 'static>(handle: H, dir: usize) -> Ready<H, T> {
        Ready {
            handle,
            dir,
            ticks: None,
            index: None,
            _guard: None,
        }
    }
}
649
650/// Future for [`Async::readable`](crate::runtime::Async::readable).
651#[must_use = "futures do nothing unless you `.await` or poll them"]
652pub struct Readable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);
653
654impl<T: Send + 'static> Future for Readable<'_, T> {
655    type Output = io::Result<()>;
656
657    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
658        ready!(Pin::new(&mut self.0).poll(cx))?;
659        gst::trace!(
660            RUNTIME_CAT,
661            "readable: fd={:?}",
662            self.0.handle.source.registration
663        );
664        Poll::Ready(Ok(()))
665    }
666}
667
668impl<T: Send + 'static> fmt::Debug for Readable<'_, T> {
669    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
670        f.debug_struct("Readable").finish()
671    }
672}
673
674/// Future for [`Async::readable_owned`](crate::runtime::Async::readable_owned).
675#[must_use = "futures do nothing unless you `.await` or poll them"]
676pub struct ReadableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);
677
678impl<T: Send + 'static> Future for ReadableOwned<T> {
679    type Output = io::Result<()>;
680
681    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
682        ready!(Pin::new(&mut self.0).poll(cx))?;
683        gst::trace!(
684            RUNTIME_CAT,
685            "readable_owned: fd={:?}",
686            self.0.handle.source.registration
687        );
688        Poll::Ready(Ok(()))
689    }
690}
691
692impl<T: Send + 'static> fmt::Debug for ReadableOwned<T> {
693    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
694        f.debug_struct("ReadableOwned").finish()
695    }
696}
697
698/// Future for [`Async::writable`](crate::runtime::Async::writable).
699#[must_use = "futures do nothing unless you `.await` or poll them"]
700pub struct Writable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);
701
702impl<T: Send + 'static> Future for Writable<'_, T> {
703    type Output = io::Result<()>;
704
705    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
706        ready!(Pin::new(&mut self.0).poll(cx))?;
707        gst::trace!(
708            RUNTIME_CAT,
709            "writable: fd={:?}",
710            self.0.handle.source.registration
711        );
712        Poll::Ready(Ok(()))
713    }
714}
715
716impl<T: Send + 'static> fmt::Debug for Writable<'_, T> {
717    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
718        f.debug_struct("Writable").finish()
719    }
720}
721
722/// Future for [`Async::writable_owned`](crate::runtime::Async::writable_owned).
723#[must_use = "futures do nothing unless you `.await` or poll them"]
724pub struct WritableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);
725
726impl<T: Send + 'static> Future for WritableOwned<T> {
727    type Output = io::Result<()>;
728
729    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
730        ready!(Pin::new(&mut self.0).poll(cx))?;
731        gst::trace!(
732            RUNTIME_CAT,
733            "writable_owned: fd={:?}",
734            self.0.handle.source.registration
735        );
736        Poll::Ready(Ok(()))
737    }
738}
739
740impl<T: Send + 'static> fmt::Debug for WritableOwned<T> {
741    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
742        f.debug_struct("WritableOwned").finish()
743    }
744}
745
/// Future resolving when the I/O handle becomes ready in one direction.
struct Ready<H: Borrow<Async<T>>, T: Send + 'static> {
    /// Borrowed or owned handle to the async I/O wrapper.
    handle: H,
    /// Direction of interest: `READ` or `WRITE`.
    dir: usize,
    /// Reactor ticks remembered when the waker slot was registered.
    ticks: Option<(usize, usize)>,
    /// Index of this future's waker slot in `Direction::wakers`.
    index: Option<usize>,
    /// Releases the waker slot when this future is dropped.
    _guard: Option<RemoveOnDrop<H, T>>,
}

impl<H: Borrow<Async<T>>, T: Send + 'static> Unpin for Ready<H, T> {}
755
impl<H: Borrow<Async<T>> + Clone, T: Send + 'static> Future for Ready<H, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Destructure so the fields can be borrowed independently below.
        let &mut Self {
            ref handle,
            ref mut dir,
            ref mut ticks,
            ref mut index,
            ref mut _guard,
            ..
        } = &mut *self;

        let mut state = handle.borrow().source.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = *ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[*dir].tick != a && state[*dir].tick != b {
                return Poll::Ready(Ok(()));
            }
        }

        // Remember interest state before registering this waker, so we only
        // (re-)register interest with the poller on the transition from empty.
        let was_empty = state[*dir].is_empty();
        Reactor::with(|reactor| {
            // Register the current task's waker.
            let i = match *index {
                Some(i) => i,
                None => {
                    // First poll: reserve a waker slot and arm the drop guard
                    // so the slot is released if this future is dropped.
                    let i = state[*dir].wakers.insert(None);
                    *_guard = Some(RemoveOnDrop {
                        handle: handle.clone(),
                        dir: *dir,
                        key: i,
                        _marker: PhantomData,
                    });
                    *index = Some(i);
                    *ticks = Some((reactor.ticker(), state[*dir].tick));
                    i
                }
            };
            state[*dir].wakers[i] = Some(cx.waker().clone());

            // Update interest in this I/O handle.
            if was_empty {
                // Create the event that we are interested in.
                let event = {
                    let mut event = Event::none(handle.borrow().source.key);
                    event.readable = !state[READ].is_empty();
                    event.writable = !state[WRITE].is_empty();
                    event
                };

                handle
                    .borrow()
                    .source
                    .registration
                    .modify(&reactor.poller, event)?;
            }

            Poll::Pending
        })
    }
}
821
822/// Remove waker when dropped.
823struct RemoveOnDrop<H: Borrow<Async<T>>, T: Send + 'static> {
824    handle: H,
825    dir: usize,
826    key: usize,
827    _marker: PhantomData<fn() -> T>,
828}
829
830impl<H: Borrow<Async<T>>, T: Send + 'static + 'static> Drop for RemoveOnDrop<H, T> {
831    fn drop(&mut self) {
832        let mut state = self.handle.borrow().source.state.lock().unwrap();
833        let wakers = &mut state[self.dir].wakers;
834        if wakers.contains(self.key) {
835            wakers.remove(self.key);
836        }
837    }
838}