Skip to main content

gstthreadshare/runtime/executor/
reactor.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2// This is based on https://github.com/smol-rs/async-io
3// with adaptations by:
4//
5// Copyright (C) 2021-2022 François Laignel <fengalin@free.fr>
6
7use concurrent_queue::ConcurrentQueue;
8use futures::ready;
9use polling::{Event, Events, Poller};
10use slab::Slab;
11
12use std::borrow::Borrow;
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::fmt;
16use std::future::Future;
17use std::io;
18use std::marker::PhantomData;
19use std::mem;
20use std::panic;
21use std::pin::Pin;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::{Arc, Mutex};
24use std::task::{Context, Poll, Waker};
25use std::time::{Duration, Instant};
26
// Choose the proper implementation of `Registration` based on the target platform:
//
// - Windows: the `windows` submodule.
// - Apple platforms and the BSDs listed below: the `kqueue` submodule.
// - Any other Unix: the `unix` submodule.
//
// Building for any other target is a compile-time error.
cfg_if::cfg_if! {
    if #[cfg(windows)] {
        mod windows;
        pub use windows::Registration;
    } else if #[cfg(any(
        target_vendor = "apple",
        target_os = "freebsd",
        target_os = "netbsd",
        target_os = "openbsd",
        target_os = "dragonfly",
    ))] {
        mod kqueue;
        pub use kqueue::Registration;
    } else if #[cfg(unix)] {
        mod unix;
        pub use unix::Registration;
    } else {
        compile_error!("unsupported platform");
    }
}
48
49use crate::runtime::{Async, RUNTIME_CAT};
50
// Indices of the read and write `Direction`s in `Source::state`.
const READ: usize = 0;
const WRITE: usize = 1;

thread_local! {
    // Reactor of the current thread: `None` until `Reactor::init` has been called.
    static CURRENT_REACTOR: RefCell<Option<Reactor>> = const { RefCell::new(None) };
}
57
/// I/O event handler: owns the poller, the registered sources and the event buffer.
#[derive(Debug)]
pub(super) struct IOHandler {
    /// Portable bindings to epoll/kqueue/event ports/wepoll.
    ///
    /// This is where I/O is polled, producing I/O events.
    poller: Poller,

    /// Registered sources.
    ///
    /// The slab key of a source is also the key under which it is registered
    /// with the poller (see `insert_io`).
    sources: Slab<Arc<Source>>,

    /// Temporary storage for I/O events when polling the reactor.
    ///
    /// Cleared and refilled by each call to `process`. This is a plain field:
    /// exclusive access comes from `&mut self`, the handler itself being kept
    /// behind the `Mutex` of `Reactor::io_handler`.
    events: Events,
}
73
74impl IOHandler {
75    fn clear(&mut self) {
76        self.sources.drain().for_each(|s| {
77            let _ = s.registration.delete(&self.poller);
78        });
79        self.events.clear();
80    }
81
82    fn insert_io(&mut self, raw: Registration) -> io::Result<Arc<Source>> {
83        // Create an I/O source for this file descriptor.
84        let source = {
85            let key = self.sources.vacant_entry().key();
86            let source = Arc::new(Source {
87                registration: raw,
88                key,
89                state: Default::default(),
90            });
91            self.sources.insert(source.clone());
92            source
93        };
94
95        // Register the file descriptor.
96        if let Err(err) = source.registration.add(&self.poller, source.key) {
97            gst::error!(
98                crate::runtime::RUNTIME_CAT,
99                "Failed to register fd {:?}: {}",
100                source.registration,
101                err,
102            );
103            self.sources.remove(source.key);
104            return Err(err);
105        }
106
107        Ok(source)
108    }
109
110    /// Deregisters an I/O source from the reactor.
111    pub fn remove_io(&mut self, source: &Source) -> io::Result<()> {
112        self.sources.remove(source.key);
113        source.registration.delete(&self.poller)
114    }
115
    /// Polls the poller once and collects the wakers of the sources which got an event.
    ///
    /// `tick` is recorded in each direction for which an event was emitted and the
    /// matching wakers are appended to `wakers`. The wakers are not woken here.
    ///
    /// # Errors
    ///
    /// Returns the error from the poller's `wait`, except for
    /// `io::ErrorKind::Interrupted` which is reported as `Ok(())`.
    /// Also returns early with the error of a failed `modify`, in which case the
    /// remaining events of this round are not examined.
    fn process(&mut self, tick: usize, wakers: &mut Vec<Waker>) -> io::Result<()> {
        self.events.clear();

        // Check for I/O events. A zero timeout is passed, so this is not expected to wait.
        match self.poller.wait(&mut self.events, Some(Duration::ZERO)) {
            // No I/O events occurred.
            Ok(0) => Ok(()),
            // At least one I/O event occurred.
            Ok(_) => {
                for ev in self.events.iter() {
                    // Check if there is a source in the table with this key.
                    if let Some(source) = self.sources.get(ev.key) {
                        let mut state = source.state.lock().unwrap();

                        // Collect wakers if any event was emitted.
                        for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
                            if emitted {
                                state[dir].tick = tick;
                                state[dir].drain_into(wakers);
                            }
                        }

                        // Re-register if there are still writers or readers. This can happen if
                        // e.g. we were previously interested in both readability and writability,
                        // but only one of them was emitted.
                        if !state[READ].is_empty() || !state[WRITE].is_empty() {
                            // Create the event that we are interested in.
                            let event = {
                                let mut event = Event::none(source.key);
                                event.readable = !state[READ].is_empty();
                                event.writable = !state[WRITE].is_empty();
                                event
                            };

                            // Register interest in this event.
                            source.registration.modify(&self.poller, event)?;
                        }
                    }
                }

                Ok(())
            }

            // The syscall was interrupted.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),

            // An actual error occurred.
            Err(err) => Err(err),
        }
    }
