1use concurrent_queue::ConcurrentQueue;
8use futures::ready;
9use polling::{Event, Events, Poller};
10use slab::Slab;
11
12use std::borrow::Borrow;
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::fmt;
16use std::future::Future;
17use std::io;
18use std::marker::PhantomData;
19use std::mem;
20use std::panic;
21use std::pin::Pin;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::{Arc, Mutex};
24use std::task::{Context, Poll, Waker};
25use std::time::{Duration, Instant};
26
// Select the platform-specific `Registration` implementation used to
// (de)register raw I/O handles with the `Poller`.
cfg_if::cfg_if! {
    if #[cfg(windows)] {
        mod windows;
        pub use windows::Registration;
    } else if #[cfg(any(
        target_vendor = "apple",
        target_os = "freebsd",
        target_os = "netbsd",
        target_os = "openbsd",
        target_os = "dragonfly",
    ))] {
        // BSD-family systems share the kqueue-based implementation.
        mod kqueue;
        pub use kqueue::Registration;
    } else if #[cfg(unix)] {
        mod unix;
        pub use unix::Registration;
    } else {
        compile_error!("unsupported platform");
    }
}
48
49use crate::runtime::{Async, RUNTIME_CAT};
50
// Indices into `Source::state`'s `[Direction; 2]` array.
const READ: usize = 0;
const WRITE: usize = 1;

thread_local! {
    // One reactor per thread; installed by `Reactor::init`, accessed via
    // `Reactor::with` / `Reactor::with_mut`, reset by `Reactor::clear`.
    static CURRENT_REACTOR: RefCell<Option<Reactor>> = const { RefCell::new(None) };
}
57
/// Per-thread reactor driving I/O readiness and timers for the runtime.
///
/// Lives in the `CURRENT_REACTOR` thread-local and is accessed through
/// [`Reactor::with`] / [`Reactor::with_mut`].
#[derive(Debug)]
pub(super) struct Reactor {
    /// OS poller used to wait for I/O readiness events.
    poller: Poller,

    /// Tick counter, incremented once per `react` call.
    ticker: AtomicUsize,

    /// Instant at which timers were last checked (set in `process_timers`).
    timers_check_instant: Instant,

    /// End of the current time slice: `timers_check_instant + half_max_throttling`.
    time_slice_end: Instant,

    /// Half the configured max throttling duration; regular timers may fire
    /// up to this much early (see `process_timers`).
    half_max_throttling: Duration,

    /// Scratch list of wakers collected during `react` and woken at its end.
    wakers: Vec<Waker>,

    /// Registered I/O sources; the slab key doubles as the poller key.
    sources: Slab<Arc<Source>>,

    /// Event buffer reused across `poller.wait` calls.
    events: Events,

    /// Regular timers, keyed by deadline then id for deterministic ordering;
    /// fired once the deadline falls inside the current time slice.
    timers: BTreeMap<(Instant, RegularTimerId), Waker>,

    /// "After" timers: fired only once their deadline has actually elapsed.
    after_timers: BTreeMap<(Instant, AfterTimerId), Waker>,

