use concurrent_queue::ConcurrentQueue;
use futures::ready;
use polling::{Event, Poller};
use slab::Slab;
use std::borrow::Borrow;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::mem;
#[cfg(unix)]
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::RawSocket;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
use crate::runtime::{Async, RUNTIME_CAT};
// Indices into the per-source `[Direction; 2]` state array.
/// Index of the read direction.
const READ: usize = 0;
/// Index of the write direction.
const WRITE: usize = 1;
thread_local! {
// Reactor owned by the current thread: `None` until `Reactor::init` stores one.
static CURRENT_REACTOR: RefCell<Option<Reactor>> = RefCell::new(None);
}
/// Per-thread reactor handling I/O sources and timers.
///
/// An instance lives in the `CURRENT_REACTOR` thread-local and is reached
/// through `Reactor::with` / `Reactor::with_mut`.
#[derive(Debug)]
pub(super) struct Reactor {
/// Poller used to register I/O sources and to wait for their events.
poller: Poller,
/// Counter incremented once per call to `react`.
ticker: AtomicUsize,
/// The `now` instant given to the last `process_timers` pass.
timers_check_instant: Instant,
/// `timers_check_instant + half_max_throttling`, computed in `process_timers`.
time_slice_end: Instant,
/// Half of the `max_throttling` duration given at construction.
half_max_throttling: Duration,
/// Wakers collected during a `react` pass; drained and woken at its end.
wakers: Vec<Waker>,
/// Registered I/O sources, indexed by `Source::key`.
sources: Slab<Arc<Source>>,
/// Buffer receiving the events returned by `Poller::wait`.
events: Vec<Event>,
/// Regular timers, ordered by `(deadline, id)`.
timers: BTreeMap<(Instant, RegularTimerId), Waker>,
/// "After" timers, ordered by `(deadline, id)`.
after_timers: BTreeMap<(Instant, AfterTimerId), Waker>,
/// Pending timer insertions / removals, applied by `process_timer_ops`.
timer_ops: ConcurrentQueue<TimerOp>,
}
impl Reactor {
fn new(max_throttling: Duration) -> Self {
Reactor {
poller: Poller::new().expect("cannot initialize I/O event notification"),
ticker: AtomicUsize::new(0),
timers_check_instant: Instant::now(),
time_slice_end: Instant::now(),
half_max_throttling: max_throttling / 2,
wakers: Vec::new(),
sources: Slab::new(),
events: Vec::new(),
timers: BTreeMap::new(),
after_timers: BTreeMap::new(),
timer_ops: ConcurrentQueue::bounded(1000),
}
}
/// Installs a reactor for the current thread unless one is already present.
///
/// When a reactor already exists it is left untouched and `max_throttling`
/// is ignored.
pub fn init(max_throttling: Duration) {
    CURRENT_REACTOR.with(|slot| {
        let _ = slot
            .borrow_mut()
            .get_or_insert_with(|| Reactor::new(max_throttling));
    })
}
/// Resets the current thread's reactor, if any, to an empty state.
///
/// The ticker is reset to zero; wakers, sources, events, both timer maps and
/// the queued timer operations are all discarded. Nothing happens when the
/// thread-local is no longer accessible or holds no reactor.
pub fn clear() {
    let _ = CURRENT_REACTOR.try_with(|slot| {
        let mut guard = slot.borrow_mut();
        if let Some(reactor) = guard.as_mut() {
            reactor.ticker = AtomicUsize::new(0);
            reactor.wakers.clear();
            reactor.sources.clear();
            reactor.events.clear();
            reactor.timers.clear();
            reactor.after_timers.clear();
            // Pop until the queue reports that nothing is left.
            while reactor.timer_ops.pop().is_ok() {}
        }
    });
}
/// Runs `f` with shared access to the current thread's reactor.
///
/// # Panics
///
/// Panics if no reactor was installed on this thread, or if the reactor
/// is currently mutably borrowed.
#[track_caller]
pub fn with<F, R>(f: F) -> R
where
    F: FnOnce(&Reactor) -> R,
{
    CURRENT_REACTOR.with(|slot| {
        let guard = slot.borrow();
        let reactor = guard.as_ref().expect("Not running in a Context.");
        f(reactor)
    })
}
/// Runs `f` with exclusive access to the current thread's reactor.
///
/// # Panics
///
/// Panics if no reactor was installed on this thread, or if the reactor
/// is already borrowed.
#[track_caller]
pub fn with_mut<F, R>(f: F) -> R
where
    F: FnOnce(&mut Reactor) -> R,
{
    CURRENT_REACTOR.with(|slot| {
        let mut guard = slot.borrow_mut();
        let reactor = guard.as_mut().expect("Not running in a Context.");
        f(reactor)
    })
}
/// Returns the current value of the tick counter, which `react` increments
/// on every call.
pub fn ticker(&self) -> usize {
self.ticker.load(Ordering::SeqCst)
}
/// Returns half of the `max_throttling` duration the reactor was built with.
pub fn half_max_throttling(&self) -> Duration {
self.half_max_throttling
}
/// Returns the instant recorded by the last timers check.
pub fn timers_check_instant(&self) -> Instant {
self.timers_check_instant
}
/// Returns the end of the time slice computed by the last timers check,
/// i.e. that check's instant plus `half_max_throttling`.
pub fn time_slice_end(&self) -> Instant {
self.time_slice_end
}
/// Registers the I/O handle `raw` and returns the `Source` tracking it.
///
/// The source is stored in the `sources` slab and added to the poller with
/// `Event::none`.
///
/// # Errors
///
/// Returns the poller error if registration fails; in that case the slab
/// entry is removed again.
pub fn insert_io(
    &mut self,
    #[cfg(unix)] raw: RawFd,
    #[cfg(windows)] raw: RawSocket,
) -> io::Result<Arc<Source>> {
    // Reserve the key first so that it can be stored inside the source itself.
    let key = self.sources.vacant_entry().key();
    let source = Arc::new(Source {
        raw,
        key,
        state: Default::default(),
    });
    self.sources.insert(Arc::clone(&source));

    match self.poller.add(raw, Event::none(source.key)) {
        Ok(()) => Ok(source),
        Err(err) => {
            gst::error!(
                crate::runtime::RUNTIME_CAT,
                "Failed to register fd {}: {}",
                source.raw,
                err,
            );
            self.sources.remove(source.key);
            Err(err)
        }
    }
}
pub fn remove_io(&mut self, source: &Source) -> io::Result<()> {
self.sources.remove(source.key);
self.poller.delete(source.raw)
}
/// Queues the insertion of a regular timer firing at `when` and returns its id.
///
/// The insertion is only pushed to `timer_ops`; it reaches the timer map
/// when the queue is processed. If the queue is full, it is processed on the
/// spot and the push is retried.
pub fn insert_regular_timer(&mut self, when: Instant, waker: &Waker) -> RegularTimerId {
    // Ids start at 1: 0 is `RegularTimerId::NONE`.
    static REGULAR_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
    let id = RegularTimerId(REGULAR_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));