166}
167
168impl Default for IOHandler {
169    fn default() -> Self {
170        IOHandler {
171            poller: Poller::new().expect("cannot initialize I/O event notification"),
172            sources: Default::default(),
173            events: Events::new(),
174        }
175    }
176}
177
/// Per-thread reactor handling I/O sources and timers.
///
/// An instance is stored in the `CURRENT_REACTOR` thread local by `Reactor::init`.
#[derive(Debug)]
pub(super) struct Reactor {
    /// I/O event handler.
    ///
    /// This is where I/O sources are registered, I/O is polled, producing I/O events.
    pub(super) io_handler: Arc<Mutex<IOHandler>>,

    /// Ticker bumped before polling.
    ///
    /// This is useful for checking what is the current "round" of `Reactor::react()` when
    /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those
    /// methods must make sure they don't receive stale I/O events - they only accept events from a
    /// fresh "round" of `Reactor::react()`.
    ticker: AtomicUsize,

    /// Time when timers have been checked in current time slice.
    ///
    /// Updated with the `now` passed to `process_timers`.
    timers_check_instant: Instant,

    /// Time limit when timers are being fired in current time slice.
    ///
    /// Updated to `now + half_max_throttling` by `process_timers`.
    time_slice_end: Instant,

    /// Half max throttling duration, needed to fire timers.
    half_max_throttling: Duration,

    /// List of wakers to wake when reacting.
    ///
    /// Filled and then drained by each call to `react`.
    wakers: Vec<Waker>,

    /// An ordered map of registered regular timers.
    ///
    /// Timers are in the order in which they fire. The `RegularTimerId` distinguishes
    /// timers that fire at the same time. The `Waker` represents the task awaiting the
    /// timer.
    timers: BTreeMap<(Instant, RegularTimerId), Waker>,

    /// An ordered map of registered after timers.
    ///
    /// These timers are guaranteed to fire no sooner than their expected time.
    ///
    /// Timers are in the order in which they fire. The `AfterTimerId` distinguishes
    /// timers that fire at the same time. The `Waker` represents the task awaiting the
    /// timer.
    after_timers: BTreeMap<(Instant, AfterTimerId), Waker>,