    /// Queue of pending timer insertions/removals, drained by
    /// `process_timer_ops`.
    timer_ops: ConcurrentQueue<TimerOp>,
}
115
116impl Reactor {
117 fn new(max_throttling: Duration) -> Self {
118 Reactor {
119 poller: Poller::new().expect("cannot initialize I/O event notification"),
120 ticker: AtomicUsize::new(0),
121 timers_check_instant: Instant::now(),
122 time_slice_end: Instant::now(),
123 half_max_throttling: max_throttling / 2,
124 wakers: Vec::new(),
125 sources: Slab::new(),
126 events: Events::new(),
127 timers: BTreeMap::new(),
128 after_timers: BTreeMap::new(),
129 timer_ops: ConcurrentQueue::bounded(1000),
130 }
131 }
132
133 pub fn init(max_throttling: Duration) {
135 CURRENT_REACTOR.with(|cur| {
136 let mut cur = cur.borrow_mut();
137 if cur.is_none() {
138 *cur = Some(Reactor::new(max_throttling));
139 }
140 })
141 }
142
143 pub fn clear() {
147 let _ = CURRENT_REACTOR.try_with(|cur_reactor| {
148 cur_reactor.borrow_mut().as_mut().map(|reactor| {
149 reactor.ticker = AtomicUsize::new(0);
150 reactor.wakers.clear();
151 reactor.sources.clear();
152 reactor.events.clear();
153 reactor.timers.clear();
154 reactor.after_timers.clear();
155 while !reactor.timer_ops.is_empty() {
156 let _ = reactor.timer_ops.pop();
157 }
158 })
159 });
160 }
161
162 #[track_caller]
175 pub fn with<F, R>(f: F) -> R
176 where
177 F: FnOnce(&Reactor) -> R,
178 {
179 CURRENT_REACTOR.with(|reactor| {
180 f(reactor
181 .borrow()
182 .as_ref()
183 .expect("Reactor initialized at this point"))
184 })
185 }
186
187 #[track_caller]
200 pub fn with_mut<F, R>(f: F) -> R
201 where
202 F: FnOnce(&mut Reactor) -> R,
203 {
204 CURRENT_REACTOR.with(|reactor| {
205 f(reactor
206 .borrow_mut()
207 .as_mut()
208 .expect("Reactor initialized at this point"))
209 })
210 }
211
212 pub fn ticker(&self) -> usize {
214 self.ticker.load(Ordering::SeqCst)
215 }
216
217 pub fn half_max_throttling(&self) -> Duration {
218 self.half_max_throttling
219 }
220
221 pub fn timers_check_instant(&self) -> Instant {
222 self.timers_check_instant
223 }
224
225 pub fn time_slice_end(&self) -> Instant {
226 self.time_slice_end
227 }
228
229 pub fn insert_io(&mut self, raw: Registration) -> io::Result<Arc<Source>> {
231 let source = {
233 let key = self.sources.vacant_entry().key();
234 let source = Arc::new(Source {
235 registration: raw,
236 key,
237 state: Default::default(),
238 });
239 self.sources.insert(source.clone());
240 source
241 };
242
243 if let Err(err) = source.registration.add(&self.poller, source.key) {
245 gst::error!(
246 crate::runtime::RUNTIME_CAT,
247 "Failed to register fd {:?}: {}",
248 source.registration,
249 err,
250 );
251 self.sources.remove(source.key);
252 return Err(err);
253 }
254
255 Ok(source)
256 }
257
258 pub fn remove_io(&mut self, source: &Source) -> io::Result<()> {
260 self.sources.remove(source.key);
261 source.registration.delete(&self.poller)
262 }
263
264 pub fn insert_regular_timer(&mut self, when: Instant, waker: &Waker) -> RegularTimerId {
268 static REGULAR_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
270 let id = RegularTimerId(REGULAR_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));
271
272 while self
274 .timer_ops
275 .push(TimerOp::Insert(when, id.into(), waker.clone()))
276 .is_err()
277 {
278 gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
280 self.process_timer_ops();
281 }
282
283 id
284 }
285
286 pub fn insert_after_timer(&mut self, when: Instant, waker: &Waker) -> AfterTimerId {
290 static AFTER_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
292 let id = AfterTimerId(AFTER_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));
293
294 while self
296 .timer_ops
297 .push(TimerOp::Insert(when, id.into(), waker.clone()))
298 .is_err()
299 {
300 gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
302 self.process_timer_ops();
303 }
304
305 id
306 }
307
308 pub fn remove_timer(&mut self, when: Instant, id: impl Into<TimerId>) {
310 let id = id.into();
312 while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
313 gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
314 self.process_timer_ops();
316 }
317 }
318
319 fn process_timers(&mut self, now: Instant) {
321 self.process_timer_ops();
322
323 self.timers_check_instant = now;
324 self.time_slice_end = now + self.half_max_throttling;
325
326 let pending = self
332 .timers
333 .split_off(&(self.time_slice_end, RegularTimerId::NONE));
334 let ready = mem::replace(&mut self.timers, pending);
335
336 if !ready.is_empty() {
338 gst::trace!(
339 RUNTIME_CAT,
340 "process_timers (regular): {} ready wakers",
341 ready.len()
342 );
343
344 for (_, waker) in ready {
345 self.wakers.push(waker);
346 }
347 }
348
349 let pending = self
354 .after_timers
355 .split_off(&(self.timers_check_instant, AfterTimerId::NONE));
356 let ready = mem::replace(&mut self.after_timers, pending);
357
358 if !ready.is_empty() {
360 gst::trace!(
361 RUNTIME_CAT,
362 "process_timers (after): {} ready wakers",
363 ready.len()
364 );
365
366 for (_, waker) in ready {
367 self.wakers.push(waker);
368 }
369 }
370 }
371
372 fn process_timer_ops(&mut self) {
374 for _ in 0..self.timer_ops.capacity().unwrap() {
377 match self.timer_ops.pop() {
378 Ok(TimerOp::Insert(when, TimerId::Regular(id), waker)) => {
379 self.timers.insert((when, id), waker);
380 }
381 Ok(TimerOp::Insert(when, TimerId::After(id), waker)) => {
382 self.after_timers.insert((when, id), waker);
383 }
384 Ok(TimerOp::Remove(when, TimerId::Regular(id))) => {
385 self.timers.remove(&(when, id));
386 }
387 Ok(TimerOp::Remove(when, TimerId::After(id))) => {
388 self.after_timers.remove(&(when, id));
389 }
390 Err(_) => break,
391 }
392 }
393 }
394
395 pub fn react(&mut self, now: Instant) -> io::Result<()> {
397 debug_assert!(self.wakers.is_empty());
398
399 self.process_timers(now);
401
402 let tick = self.ticker.fetch_add(1, Ordering::SeqCst).wrapping_add(1);
404
405 self.events.clear();
406
407 let res = match self.poller.wait(&mut self.events, Some(Duration::ZERO)) {
409 Ok(0) => Ok(()),
411 Ok(_) => {
413 for ev in self.events.iter() {
414 if let Some(source) = self.sources.get(ev.key) {
416 let mut state = source.state.lock().unwrap();
417
418 for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
420 if emitted {
421 state[dir].tick = tick;
422 state[dir].drain_into(&mut self.wakers);
423 }
424 }
425
426 if !state[READ].is_empty() || !state[WRITE].is_empty() {
430 let event = {
432 let mut event = Event::none(source.key);
433 event.readable = !state[READ].is_empty();
434 event.writable = !state[WRITE].is_empty();
435 event
436 };
437
438 source.registration.modify(&self.poller, event)?;
440 }
441 }
442 }
443
444 Ok(())
445 }
446
447 Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
449
450 Err(err) => Err(err),
452 };
453
454 if !self.wakers.is_empty() {
456 gst::trace!(RUNTIME_CAT, "react: {} ready wakers", self.wakers.len());
457
458 for waker in self.wakers.drain(..) {
459 panic::catch_unwind(|| waker.wake()).ok();
461 }
462 }
463
464 res
465 }
466}
467
/// Unique identifier for a regular timer (ids are generated starting at 1).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct RegularTimerId(usize);
impl RegularTimerId {
    /// Sentinel id 0, used as the lower bound key in `BTreeMap::split_off`.
    const NONE: RegularTimerId = RegularTimerId(0);
}
475
/// Unique identifier for an "after" timer (ids are generated starting at 1).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AfterTimerId(usize);
impl AfterTimerId {
    /// Sentinel id 0, used as the lower bound key in `BTreeMap::split_off`.
    const NONE: AfterTimerId = AfterTimerId(0);
}
482
/// Discriminates the two timer flavors carried by a `TimerOp`.
#[derive(Copy, Clone, Debug)]
pub(crate) enum TimerId {
    Regular(RegularTimerId),
    After(AfterTimerId),
}
489
490impl From<RegularTimerId> for TimerId {
491 fn from(id: RegularTimerId) -> Self {
492 TimerId::Regular(id)
493 }
494}
495
496impl From<AfterTimerId> for TimerId {
497 fn from(id: AfterTimerId) -> Self {
498 TimerId::After(id)
499 }
500}
501
/// Deferred timer mutation, queued through `Reactor::timer_ops` and applied
/// by `Reactor::process_timer_ops`.
enum TimerOp {
    // Register a waker to be woken for the given deadline/id pair.
    Insert(Instant, TimerId, Waker),
    // Drop the timer registered for the given deadline/id pair.
    Remove(Instant, TimerId),
}
507
/// A registered I/O handle together with the wakers interested in its
/// readiness.
#[derive(Debug)]
pub(super) struct Source {
    /// Platform-specific registration of the underlying raw handle.
    pub(super) registration: Registration,