    loop {
        let op = TimerOp::Insert(when, id.into(), waker.clone());
        if self.timer_ops.push(op).is_ok() {
            return id;
        }
        gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
        self.process_timer_ops();
    }
}
/// Queues the insertion of an "after" timer for `when` and returns its id.
///
/// The insertion is only pushed to `timer_ops`; it reaches the timer map
/// when the queue is processed. If the queue is full, it is processed on the
/// spot and the push is retried.
pub fn insert_after_timer(&mut self, when: Instant, waker: &Waker) -> AfterTimerId {
    // Ids start at 1: 0 is `AfterTimerId::NONE`.
    static AFTER_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
    let id = AfterTimerId(AFTER_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));

    loop {
        let op = TimerOp::Insert(when, id.into(), waker.clone());
        if self.timer_ops.push(op).is_ok() {
            return id;
        }
        gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
        self.process_timer_ops();
    }
}
/// Queues the removal of the timer identified by `(when, id)`.
///
/// If `timer_ops` is full, the queue is processed on the spot and the push
/// is retried until it succeeds.
pub fn remove_timer(&mut self, when: Instant, id: impl Into<TimerId>) {
    let id = id.into();
    loop {
        if self.timer_ops.push(TimerOp::Remove(when, id)).is_ok() {
            break;
        }
        gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
        self.process_timer_ops();
    }
}
/// Applies queued timer operations, then moves the wakers of fired timers
/// to `self.wakers`.
///
/// `now` becomes the new `timers_check_instant` and the time slice end is
/// set to `now + half_max_throttling`.
///
/// * Regular timers fire when their key sorts before
///   `(time_slice_end, RegularTimerId::NONE)`.
/// * "After" timers fire when their key sorts before
///   `(timers_check_instant, AfterTimerId::NONE)`.
fn process_timers(&mut self, now: Instant) {
    self.process_timer_ops();

    self.timers_check_instant = now;
    self.time_slice_end = now + self.half_max_throttling;

    // `split_off` keeps the entries at or after the split key: those stay pending.
    let regular_split = (self.time_slice_end, RegularTimerId::NONE);
    let still_pending = self.timers.split_off(&regular_split);
    let fired = mem::replace(&mut self.timers, still_pending);
    if !fired.is_empty() {
        gst::trace!(
            RUNTIME_CAT,
            "process_timers (regular): {} ready wakers",
            fired.len()
        );
        self.wakers
            .extend(fired.into_iter().map(|(_, waker)| waker));
    }

    let after_split = (self.timers_check_instant, AfterTimerId::NONE);
    let still_pending = self.after_timers.split_off(&after_split);
    let fired = mem::replace(&mut self.after_timers, still_pending);
    if !fired.is_empty() {
        gst::trace!(
            RUNTIME_CAT,
            "process_timers (after): {} ready wakers",
            fired.len()
        );
        self.wakers
            .extend(fired.into_iter().map(|(_, waker)| waker));
    }
}
/// Applies queued timer operations to the timer maps.
///
/// At most `capacity` operations are handled per call, so the loop is
/// bounded. It stops early as soon as the queue has nothing left to pop.
fn process_timer_ops(&mut self) {
    // The queue is created bounded, hence it has a capacity.
    let budget = self.timer_ops.capacity().unwrap();

    for _ in 0..budget {
        let op = match self.timer_ops.pop() {
            Ok(op) => op,
            Err(_) => break,
        };

        match op {
            TimerOp::Insert(when, id, waker) => match id {
                TimerId::Regular(id) => {
                    self.timers.insert((when, id), waker);
                }
                TimerId::After(id) => {
                    self.after_timers.insert((when, id), waker);
                }
            },
            TimerOp::Remove(when, id) => match id {
                TimerId::Regular(id) => {
                    self.timers.remove(&(when, id));
                }
                TimerId::After(id) => {
                    self.after_timers.remove(&(when, id));
                }
            },
        }
    }
}
/// Runs one reactor pass: fires due timers, polls I/O without blocking and
/// wakes every task that became ready.
///
/// The ticker is bumped before polling. For each event whose key matches a
/// registered source, the wakers of the signalled directions are collected
/// and the source is re-registered with the poller if wakers remain.
///
/// Collected wakers are always woken before returning, even when an error
/// is reported, so that `self.wakers` is empty again on the next call.
///
/// # Errors
///
/// Returns the error from `Poller::wait` (an interrupted wait is not an
/// error), or the first error met while re-registering a source. A
/// re-registration failure does not prevent the remaining events from being
/// processed.
pub fn react(&mut self, now: Instant) -> io::Result<()> {
    debug_assert!(self.wakers.is_empty());

    // Process ready timers.
    self.process_timers(now);

    // Bump the ticker before polling I/O.
    let tick = self.ticker.fetch_add(1, Ordering::SeqCst).wrapping_add(1);

    self.events.clear();

    // Zero timeout: never block here.
    let res = match self.poller.wait(&mut self.events, Some(Duration::ZERO)) {
        // No I/O events occurred.
        Ok(0) => Ok(()),
        // At least one I/O event occurred.
        Ok(_) => {
            let mut res: io::Result<()> = Ok(());

            for ev in self.events.iter() {
                // Skip events for which no source is registered under this key.
                if let Some(source) = self.sources.get(ev.key) {
                    let mut state = source.state.lock().unwrap();

                    // Collect the wakers of each direction that was signalled.
                    for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
                        if emitted {
                            state[dir].tick = tick;
                            state[dir].drain_into(&mut self.wakers);
                        }
                    }

                    // Re-register if there are still readers or writers, e.g. when
                    // only one of the two directions was signalled.
                    if !state[READ].is_empty() || !state[WRITE].is_empty() {
                        let rearmed = self.poller.modify(
                            source.raw,
                            Event {
                                key: source.key,
                                readable: !state[READ].is_empty(),
                                writable: !state[WRITE].is_empty(),
                            },
                        );

                        // Do not return early: the wakers collected so far must still
                        // be woken below. Keep the first error for the caller.
                        if let Err(err) = rearmed {
                            if res.is_ok() {
                                res = Err(err);
                            }
                        }
                    }
                }
            }

            res
        }
        // The syscall was interrupted.
        Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
        // An actual error occurred.
        Err(err) => Err(err),
    };

    // Wake up ready tasks.
    if !self.wakers.is_empty() {
        gst::trace!(RUNTIME_CAT, "react: {} ready wakers", self.wakers.len());
        for waker in self.wakers.drain(..) {
            // Don't let a panicking waker blow everything up.
            panic::catch_unwind(|| waker.wake()).ok();
        }
    }

    res
}
}
/// Identifier of a regular timer; second component of the keys of
/// `Reactor::timers`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct RegularTimerId(usize);
impl RegularTimerId {
/// Id `0`, used to build the split key in `Reactor::process_timers`.
/// The id generator of `Reactor::insert_regular_timer` starts at 1.
const NONE: RegularTimerId = RegularTimerId(0);
}
/// Identifier of an "after" timer; second component of the keys of
/// `Reactor::after_timers`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AfterTimerId(usize);
impl AfterTimerId {
/// Id `0`, used to build the split key in `Reactor::process_timers`.
/// The id generator of `Reactor::insert_after_timer` starts at 1.
const NONE: AfterTimerId = AfterTimerId(0);
}
/// Identifier of either kind of timer, telling which timer map a queued
/// `TimerOp` applies to.
#[derive(Copy, Clone, Debug)]
pub(crate) enum TimerId {
/// Id of a timer stored in the regular timers map.
Regular(RegularTimerId),
/// Id of a timer stored in the "after" timers map.
After(AfterTimerId),
}
impl From<RegularTimerId> for TimerId {
fn from(id: RegularTimerId) -> Self {
TimerId::Regular(id)
}
}
impl From<AfterTimerId> for TimerId {
fn from(id: AfterTimerId) -> Self {
TimerId::After(id)
}
}
/// Timer operation queued in `Reactor::timer_ops` and applied later by
/// `Reactor::process_timer_ops`.
enum TimerOp {
/// Insert the waker under the key `(deadline, id)`.
Insert(Instant, TimerId, Waker),
/// Remove the entry with the key `(deadline, id)`.
Remove(Instant, TimerId),
}
/// An I/O handle registered in the reactor.
#[derive(Debug)]
pub(super) struct Source {
/// Raw file descriptor handed to the poller.
#[cfg(unix)]
pub(super) raw: RawFd,
/// Raw socket handed to the poller.
#[cfg(windows)]
pub(super) raw: RawSocket,
/// Index of this source in the reactor's `sources` slab; also used as the
/// key of its poller events.
key: usize,
/// Per-direction state, indexed by `READ` and `WRITE`.
state: Mutex<[Direction; 2]>,
}
/// State of one direction (read or write) of a [`Source`].
#[derive(Debug, Default)]
struct Direction {
/// Reactor tick at which the last event was delivered for this direction.
tick: usize,
/// `(reactor ticker, direction tick)` captured when `Source::poll_ready`
/// registered its waker; `None` when no such registration is pending.
ticks: Option<(usize, usize)>,
/// Waker registered through `Source::poll_ready`.
waker: Option<Waker>,
/// Wakers registered by `Ready` futures, one slot per future.
wakers: Slab<Option<Waker>>,
}
impl Direction {
    /// Returns `true` when no waker at all is registered for this direction.
    fn is_empty(&self) -> bool {
        if self.waker.is_some() {
            return false;
        }
        !self.wakers.iter().any(|(_, slot)| slot.is_some())
    }

