1use concurrent_queue::ConcurrentQueue;
8use futures::ready;
9use polling::{Event, Events, Poller};
10use slab::Slab;
11
12use std::borrow::Borrow;
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::fmt;
16use std::future::Future;
17use std::io;
18use std::marker::PhantomData;
19use std::mem;
20use std::panic;
21use std::pin::Pin;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::{Arc, Mutex};
24use std::task::{Context, Poll, Waker};
25use std::time::{Duration, Instant};
26
27cfg_if::cfg_if! {
29 if #[cfg(windows)] {
30 mod windows;
31 pub use windows::Registration;
32 } else if #[cfg(any(
33 target_vendor = "apple",
34 target_os = "freebsd",
35 target_os = "netbsd",
36 target_os = "openbsd",
37 target_os = "dragonfly",
38 ))] {
39 mod kqueue;
40 pub use kqueue::Registration;
41 } else if #[cfg(unix)] {
42 mod unix;
43 pub use unix::Registration;
44 } else {
45 compile_error!("unsupported platform");
46 }
47}
48
49use crate::runtime::{Async, RUNTIME_CAT};
50
51const READ: usize = 0;
52const WRITE: usize = 1;
53
54thread_local! {
55 static CURRENT_REACTOR: RefCell<Option<Reactor>> = const { RefCell::new(None) };
56}
57
58#[derive(Debug)]
59pub(super) struct IOHandler {
60 poller: Poller,
64
65 sources: Slab<Arc<Source>>,
67
68 events: Events,
72}
73
74impl IOHandler {
75 fn clear(&mut self) {
76 self.sources.drain().for_each(|s| {
77 let _ = s.registration.delete(&self.poller);
78 });
79 self.events.clear();
80 }
81
82 fn insert_io(&mut self, raw: Registration) -> io::Result<Arc<Source>> {
83 let source = {
85 let key = self.sources.vacant_entry().key();
86 let source = Arc::new(Source {
87 registration: raw,
88 key,
89 state: Default::default(),
90 });
91 self.sources.insert(source.clone());
92 source
93 };
94
95 if let Err(err) = source.registration.add(&self.poller, source.key) {
97 gst::error!(
98 crate::runtime::RUNTIME_CAT,
99 "Failed to register fd {:?}: {}",
100 source.registration,
101 err,
102 );
103 self.sources.remove(source.key);
104 return Err(err);
105 }
106
107 Ok(source)
108 }
109
110 pub fn remove_io(&mut self, source: &Source) -> io::Result<()> {
112 self.sources.remove(source.key);
113 source.registration.delete(&self.poller)
114 }
115
116 fn process(&mut self, tick: usize, wakers: &mut Vec<Waker>) -> io::Result<()> {
117 self.events.clear();
118
119 match self.poller.wait(&mut self.events, Some(Duration::ZERO)) {
121 Ok(0) => Ok(()),
123 Ok(_) => {
125 for ev in self.events.iter() {
126 if let Some(source) = self.sources.get(ev.key) {
128 let mut state = source.state.lock().unwrap();
129
130 for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
132 if emitted {
133 state[dir].tick = tick;
134 state[dir].drain_into(wakers);
135 }
136 }
137
138 if !state[READ].is_empty() || !state[WRITE].is_empty() {
142 let event = {
144 let mut event = Event::none(source.key);
145 event.readable = !state[READ].is_empty();
146 event.writable = !state[WRITE].is_empty();
147 event
148 };
149
150 source.registration.modify(&self.poller, event)?;
152 }
153 }
154 }
155
156 Ok(())
157 }
158
159 Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
161
162 Err(err) => Err(err),
164 }
165 }
166}
167
168impl Default for IOHandler {
169 fn default() -> Self {
170 IOHandler {
171 poller: Poller::new().expect("cannot initialize I/O event notification"),
172 sources: Default::default(),
173 events: Events::new(),
174 }
175 }
176}
177
178#[derive(Debug)]
179pub(super) struct Reactor {
180 pub(super) io_handler: Arc<Mutex<IOHandler>>,
184
185 ticker: AtomicUsize,
192
193 timers_check_instant: Instant,
195
196 time_slice_end: Instant,
198
199 half_max_throttling: Duration,
201
202 wakers: Vec<Waker>,
204
205 timers: BTreeMap<(Instant, RegularTimerId), Waker>,
211
212 after_timers: BTreeMap<(Instant, AfterTimerId), Waker>,
220
221 timer_ops: ConcurrentQueue<TimerOp>,
226}
227
228impl Reactor {
229 fn new(max_throttling: Duration) -> Self {
230 Reactor {
231 io_handler: Default::default(),
232 ticker: AtomicUsize::new(0),
233 timers_check_instant: Instant::now(),
234 time_slice_end: Instant::now(),
235 half_max_throttling: max_throttling / 2,
236 wakers: Vec::new(),
237 timers: BTreeMap::new(),
238 after_timers: BTreeMap::new(),
239 timer_ops: ConcurrentQueue::bounded(1000),
240 }
241 }
242
243 pub fn init(max_throttling: Duration) {
245 CURRENT_REACTOR.with(|cur| {
246 let mut cur = cur.borrow_mut();
247 if cur.is_none() {
248 *cur = Some(Reactor::new(max_throttling));
249 }
250 })
251 }
252
253 pub fn clear() {
257 let _ = CURRENT_REACTOR.try_with(|cur_reactor| {
258 cur_reactor.borrow_mut().as_mut().map(|reactor| {
259 reactor.ticker = AtomicUsize::new(0);
260 reactor.wakers.clear();
261 reactor.io_handler.lock().unwrap().clear();
262 reactor.timers.clear();
263 reactor.after_timers.clear();
264 while !reactor.timer_ops.is_empty() {
265 let _ = reactor.timer_ops.pop();
266 }
267 })
268 });
269 }
270
271 #[track_caller]
284 pub fn with<F, R>(f: F) -> R
285 where
286 F: FnOnce(&Reactor) -> R,
287 {
288 CURRENT_REACTOR.with(|reactor| {
289 f(reactor
290 .borrow()
291 .as_ref()
292 .expect("Reactor initialized at this point"))
293 })
294 }
295
296 #[track_caller]
309 pub fn with_mut<F, R>(f: F) -> R
310 where
311 F: FnOnce(&mut Reactor) -> R,
312 {
313 CURRENT_REACTOR.with(|reactor| {
314 f(reactor
315 .borrow_mut()
316 .as_mut()
317 .expect("Reactor initialized at this point"))
318 })
319 }
320
321 pub fn ticker(&self) -> usize {
323 self.ticker.load(Ordering::SeqCst)
324 }
325
326 pub fn half_max_throttling(&self) -> Duration {
327 self.half_max_throttling
328 }
329
330 pub fn timers_check_instant(&self) -> Instant {
331 self.timers_check_instant
332 }
333
334 pub fn time_slice_end(&self) -> Instant {
335 self.time_slice_end
336 }
337
338 pub fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> {
340 self.io_handler.lock().unwrap().insert_io(raw)
341 }
342
343 pub fn remove_io(&self, source: &Source) -> io::Result<()> {
345 self.io_handler.lock().unwrap().remove_io(source)
346 }
347
348 pub fn insert_regular_timer(&mut self, when: Instant, waker: &Waker) -> RegularTimerId {
352 static REGULAR_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
354 let id = RegularTimerId(REGULAR_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));
355
356 while self
358 .timer_ops
359 .push(TimerOp::Insert(when, id.into(), waker.clone()))
360 .is_err()
361 {
362 gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
364 self.process_timer_ops();
365 }
366
367 id
368 }
369
370 pub fn insert_after_timer(&mut self, when: Instant, waker: &Waker) -> AfterTimerId {
374 static AFTER_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
376 let id = AfterTimerId(AFTER_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));
377
378 while self
380 .timer_ops
381 .push(TimerOp::Insert(when, id.into(), waker.clone()))
382 .is_err()
383 {
384 gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
386 self.process_timer_ops();
387 }
388
389 id
390 }
391
392 pub fn remove_timer(&mut self, when: Instant, id: impl Into<TimerId>) {
394 let id = id.into();
396 while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
397 gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
398 self.process_timer_ops();
400 }
401 }
402
403 fn process_timers(&mut self, now: Instant) {
405 self.process_timer_ops();
406
407 self.timers_check_instant = now;
408 self.time_slice_end = now + self.half_max_throttling;
409
410 let pending = self
416 .timers
417 .split_off(&(self.time_slice_end, RegularTimerId::NONE));
418 let ready = mem::replace(&mut self.timers, pending);
419
420 if !ready.is_empty() {
422 gst::trace!(
423 RUNTIME_CAT,
424 "process_timers (regular): {} ready wakers",
425 ready.len()
426 );
427
428 for (_, waker) in ready {
429 self.wakers.push(waker);
430 }
431 }
432
433 let pending = self
438 .after_timers
439 .split_off(&(self.timers_check_instant, AfterTimerId::NONE));
440 let ready = mem::replace(&mut self.after_timers, pending);
441
442 if !ready.is_empty() {
444 gst::trace!(
445 RUNTIME_CAT,
446 "process_timers (after): {} ready wakers",
447 ready.len()
448 );
449
450 for (_, waker) in ready {
451 self.wakers.push(waker);
452 }
453 }
454 }
455
456 fn process_timer_ops(&mut self) {
458 for _ in 0..self.timer_ops.capacity().unwrap() {
461 match self.timer_ops.pop() {
462 Ok(TimerOp::Insert(when, TimerId::Regular(id), waker)) => {
463 self.timers.insert((when, id), waker);
464 }
465 Ok(TimerOp::Insert(when, TimerId::After(id), waker)) => {
466 self.after_timers.insert((when, id), waker);
467 }
468 Ok(TimerOp::Remove(when, TimerId::Regular(id))) => {
469 self.timers.remove(&(when, id));
470 }
471 Ok(TimerOp::Remove(when, TimerId::After(id))) => {
472 self.after_timers.remove(&(when, id));
473 }
474 Err(_) => break,
475 }
476 }
477 }
478
479 pub fn react(&mut self, now: Instant) -> io::Result<()> {
481 debug_assert!(self.wakers.is_empty());
482
483 self.process_timers(now);
485
486 let tick = self.ticker.fetch_add(1, Ordering::SeqCst).wrapping_add(1);
488
489 let res = self
491 .io_handler
492 .lock()
493 .unwrap()
494 .process(tick, &mut self.wakers);
495
496 if !self.wakers.is_empty() {
498 gst::trace!(RUNTIME_CAT, "react: {} ready wakers", self.wakers.len());
499
500 for waker in self.wakers.drain(..) {
501 panic::catch_unwind(|| waker.wake()).ok();
503 }
504 }
505
506 res
507 }
508}
509
510#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
513pub struct RegularTimerId(usize);
514impl RegularTimerId {
515 const NONE: RegularTimerId = RegularTimerId(0);
516}
517
518#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
520pub struct AfterTimerId(usize);
521impl AfterTimerId {
522 const NONE: AfterTimerId = AfterTimerId(0);
523}
524
525#[derive(Copy, Clone, Debug)]
527pub(crate) enum TimerId {
528 Regular(RegularTimerId),
529 After(AfterTimerId),
530}
531
532impl From<RegularTimerId> for TimerId {
533 fn from(id: RegularTimerId) -> Self {
534 TimerId::Regular(id)
535 }
536}
537
538impl From<AfterTimerId> for TimerId {
539 fn from(id: AfterTimerId) -> Self {
540 TimerId::After(id)
541 }
542}
543
544enum TimerOp {
546 Insert(Instant, TimerId, Waker),
547 Remove(Instant, TimerId),
548}
549
550#[derive(Debug)]
552pub(super) struct Source {
553 pub(super) registration: Registration,
555
556 key: usize,
558
559 state: Mutex<[Direction; 2]>,
561}
562
563#[derive(Debug, Default)]
565struct Direction {
566 tick: usize,
568
569 ticks: Option<(usize, usize)>,
571
572 waker: Option<Waker>,
574
575 wakers: Slab<Option<Waker>>,
579}
580
581impl Direction {
582 fn is_empty(&self) -> bool {
584 self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
585 }
586
587 fn drain_into(&mut self, dst: &mut Vec<Waker>) {
589 if let Some(w) = self.waker.take() {
590 dst.push(w);
591 }
592 for (_, opt) in self.wakers.iter_mut() {
593 if let Some(w) = opt.take() {
594 dst.push(w);
595 }
596 }
597 }
598}
599
600impl Source {
601 pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
603 self.poll_ready(READ, cx)
604 }
605
606 pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
608 self.poll_ready(WRITE, cx)
609 }
610
611 fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
615 let mut state = self.state.lock().unwrap();
616
617 if let Some((a, b)) = state[dir].ticks {
619 if state[dir].tick != a && state[dir].tick != b {
622 state[dir].ticks = None;
623 return Poll::Ready(Ok(()));
624 }
625 }
626
627 let was_empty = state[dir].is_empty();
628
629 if let Some(w) = state[dir].waker.take() {
631 if w.will_wake(cx.waker()) {
632 state[dir].waker = Some(w);
633 return Poll::Pending;
634 }
635 panic::catch_unwind(|| w.wake()).ok();
637 }
638
639 Reactor::with(|reactor| {
640 state[dir].waker = Some(cx.waker().clone());
641 state[dir].ticks = Some((reactor.ticker(), state[dir].tick));
642
643 if was_empty {
645 let event = {
646 let mut event = Event::none(self.key);
647 event.readable = !state[READ].is_empty();
648 event.writable = !state[WRITE].is_empty();
649 event
650 };
651
652 self.registration
654 .modify(&reactor.io_handler.lock().unwrap().poller, event)?;
655 }
656
657 Poll::Pending
658 })
659 }
660
661 pub fn readable<T: Send + 'static>(handle: &Async<T>) -> Readable<'_, T> {
663 Readable(Self::ready(handle, READ))
664 }
665
666 pub fn readable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> ReadableOwned<T> {
668 ReadableOwned(Self::ready(handle, READ))
669 }
670
671 pub fn writable<T: Send + 'static>(handle: &Async<T>) -> Writable<'_, T> {
673 Writable(Self::ready(handle, WRITE))
674 }
675
676 pub fn writable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> WritableOwned<T> {
678 WritableOwned(Self::ready(handle, WRITE))
679 }
680
681 fn ready<H: Borrow<Async<T>> + Clone, T: Send + 'static>(handle: H, dir: usize) -> Ready<H, T> {
683 Ready {
684 handle,
685 dir,
686 ticks: None,
687 index: None,
688 _guard: None,
689 }
690 }
691}
692
693#[must_use = "futures do nothing unless you `.await` or poll them"]
695pub struct Readable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);
696
697impl<T: Send + 'static> Future for Readable<'_, T> {
698 type Output = io::Result<()>;
699
700 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
701 ready!(Pin::new(&mut self.0).poll(cx))?;
702 gst::trace!(
703 RUNTIME_CAT,
704 "readable: fd={:?}",
705 self.0.handle.source.registration
706 );
707 Poll::Ready(Ok(()))
708 }
709}
710
711impl<T: Send + 'static> fmt::Debug for Readable<'_, T> {
712 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
713 f.debug_struct("Readable").finish()
714 }
715}
716
717#[must_use = "futures do nothing unless you `.await` or poll them"]
719pub struct ReadableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);
720
721impl<T: Send + 'static> Future for ReadableOwned<T> {
722 type Output = io::Result<()>;
723
724 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
725 ready!(Pin::new(&mut self.0).poll(cx))?;
726 gst::trace!(
727 RUNTIME_CAT,
728 "readable_owned: fd={:?}",
729 self.0.handle.source.registration
730 );
731 Poll::Ready(Ok(()))
732 }
733}
734
735impl<T: Send + 'static> fmt::Debug for ReadableOwned<T> {
736 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
737 f.debug_struct("ReadableOwned").finish()
738 }
739}
740
741#[must_use = "futures do nothing unless you `.await` or poll them"]
743pub struct Writable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);
744
745impl<T: Send + 'static> Future for Writable<'_, T> {
746 type Output = io::Result<()>;
747
748 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
749 ready!(Pin::new(&mut self.0).poll(cx))?;
750 gst::trace!(
751 RUNTIME_CAT,
752 "writable: fd={:?}",
753 self.0.handle.source.registration
754 );
755 Poll::Ready(Ok(()))
756 }
757}
758
759impl<T: Send + 'static> fmt::Debug for Writable<'_, T> {
760 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
761 f.debug_struct("Writable").finish()
762 }
763}
764
765#[must_use = "futures do nothing unless you `.await` or poll them"]
767pub struct WritableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);
768
769impl<T: Send + 'static> Future for WritableOwned<T> {
770 type Output = io::Result<()>;
771
772 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
773 ready!(Pin::new(&mut self.0).poll(cx))?;
774 gst::trace!(
775 RUNTIME_CAT,
776 "writable_owned: fd={:?}",
777 self.0.handle.source.registration
778 );
779 Poll::Ready(Ok(()))
780 }
781}
782
783impl<T: Send + 'static> fmt::Debug for WritableOwned<T> {
784 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
785 f.debug_struct("WritableOwned").finish()
786 }
787}
788
789struct Ready<H: Borrow<Async<T>>, T: Send + 'static> {
790 handle: H,
791 dir: usize,
792 ticks: Option<(usize, usize)>,
793 index: Option<usize>,
794 _guard: Option<RemoveOnDrop<H, T>>,
795}
796
797impl<H: Borrow<Async<T>>, T: Send + 'static> Unpin for Ready<H, T> {}
798
799impl<H: Borrow<Async<T>> + Clone, T: Send + 'static> Future for Ready<H, T> {
800 type Output = io::Result<()>;
801
802 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
803 let &mut Self {
804 ref handle,
805 ref mut dir,
806 ref mut ticks,
807 ref mut index,
808 ref mut _guard,
809 ..
810 } = &mut *self;
811
812 let mut state = handle.borrow().source.state.lock().unwrap();
813
814 if let Some((a, b)) = *ticks {
816 if state[*dir].tick != a && state[*dir].tick != b {
819 return Poll::Ready(Ok(()));
820 }
821 }
822
823 let was_empty = state[*dir].is_empty();
824 Reactor::with(|reactor| {
825 let i = match *index {
827 Some(i) => i,
828 None => {
829 let i = state[*dir].wakers.insert(None);
830 *_guard = Some(RemoveOnDrop {
831 handle: handle.clone(),
832 dir: *dir,
833 key: i,
834 _marker: PhantomData,
835 });
836 *index = Some(i);
837 *ticks = Some((reactor.ticker(), state[*dir].tick));
838 i
839 }
840 };
841 state[*dir].wakers[i] = Some(cx.waker().clone());
842
843 if was_empty {
845 let event = {
847 let mut event = Event::none(handle.borrow().source.key);
848 event.readable = !state[READ].is_empty();
849 event.writable = !state[WRITE].is_empty();
850 event
851 };
852
853 handle
854 .borrow()
855 .source
856 .registration
857 .modify(&reactor.io_handler.lock().unwrap().poller, event)?;
858 }
859
860 Poll::Pending
861 })
862 }
863}
864
865struct RemoveOnDrop<H: Borrow<Async<T>>, T: Send + 'static> {
867 handle: H,
868 dir: usize,
869 key: usize,
870 _marker: PhantomData<fn() -> T>,
871}
872
873impl<H: Borrow<Async<T>>, T: Send + 'static + 'static> Drop for RemoveOnDrop<H, T> {
874 fn drop(&mut self) {
875 let mut state = self.handle.borrow().source.state.lock().unwrap();
876 let wakers = &mut state[self.dir].wakers;
877 if wakers.contains(self.key) {
878 wakers.remove(self.key);
879 }
880 }
881}