    /// Index in `Reactor::sources`; also used as the poller event key.
    key: usize,

    /// Per-direction waker state, indexed by `READ` / `WRITE`.
    state: Mutex<[Direction; 2]>,
}
520
/// Waker bookkeeping for one I/O direction (read or write) of a `Source`.
#[derive(Debug, Default)]
struct Direction {
    /// Reactor tick at which the last readiness event was delivered.
    tick: usize,

    /// Ticks recorded when `Source::poll_ready` registered its waker; used
    /// to detect whether a readiness event occurred since registration.
    ticks: Option<(usize, usize)>,

    /// Waker registered through `Source::poll_ready` (at most one task).
    waker: Option<Waker>,

    /// Wakers registered through `Ready` futures; a `None` slot is a
    /// reserved entry whose waker has been taken or not yet stored.
    wakers: Slab<Option<Waker>>,
}
538
539impl Direction {
540 fn is_empty(&self) -> bool {
542 self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
543 }
544
545 fn drain_into(&mut self, dst: &mut Vec<Waker>) {
547 if let Some(w) = self.waker.take() {
548 dst.push(w);
549 }
550 for (_, opt) in self.wakers.iter_mut() {
551 if let Some(w) = opt.take() {
552 dst.push(w);
553 }
554 }
555 }
556}
557
impl Source {
    /// Polls read readiness (single-waker fast path).
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(READ, cx)
    }

    /// Polls write readiness (single-waker fast path).
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(WRITE, cx)
    }

    /// Registers `cx`'s waker for direction `dir` and re-arms the poller.
    ///
    /// Returns `Ready` once the reactor has delivered a readiness event in a
    /// tick other than the two recorded at registration time.
    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut state = self.state.lock().unwrap();

        // A tick different from both recorded ones means a readiness event
        // arrived since we registered: the source is ready.
        if let Some((a, b)) = state[dir].ticks {
            if state[dir].tick != a && state[dir].tick != b {
                state[dir].ticks = None;
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[dir].is_empty();

        // Reuse the stored waker when it would wake the same task; otherwise
        // wake the displaced task so it can re-register itself.
        if let Some(w) = state[dir].waker.take() {
            if w.will_wake(cx.waker()) {
                state[dir].waker = Some(w);
                return Poll::Pending;
            }
            // Shield against panicking foreign waker implementations.
            panic::catch_unwind(|| w.wake()).ok();
        }

        Reactor::with(|reactor| {
            state[dir].waker = Some(cx.waker().clone());
            state[dir].ticks = Some((reactor.ticker(), state[dir].tick));

            // First interest in this direction: update the poller
            // registration so it reports events for every direction that
            // currently has wakers.
            if was_empty {
                let event = {
                    let mut event = Event::none(self.key);
                    event.readable = !state[READ].is_empty();
                    event.writable = !state[WRITE].is_empty();
                    event
                };

                self.registration.modify(&reactor.poller, event)?;
            }

            Poll::Pending
        })
    }

    /// Future resolving once `handle` is ready for reading.
    pub fn readable<T: Send + 'static>(handle: &Async<T>) -> Readable<'_, T> {
        Readable(Self::ready(handle, READ))
    }

    /// Owned variant of [`Source::readable`].
    pub fn readable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> ReadableOwned<T> {
        ReadableOwned(Self::ready(handle, READ))
    }

    /// Future resolving once `handle` is ready for writing.
    pub fn writable<T: Send + 'static>(handle: &Async<T>) -> Writable<'_, T> {
        Writable(Self::ready(handle, WRITE))
    }

    /// Owned variant of [`Source::writable`].
    pub fn writable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> WritableOwned<T> {
        WritableOwned(Self::ready(handle, WRITE))
    }

    /// Builds the inner `Ready` future; registration with the reactor happens
    /// lazily on first poll.
    fn ready<H: Borrow<Async<T>> + Clone, T: Send + 'static>(handle: H, dir: usize) -> Ready<H, T> {
        Ready {
            handle,
            dir,
            ticks: None,
            index: None,
            _guard: None,
        }
    }
}
649
/// Future resolving once the borrowed handle is ready for reading.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Readable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);