    /// Moves every registered waker into `dst`, leaving the slots empty.
    ///
    /// The `poll_ready` waker comes first, followed by the slab wakers in
    /// iteration order. Slab slots themselves are kept.
    fn drain_into(&mut self, dst: &mut Vec<Waker>) {
        dst.extend(self.waker.take());
        dst.extend(self.wakers.iter_mut().filter_map(|(_, slot)| slot.take()));
    }
}
impl Source {
    /// Polls the source for readability, registering the task's waker.
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(READ, cx)
    }

    /// Polls the source for writability, registering the task's waker.
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(WRITE, cx)
    }

    /// Registers the task's waker for direction `dir`, or reports readiness
    /// if an event was delivered since the previous registration.
    ///
    /// # Errors
    ///
    /// Returns the poller error if updating the interest fails.
    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut state = self.state.lock().unwrap();

        // An event was delivered if the direction tick differs from both
        // ticks captured at registration time.
        {
            let this = &mut state[dir];
            if let Some((a, b)) = this.ticks {
                if this.tick != a && this.tick != b {
                    this.ticks = None;
                    return Poll::Ready(Ok(()));
                }
            }
        }

        let was_empty = state[dir].is_empty();

        match state[dir].waker.take() {
            // Same task as before: keep the existing registration.
            Some(previous) if previous.will_wake(cx.waker()) => {
                state[dir].waker = Some(previous);
                return Poll::Pending;
            }
            // Wake the previous waker because it is about to be replaced.
            Some(previous) => {
                panic::catch_unwind(|| previous.wake()).ok();
            }
            None => {}
        }

        Reactor::with(|reactor| {
            state[dir].waker = Some(cx.waker().clone());
            let snapshot = (reactor.ticker(), state[dir].tick);
            state[dir].ticks = Some(snapshot);

            // The interest only needs updating when this is the first waker.
            if !was_empty {
                return Poll::Pending;
            }

            let interest = Event {
                key: self.key,
                readable: !state[READ].is_empty(),
                writable: !state[WRITE].is_empty(),
            };
            reactor.poller.modify(self.raw, interest)?;

            Poll::Pending
        })
    }

    /// Returns a future resolving once `handle` is readable.
    pub fn readable<T: Send + 'static>(handle: &Async<T>) -> Readable<'_, T> {
        let inner = Self::ready(handle, READ);
        Readable(inner)
    }

    /// Same as [`Source::readable`], for a shared, owned handle.
    pub fn readable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> ReadableOwned<T> {
        let inner = Self::ready(handle, READ);
        ReadableOwned(inner)
    }

    /// Returns a future resolving once `handle` is writable.
    pub fn writable<T: Send + 'static>(handle: &Async<T>) -> Writable<'_, T> {
        let inner = Self::ready(handle, WRITE);
        Writable(inner)
    }

    /// Same as [`Source::writable`], for a shared, owned handle.
    pub fn writable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> WritableOwned<T> {
        let inner = Self::ready(handle, WRITE);
        WritableOwned(inner)
    }

    /// Builds a not-yet-polled [`Ready`] future for direction `dir`.
    fn ready<H: Borrow<Async<T>> + Clone, T: Send + 'static>(handle: H, dir: usize) -> Ready<H, T> {
        Ready {
            handle,
            dir,
            ticks: None,
            index: None,
            _guard: None,
        }
    }
}
/// Future resolving once the borrowed I/O handle is readable.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Readable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);