    /// A queue of timer operations (insert and remove).
    ///
    /// When inserting or removing a timer, we don't process it immediately - we just push it into
    /// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
    /// The queue is bounded (see `Reactor::new`).
    timer_ops: ConcurrentQueue<TimerOp>,
}
227
228impl Reactor {
229    fn new(max_throttling: Duration) -> Self {
230        Reactor {
231            io_handler: Default::default(),
232            ticker: AtomicUsize::new(0),
233            timers_check_instant: Instant::now(),
234            time_slice_end: Instant::now(),
235            half_max_throttling: max_throttling / 2,
236            wakers: Vec::new(),
237            timers: BTreeMap::new(),
238            after_timers: BTreeMap::new(),
239            timer_ops: ConcurrentQueue::bounded(1000),
240        }
241    }
242
243    /// Initializes the reactor for current thread.
244    pub fn init(max_throttling: Duration) {
245        CURRENT_REACTOR.with(|cur| {
246            let mut cur = cur.borrow_mut();
247            if cur.is_none() {
248                *cur = Some(Reactor::new(max_throttling));
249            }
250        })
251    }
252
253    /// Clears the `Reactor`.
254    ///
255    /// It will be ready for reuse on current thread without reallocating.
256    pub fn clear() {
257        let _ = CURRENT_REACTOR.try_with(|cur_reactor| {
258            cur_reactor.borrow_mut().as_mut().map(|reactor| {
259                reactor.ticker = AtomicUsize::new(0);
260                reactor.wakers.clear();
261                reactor.io_handler.lock().unwrap().clear();
262                reactor.timers.clear();
263                reactor.after_timers.clear();
264                while !reactor.timer_ops.is_empty() {
265                    let _ = reactor.timer_ops.pop();
266                }
267            })
268        });
269    }
270
271    /// Executes the function with current thread's reactor as ref.
272    ///
273    /// # Panics
274    ///
275    /// Panics if:
276    ///
277    /// - The Reactor is not initialized, i.e. if
278    ///   current thread is not a [`Context`] thread.
279    /// - The Reactor is already mutably borrowed.
280    ///
281    /// Use [`Context::enter`] to register i/o sources
282    /// or timers from a different thread.
283    #[track_caller]
284    pub fn with<F, R>(f: F) -> R
285    where
286        F: FnOnce(&Reactor) -> R,
287    {
288        CURRENT_REACTOR.with(|reactor| {
289            f(reactor
290                .borrow()
291                .as_ref()
292                .expect("Reactor initialized at this point"))
293        })
294    }
295
296    /// Executes the function with current thread's reactor as mutable.
297    ///
298    /// # Panics
299    ///
300    /// Panics if:
301    ///
302    /// - The Reactor is not initialized, i.e. if
303    ///   current thread is not a [`Context`] thread.
304    /// - The Reactor is already mutably borrowed.
305    ///
306    /// Use [`Context::enter`] to register i/o sources
307    /// or timers from a different thread.
308    #[track_caller]
309    pub fn with_mut<F, R>(f: F) -> R
310    where
311        F: FnOnce(&mut Reactor) -> R,
312    {
313        CURRENT_REACTOR.with(|reactor| {
314            f(reactor
315                .borrow_mut()
316                .as_mut()
317                .expect("Reactor initialized at this point"))
318        })
319    }
320
    /// Returns the current ticker.
    ///
    /// The ticker is bumped by each call to `react`.
    pub fn ticker(&self) -> usize {
        self.ticker.load(Ordering::SeqCst)
    }

    /// Returns half the max throttling duration this reactor was built with.
    pub fn half_max_throttling(&self) -> Duration {
        self.half_max_throttling
    }

    /// Returns the instant recorded by the last timers check (see `process_timers`).
    pub fn timers_check_instant(&self) -> Instant {
        self.timers_check_instant
    }

    /// Returns the end of the current time slice, i.e. the last timers check
    /// instant plus half the max throttling duration (see `process_timers`).
    pub fn time_slice_end(&self) -> Instant {
        self.time_slice_end
    }
337
338    /// Registers an I/O source in the reactor.
339    pub fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> {
340        self.io_handler.lock().unwrap().insert_io(raw)
341    }
342
343    /// Deregisters an I/O source from the reactor.
344    pub fn remove_io(&self, source: &Source) -> io::Result<()> {
345        self.io_handler.lock().unwrap().remove_io(source)
346    }
347
348    /// Registers a regular timer in the reactor.
349    ///
350    /// Returns the inserted timer's ID.
351    pub fn insert_regular_timer(&mut self, when: Instant, waker: &Waker) -> RegularTimerId {
352        // Generate a new timer ID.
353        static REGULAR_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
354        let id = RegularTimerId(REGULAR_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));
355
356        // Push an insert operation.
357        while self
358            .timer_ops
359            .push(TimerOp::Insert(when, id.into(), waker.clone()))
360            .is_err()
361        {
362            // If the queue is full, drain it and try again.
363            gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
364            self.process_timer_ops();
365        }
366
367        id
368    }
369
370    /// Registers an after timer in the reactor.
371    ///
372    /// Returns the inserted timer's ID.
373    pub fn insert_after_timer(&mut self, when: Instant, waker: &Waker) -> AfterTimerId {
374        // Generate a new timer ID.
375        static AFTER_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
376        let id = AfterTimerId(AFTER_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));
377
378        // Push an insert operation.
379        while self
380            .timer_ops
381            .push(TimerOp::Insert(when, id.into(), waker.clone()))
382            .is_err()
383        {
384            // If the queue is full, drain it and try again.
385            gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
386            self.process_timer_ops();
387        }
388
389        id
390    }
391
392    /// Deregisters a timer from the reactor.
393    pub fn remove_timer(&mut self, when: Instant, id: impl Into<TimerId>) {
394        // Push a remove operation.
395        let id = id.into();
396        while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
397            gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
398            // If the queue is full, drain it and try again.
399            self.process_timer_ops();
400        }
401    }
402
403    /// Processes ready timers and extends the list of wakers to wake.
404    fn process_timers(&mut self, now: Instant) {
405        self.process_timer_ops();
406
407        self.timers_check_instant = now;
408        self.time_slice_end = now + self.half_max_throttling;
409
410        // Split regular timers into ready and pending timers.
411        //
412        // Careful to split just *after* current time slice end,
413        // so that a timer set to fire in current time slice is
414        // considered ready.
415        let pending = self
416            .timers
417            .split_off(&(self.time_slice_end, RegularTimerId::NONE));
418        let ready = mem::replace(&mut self.timers, pending);
419
420        // Add wakers to the list.
421        if !ready.is_empty() {
422            gst::trace!(
423                RUNTIME_CAT,
424                "process_timers (regular): {} ready wakers",
425                ready.len()
426            );
427
428            for (_, waker) in ready {
429                self.wakers.push(waker);
430            }
431        }
432
433        // Split "at least" timers into ready and pending timers.
434        //
435        // Careful to split just *after* `now`,
436        // so that a timer set for exactly `now` is considered ready.
437        let pending = self
438            .after_timers
439            .split_off(&(self.timers_check_instant, AfterTimerId::NONE));
440        let ready = mem::replace(&mut self.after_timers, pending);
441
442        // Add wakers to the list.
443        if !ready.is_empty() {
444            gst::trace!(
445                RUNTIME_CAT,
446                "process_timers (after): {} ready wakers",
447                ready.len()
448            );
449
450            for (_, waker) in ready {
451                self.wakers.push(waker);
452            }
453        }
454    }
455
456    /// Processes queued timer operations.
457    fn process_timer_ops(&mut self) {
458        // Process only as much as fits into the queue, or else this loop could in theory run
459        // forever.
460        for _ in 0..self.timer_ops.capacity().unwrap() {
461            match self.timer_ops.pop() {
462                Ok(TimerOp::Insert(when, TimerId::Regular(id), waker)) => {
463                    self.timers.insert((when, id), waker);
464                }
465                Ok(TimerOp::Insert(when, TimerId::After(id), waker)) => {
466                    self.after_timers.insert((when, id), waker);
467                }
468                Ok(TimerOp::Remove(when, TimerId::Regular(id))) => {
469                    self.timers.remove(&(when, id));
470                }
471                Ok(TimerOp::Remove(when, TimerId::After(id))) => {
472                    self.after_timers.remove(&(when, id));
473                }
474                Err(_) => break,
475            }
476        }
477    }
478
479    /// Processes new events.
480    pub fn react(&mut self, now: Instant) -> io::Result<()> {
481        debug_assert!(self.wakers.is_empty());
482
483        // Process ready timers.
484        self.process_timers(now);
485
486        // Bump the ticker before polling I/O.
487        let tick = self.ticker.fetch_add(1, Ordering::SeqCst).wrapping_add(1);
488
489        // Block on I/O events.
490        let res = self
491            .io_handler
492            .lock()
493            .unwrap()
494            .process(tick, &mut self.wakers);
495
496        // Wake up ready tasks.
497        if !self.wakers.is_empty() {
498            gst::trace!(RUNTIME_CAT, "react: {} ready wakers", self.wakers.len());
499
500            for waker in self.wakers.drain(..) {
501                // Don't let a panicking waker blow everything up.
502                panic::catch_unwind(|| waker.wake()).ok();
503            }
504        }
505
506        res
507    }
508}
509
/// Identifier of a regular timer.
///
/// Such a timer will fire in its time slice.
/// This can happen before or after the expected time.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct RegularTimerId(usize);