impl<T: Send + 'static> Future for Readable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Delegate to the inner `Ready` future; propagate I/O errors.
        ready!(Pin::new(&mut self.0).poll(cx))?;
        gst::trace!(
            RUNTIME_CAT,
            "readable: fd={:?}",
            self.0.handle.source.registration
        );
        Poll::Ready(Ok(()))
    }
}

impl<T: Send + 'static> fmt::Debug for Readable<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Readable").finish()
    }
}
673
/// Future resolving once the owned handle is ready for reading.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);

impl<T: Send + 'static> Future for ReadableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Delegate to the inner `Ready` future; propagate I/O errors.
        ready!(Pin::new(&mut self.0).poll(cx))?;
        gst::trace!(
            RUNTIME_CAT,
            "readable_owned: fd={:?}",
            self.0.handle.source.registration
        );
        Poll::Ready(Ok(()))
    }
}

impl<T: Send + 'static> fmt::Debug for ReadableOwned<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReadableOwned").finish()
    }
}
697
/// Future resolving once the borrowed handle is ready for writing.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Writable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);

impl<T: Send + 'static> Future for Writable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Delegate to the inner `Ready` future; propagate I/O errors.
        ready!(Pin::new(&mut self.0).poll(cx))?;
        gst::trace!(
            RUNTIME_CAT,
            "writable: fd={:?}",
            self.0.handle.source.registration
        );
        Poll::Ready(Ok(()))
    }
}