impl<T: Send + 'static> Future for Readable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.0).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Ready(Ok(())) => {
                gst::trace!(RUNTIME_CAT, "readable: fd={}", self.0.handle.source.raw);
                Poll::Ready(Ok(()))
            }
        }
    }
}

impl<T: Send + 'static> fmt::Debug for Readable<'_, T> {
    /// Prints the type name only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Readable");
        repr.finish()
    }
}
/// Future resolving once the shared, owned I/O handle is readable.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);

impl<T: Send + 'static> Future for ReadableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.0).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Ready(Ok(())) => {
                gst::trace!(
                    RUNTIME_CAT,
                    "readable_owned: fd={}",
                    self.0.handle.source.raw
                );
                Poll::Ready(Ok(()))
            }
        }
    }
}

impl<T: Send + 'static> fmt::Debug for ReadableOwned<T> {
    /// Prints the type name only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("ReadableOwned");
        repr.finish()
    }
}
/// Future resolving once the borrowed I/O handle is writable.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Writable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);

impl<T: Send + 'static> Future for Writable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.0).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Ready(Ok(())) => {
                gst::trace!(RUNTIME_CAT, "writable: fd={}", self.0.handle.source.raw);
                Poll::Ready(Ok(()))
            }
        }
    }
}

impl<T: Send + 'static> fmt::Debug for Writable<'_, T> {
    /// Prints the type name only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Writable");
        repr.finish()
    }
}
/// Future resolving once the shared, owned I/O handle is writable.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WritableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);