impl RegularTimerId {
    /// Smallest possible id.
    const NONE: Self = Self(0);
}

/// Identifier of an after timer.
///
/// Such a timer is guaranteed to fire after the expected time.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AfterTimerId(usize);

impl AfterTimerId {
    /// Smallest possible id.
    const NONE: Self = Self(0);
}

/// Any Timer Ids.
#[derive(Copy, Clone, Debug)]
pub(crate) enum TimerId {
    /// Id of a regular timer.
    Regular(RegularTimerId),
    /// Id of an after timer.
    After(AfterTimerId),
}

impl From<RegularTimerId> for TimerId {
    /// Wraps the id in the `Regular` variant.
    fn from(id: RegularTimerId) -> Self {
        Self::Regular(id)
    }
}

impl From<AfterTimerId> for TimerId {
    /// Wraps the id in the `After` variant.
    fn from(id: AfterTimerId) -> Self {
        Self::After(id)
    }
}
543
/// A single timer operation.
///
/// Operations are queued in `Reactor::timer_ops` and applied by
/// `Reactor::process_timer_ops`.
enum TimerOp {
    /// Inserts the waker under the `(Instant, id)` key in the map matching the id kind.
    Insert(Instant, TimerId, Waker),
    /// Removes the entry with the `(Instant, id)` key from the map matching the id kind.
    Remove(Instant, TimerId),
}
549
/// A registered source of I/O events.
#[derive(Debug)]
pub(super) struct Source {
    /// This source's registration into the reactor.
    pub(super) registration: Registration,