impl<T: Send + 'static> fmt::Debug for Writable<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Writable").finish()
    }
}
721
/// Future resolving once the owned handle is ready for writing.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WritableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);

impl<T: Send + 'static> Future for WritableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Delegate to the inner `Ready` future; propagate I/O errors.
        ready!(Pin::new(&mut self.0).poll(cx))?;
        gst::trace!(
            RUNTIME_CAT,
            "writable_owned: fd={:?}",
            self.0.handle.source.registration
        );
        Poll::Ready(Ok(()))
    }
}

impl<T: Send + 'static> fmt::Debug for WritableOwned<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WritableOwned").finish()
    }
}
745
/// Inner future of `Readable`/`Writable` (and their owned variants): waits
/// until `handle`'s source is ready in direction `dir`.
struct Ready<H: Borrow<Async<T>>, T: Send + 'static> {
    // Borrowed or owned handle to the async wrapper.
    handle: H,
    // Direction of interest: `READ` or `WRITE`.
    dir: usize,
    // Ticks recorded at waker registration (see `Direction::ticks`).
    ticks: Option<(usize, usize)>,
    // Slot index in `Direction::wakers`, once registered.
    index: Option<usize>,
    // Removes the waker slot when this future is dropped early.
    _guard: Option<RemoveOnDrop<H, T>>,
}

// No self-references; all fields are freely movable.
impl<H: Borrow<Async<T>>, T: Send + 'static> Unpin for Ready<H, T> {}
755
impl<H: Borrow<Async<T>> + Clone, T: Send + 'static> Future for Ready<H, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Destructure through `&mut *self` so the fields can be borrowed
        // independently below.
        let &mut Self {
            ref handle,
            ref mut dir,
            ref mut ticks,
            ref mut index,
            ref mut _guard,
            ..
        } = &mut *self;

        let mut state = handle.borrow().source.state.lock().unwrap();

        // A readiness event delivered in a tick other than the two recorded
        // at registration means the source became ready since then.
        if let Some((a, b)) = *ticks {
            if state[*dir].tick != a && state[*dir].tick != b {
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[*dir].is_empty();
        Reactor::with(|reactor| {
            // Reserve a waker slot on first poll; the `RemoveOnDrop` guard
            // frees it if this future is dropped before completion.
            let i = match *index {
                Some(i) => i,
                None => {
                    let i = state[*dir].wakers.insert(None);
                    *_guard = Some(RemoveOnDrop {
                        handle: handle.clone(),
                        dir: *dir,
                        key: i,
                        _marker: PhantomData,
                    });
                    *index = Some(i);
                    *ticks = Some((reactor.ticker(), state[*dir].tick));
                    i
                }
            };
            state[*dir].wakers[i] = Some(cx.waker().clone());

            // First interest in this direction: re-arm the poller so it
            // reports events for every direction with pending wakers.
            if was_empty {
                let event = {
                    let mut event = Event::none(handle.borrow().source.key);
                    event.readable = !state[READ].is_empty();
                    event.writable = !state[WRITE].is_empty();
                    event
                };

                handle
                    .borrow()
                    .source
                    .registration
                    .modify(&reactor.poller, event)?;
            }

            Poll::Pending
        })
    }
}
821
/// Guard that removes a `Ready` future's waker slot from the source's
/// `Direction::wakers` slab when dropped.
struct RemoveOnDrop<H: Borrow<Async<T>>, T: Send + 'static> {
    handle: H,
    dir: usize,
    key: usize,
    // Ties `T` to the guard without owning a `T`.
    _marker: PhantomData<fn() -> T>,
}
829
830impl<H: Borrow<Async<T>>, T: Send + 'static + 'static> Drop for RemoveOnDrop<H, T> {
831 fn drop(&mut self) {
832 let mut state = self.handle.borrow().source.state.lock().unwrap();
833 let wakers = &mut state[self.dir].wakers;
834 if wakers.contains(self.key) {
835 wakers.remove(self.key);
836 }
837 }
838}