impl<T: Send + 'static> Future for WritableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.0).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Ready(Ok(())) => {
                gst::trace!(
                    RUNTIME_CAT,
                    "writable_owned: fd={}",
                    self.0.handle.source.raw
                );
                Poll::Ready(Ok(()))
            }
        }
    }
}

impl<T: Send + 'static> fmt::Debug for WritableOwned<T> {
    /// Prints the type name only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("WritableOwned");
        repr.finish()
    }
}
/// Future shared by the readable / writable wrappers: resolves once the
/// reactor delivered an event for direction `dir` of the handle's source.
struct Ready<H: Borrow<Async<T>>, T: Send + 'static> {
/// Handle (borrowed or owned) giving access to the `Async<T>` and its source.
handle: H,
/// Direction to wait for: `READ` or `WRITE`.
dir: usize,
/// `(reactor ticker, direction tick)` captured on the first poll.
ticks: Option<(usize, usize)>,
/// Slot of this future in the direction's `wakers` slab, once allocated.
index: Option<usize>,
/// Releases the `wakers` slot when the future is dropped.
_guard: Option<RemoveOnDrop<H, T>>,
}
// `Ready` is declared `Unpin` whatever `H` and `T` are.
impl<H: Borrow<Async<T>>, T: Send + 'static> Unpin for Ready<H, T> {}
impl<H: Borrow<Async<T>> + Clone, T: Send + 'static> Future for Ready<H, T> {
    type Output = io::Result<()>;

    /// Registers the task's waker in the source, or completes if an event
    /// was delivered since this future first registered.
    ///
    /// The first poll allocates a slot in the direction's `wakers` slab,
    /// installs the guard releasing it on drop and captures the current ticks.
    /// Every pending poll stores a fresh clone of the task's waker.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self {
            ref handle,
            dir,
            ticks,
            index,
            _guard,
            ..
        } = &mut *self;
        let dir = *dir;
        let source = &handle.borrow().source;
        let mut state = source.state.lock().unwrap();

        // An event was delivered if the direction tick differs from both
        // ticks captured at registration time.
        let delivered = match *ticks {
            Some((a, b)) => {
                let current = state[dir].tick;
                current != a && current != b
            }
            None => false,
        };
        if delivered {
            return Poll::Ready(Ok(()));
        }

        let was_empty = state[dir].is_empty();

        Reactor::with(|reactor| {
            let slot = if let Some(slot) = *index {
                slot
            } else {
                let slot = state[dir].wakers.insert(None);
                *_guard = Some(RemoveOnDrop {
                    handle: handle.clone(),
                    dir,
                    key: slot,
                    _marker: PhantomData,
                });
                *index = Some(slot);
                *ticks = Some((reactor.ticker(), state[dir].tick));
                slot
            };
            state[dir].wakers[slot] = Some(cx.waker().clone());

            // The interest only needs updating when this is the first waker.
            if was_empty {
                let interest = Event {
                    key: source.key,
                    readable: !state[READ].is_empty(),
                    writable: !state[WRITE].is_empty(),
                };
                reactor.poller.modify(source.raw, interest)?;
            }

            Poll::Pending
        })
    }
}
/// Guard removing a waker slot from a source's direction when dropped.
struct RemoveOnDrop<H: Borrow<Async<T>>, T: Send + 'static> {
/// Handle giving access to the source owning the slot.
handle: H,
/// Direction (`READ` or `WRITE`) whose `wakers` slab holds the slot.
dir: usize,
/// Index of the slot in that slab.
key: usize,
/// Ties the guard to `T` without storing a value of that type.
_marker: PhantomData<fn() -> T>,
}
/// Releases the waker slot reserved by a `Ready` future.
impl<H: Borrow<Async<T>>, T: Send + 'static> Drop for RemoveOnDrop<H, T> {
    fn drop(&mut self) {
        // A destructor must not panic: doing so while already unwinding aborts
        // the process. Removing a slab entry is safe even if another thread
        // panicked while holding the lock, so recover the guard when poisoned.
        let mut state = self
            .handle
            .borrow()
            .source
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        let wakers = &mut state[self.dir].wakers;
        // `Slab::remove` panics on a vacant key, hence the check.
        if wakers.contains(self.key) {
            wakers.remove(self.key);
        }
    }
}