    /// The key of this source obtained during registration.
    ///
    /// This is the key of the source in `IOHandler::sources` and the key
    /// carried by the events requested for this source.
    key: usize,

    /// Inner state with registered wakers.
    ///
    /// Indexed by direction: `READ` and `WRITE`.
    state: Mutex<[Direction; 2]>,
}
562
/// A read or write direction.
#[derive(Debug, Default)]
struct Direction {
    /// Last reactor tick that delivered an event.
    tick: usize,

    /// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`.
    ///
    /// Stored as `(reactor ticker, direction tick)` at the time the waker was registered.
    ticks: Option<(usize, usize)>,

    /// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`.
    waker: Option<Waker>,

    /// Wakers of tasks waiting for the next event.
    ///
    /// Registered by `Async::readable()` and `Async::writable()`.
    /// A slot holds `None` once its waker has been collected.
    wakers: Slab<Option<Waker>>,
}
580
581impl Direction {
582    /// Returns `true` if there are no wakers interested in this direction.
583    fn is_empty(&self) -> bool {
584        self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
585    }
586
587    /// Moves all wakers into a `Vec`.
588    fn drain_into(&mut self, dst: &mut Vec<Waker>) {
589        if let Some(w) = self.waker.take() {
590            dst.push(w);
591        }
592        for (_, opt) in self.wakers.iter_mut() {
593            if let Some(w) = opt.take() {
594                dst.push(w);
595            }
596        }
597    }
598}
599
600impl Source {
    /// Polls the I/O source for readability.
    ///
    /// Delegates to `poll_ready` with the `READ` direction.
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(READ, cx)
    }

    /// Polls the I/O source for writability.
    ///
    /// Delegates to `poll_ready` with the `WRITE` direction.
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(WRITE, cx)
    }
610
    /// Registers a waker from `poll_readable()` or `poll_writable()`.
    ///
    /// Returns `Poll::Ready(Ok(()))` if an event was delivered for `dir` since the
    /// ticks were last remembered, otherwise registers the waker of `cx` and
    /// returns `Poll::Pending`.
    ///
    /// If a different waker is already registered, it gets replaced and woken.
    ///
    /// # Errors
    ///
    /// Returns `Poll::Ready(Err(_))` if updating the interest in the I/O handle fails.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned, or under the conditions
    /// documented for `Reactor::with`.
    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut state = self.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = state[dir].ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[dir].tick != a && state[dir].tick != b {
                state[dir].ticks = None;
                return Poll::Ready(Ok(()));
            }
        }

        // Must be evaluated before the waker is taken out below.
        let was_empty = state[dir].is_empty();

        // Register the current task's waker.
        if let Some(w) = state[dir].waker.take() {
            if w.will_wake(cx.waker()) {
                // Same task as before: keep the registered waker as is.
                state[dir].waker = Some(w);
                return Poll::Pending;
            }
            // Wake the previous waker because it's going to get replaced.
            panic::catch_unwind(|| w.wake()).ok();
        }

        Reactor::with(|reactor| {
            state[dir].waker = Some(cx.waker().clone());
            // Remember the ticks so that a later poll can detect a fresh event.
            state[dir].ticks = Some((reactor.ticker(), state[dir].tick));

            // Update interest in this I/O handle.
            if was_empty {
                let event = {
                    let mut event = Event::none(self.key);
                    event.readable = !state[READ].is_empty();
                    event.writable = !state[WRITE].is_empty();
                    event
                };

                // Register interest in it.
                self.registration
                    .modify(&reactor.io_handler.lock().unwrap().poller, event)?;
            }

            Poll::Pending
        })
    }
660
    /// Waits until the I/O source is readable.
    ///
    /// The returned future borrows `handle`.
    pub fn readable<T: Send + 'static>(handle: &Async<T>) -> Readable<'_, T> {
        Readable(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is readable.
    ///
    /// Same as `readable`, except that the returned future owns an `Arc` of the handle.
    pub fn readable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> ReadableOwned<T> {
        ReadableOwned(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is writable.
    ///
    /// The returned future borrows `handle`.
    pub fn writable<T: Send + 'static>(handle: &Async<T>) -> Writable<'_, T> {
        Writable(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is writable.
    ///
    /// Same as `writable`, except that the returned future owns an `Arc` of the handle.
    pub fn writable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> WritableOwned<T> {
        WritableOwned(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is readable or writable.
    ///
    /// `dir` is either `READ` or `WRITE`. The returned future starts without
    /// remembered ticks, waker slot or drop guard: these are set up when it is
    /// first polled.
    fn ready<H: Borrow<Async<T>> + Clone, T: Send + 'static>(handle: H, dir: usize) -> Ready<H, T> {
        Ready {
            handle,
            dir,
            ticks: None,
            index: None,
            _guard: None,
        }
    }
691}
692
693/// Future for [`Async::readable`](crate::runtime::Async::readable).
694#[must_use = "futures do nothing unless you `.await` or poll them"]
695pub struct Readable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);
696
697impl<T: Send + 'static> Future for Readable<'_, T> {
698    type Output = io::Result<()>;
699
700    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
701        ready!(Pin::new(&mut self.0).poll(cx))?;
702        gst::trace!(
703            RUNTIME_CAT,
704            "readable: fd={:?}",
705            self.0.handle.source.registration
706        );
707        Poll::Ready(Ok(()))
708    }
709}
710
711impl<T: Send + 'static> fmt::Debug for Readable<'_, T> {
712    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
713        f.debug_struct("Readable").finish()
714    }
715}
716
717/// Future for [`Async::readable_owned`](crate::runtime::Async::readable_owned).
718#[must_use = "futures do nothing unless you `.await` or poll them"]
719pub struct ReadableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);
720
721impl<T: Send + 'static> Future for ReadableOwned<T> {
722    type Output = io::Result<()>;
723
724    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
725        ready!(Pin::new(&mut self.0).poll(cx))?;
726        gst::trace!(
727            RUNTIME_CAT,
728            "readable_owned: fd={:?}",
729            self.0.handle.source.registration
730        );
731        Poll::Ready(Ok(()))
732    }
733}
734
735impl<T: Send + 'static> fmt::Debug for ReadableOwned<T> {
736    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
737        f.debug_struct("ReadableOwned").finish()
738    }
739}
740
741/// Future for [`Async::writable`](crate::runtime::Async::writable).
742#[must_use = "futures do nothing unless you `.await` or poll them"]
743pub struct Writable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);
744
745impl<T: Send + 'static> Future for Writable<'_, T> {
746    type Output = io::Result<()>;
747
748    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
749        ready!(Pin::new(&mut self.0).poll(cx))?;
750        gst::trace!(
751            RUNTIME_CAT,
752            "writable: fd={:?}",
753            self.0.handle.source.registration
754        );
755        Poll::Ready(Ok(()))
756    }
757}
758
759impl<T: Send + 'static> fmt::Debug for Writable<'_, T> {
760    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
761        f.debug_struct("Writable").finish()
762    }
763}
764
765/// Future for [`Async::writable_owned`](crate::runtime::Async::writable_owned).
766#[must_use = "futures do nothing unless you `.await` or poll them"]
767pub struct WritableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);
768
769impl<T: Send + 'static> Future for WritableOwned<T> {
770    type Output = io::Result<()>;
771
772    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
773        ready!(Pin::new(&mut self.0).poll(cx))?;
774        gst::trace!(
775            RUNTIME_CAT,
776            "writable_owned: fd={:?}",
777            self.0.handle.source.registration
778        );
779        Poll::Ready(Ok(()))
780    }
781}
782
783impl<T: Send + 'static> fmt::Debug for WritableOwned<T> {
784    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
785        f.debug_struct("WritableOwned").finish()
786    }
787}
788
/// Future shared by `Readable`, `ReadableOwned`, `Writable` and `WritableOwned`.
struct Ready<H: Borrow<Async<T>>, T: Send + 'static> {
    /// Handle to the `Async` whose source is awaited.
    handle: H,
    /// Awaited direction: `READ` or `WRITE`.
    dir: usize,
    /// Ticks `(reactor ticker, direction tick)` remembered when the waker slot was created.
    ticks: Option<(usize, usize)>,
    /// Key of this future's slot in the direction's `wakers`, once created.
    index: Option<usize>,
    /// Guard removing the waker slot when this future is dropped.
    _guard: Option<RemoveOnDrop<H, T>>,
}

// Lets the wrapper futures build a `Pin` of their inner `Ready` with `Pin::new`.
impl<H: Borrow<Async<T>>, T: Send + 'static> Unpin for Ready<H, T> {}
798
impl<H: Borrow<Async<T>> + Clone, T: Send + 'static> Future for Ready<H, T> {
    type Output = io::Result<()>;

    /// Resolves once the reactor has delivered an event for the awaited direction.
    ///
    /// The first poll creates a waker slot in the direction's `wakers`, installs
    /// the guard which removes it on drop and remembers the current ticks. Each
    /// poll which doesn't find a fresh event stores the waker of `cx` in the slot.
    ///
    /// Resolves to an error if updating the interest in the I/O handle fails.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned, or under the conditions
    /// documented for `Reactor::with`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let &mut Self {
            ref handle,
            ref mut dir,
            ref mut ticks,
            ref mut index,
            ref mut _guard,
            ..
        } = &mut *self;

        let mut state = handle.borrow().source.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = *ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[*dir].tick != a && state[*dir].tick != b {
                return Poll::Ready(Ok(()));
            }
        }

        // Must be evaluated before the waker is stored below.
        let was_empty = state[*dir].is_empty();
        Reactor::with(|reactor| {
            // Register the current task's waker.
            let i = match *index {
                Some(i) => i,
                None => {
                    // First poll: reserve a slot and make sure it gets removed on drop.
                    let i = state[*dir].wakers.insert(None);
                    *_guard = Some(RemoveOnDrop {
                        handle: handle.clone(),
                        dir: *dir,
                        key: i,
                        _marker: PhantomData,
                    });
                    *index = Some(i);
                    *ticks = Some((reactor.ticker(), state[*dir].tick));
                    i
                }
            };
            state[*dir].wakers[i] = Some(cx.waker().clone());

            // Update interest in this I/O handle.
            if was_empty {
                // Create the event that we are interested in.
                let event = {
                    let mut event = Event::none(handle.borrow().source.key);
                    event.readable = !state[READ].is_empty();
                    event.writable = !state[WRITE].is_empty();
                    event
                };

                handle
                    .borrow()
                    .source
                    .registration
                    .modify(&reactor.io_handler.lock().unwrap().poller, event)?;
            }

            Poll::Pending
        })
    }
}
864
865/// Remove waker when dropped.
866struct RemoveOnDrop<H: Borrow<Async<T>>, T: Send + 'static> {
867    handle: H,
868    dir: usize,
869    key: usize,
870    _marker: PhantomData<fn() -> T>,
871}
872
873impl<H: Borrow<Async<T>>, T: Send + 'static + 'static> Drop for RemoveOnDrop<H, T> {
874    fn drop(&mut self) {
875        let mut state = self.handle.borrow().source.state.lock().unwrap();
876        let wakers = &mut state[self.dir].wakers;
877        if wakers.contains(self.key) {
878            wakers.remove(self.key);
879        }
880    }
881}