mod io_driver;
mod timer_driver;
mod yield_driver;
use crate::prelude::ExecutionMode;
pub use crate::runtime::event_driver::io_driver::*;
pub use crate::runtime::event_driver::timer_driver::*;
pub use crate::runtime::event_driver::yield_driver::*;
use crate::runtime::graph::Graph;
use crate::runtime::scheduler::Scheduler;
use crate::runtime::{Clock, CycleTime};
use crossbeam_queue::ArrayQueue;
use derive_builder::Builder;
use petgraph::prelude::NodeIndex;
use std::io;
use std::sync::Arc;
use std::time::Duration;
const MINIMUM_TIMER_PRECISION: Duration = Duration::from_millis(1);
pub struct Notifier {
notifications: Arc<ArrayQueue<NodeIndex>>,
waker: Option<Arc<mio::Waker>>,
node_index: NodeIndex,
}
impl Clone for Notifier {
fn clone(&self) -> Self {
Self::new(
self.notifications.clone(),
self.waker.clone(),
self.node_index,
)
}
}
impl Notifier {
const fn new(
notifications: Arc<ArrayQueue<NodeIndex>>,
waker: Option<Arc<mio::Waker>>,
node_index: NodeIndex,
) -> Self {
Self {
notifications,
waker,
node_index,
}
}
#[inline(always)]
pub fn notify(&self) {
self.notifications.push(self.node_index).ok();
std::sync::atomic::fence(std::sync::atomic::Ordering::Release);
self.waker.as_ref().map(|waker| waker.wake().ok());
}
}
#[derive(Builder)]
pub struct EventDriverConfig {
#[builder(default = true)]
pub io_enabled: bool,
#[builder(default = true)]
pub timer_enabled: bool,
#[builder(default = 256)]
pub notification_capacity: usize,
#[builder(default = 1024)]
pub io_capacity: usize,
#[builder(default = 16)]
pub poll_limit: usize,
}
pub struct EventDriver {
io_driver: Option<IoDriver>,
timer_driver: Option<TimerDriver>,
yield_driver: YieldDriver,
notifications: Arc<ArrayQueue<NodeIndex>>,
poll_limit: usize,
mode: ExecutionMode,
}
impl EventDriver {
pub(crate) fn new(mode: ExecutionMode) -> Self {
Self::with_config(
EventDriverConfigBuilder::default()
.build()
.expect("expected default builder"),
mode,
)
}
pub(crate) fn with_config(cfg: EventDriverConfig, mode: ExecutionMode) -> Self {
Self {
io_driver: if cfg.io_enabled {
Some(IoDriver::with_capacity(cfg.io_capacity))
} else {
None
},
timer_driver: if cfg.timer_enabled {
Some(TimerDriver::new())
} else {
None
},
yield_driver: YieldDriver::new(),
notifications: Arc::new(ArrayQueue::new(cfg.notification_capacity)),
poll_limit: cfg.poll_limit,
mode,
}
}
pub const fn io_driver(&mut self) -> &mut IoDriver {
self.io_driver.as_mut().expect("no io driver configured")
}
pub const fn timer_driver(&mut self) -> &mut TimerDriver {
self.timer_driver
.as_mut()
.expect("no timer driver configured")
}
pub const fn yield_driver(&mut self) -> &mut YieldDriver {
&mut self.yield_driver
}
#[inline(always)]
pub fn register_notifier(&self, node_index: NodeIndex) -> Notifier {
Notifier::new(
self.notifications.clone(),
self.io_driver.as_ref().map(|io| io.waker()),
node_index,
)
}
#[inline(always)]
pub(crate) fn poll(
&mut self,
graph: &mut Graph,
scheduler: &mut Scheduler,
clock: &mut impl Clock,
timeout: Option<Duration>,
epoch: usize,
) -> io::Result<CycleTime> {
if self.io_driver.is_none() {
std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
}
for _ in 0..self.poll_limit {
match self.notifications.pop() {
None => break,
Some(node_idx) => {
if let Some(depth) = graph.can_schedule(node_idx, epoch) {
scheduler
.schedule(node_idx, depth)
.expect("failed to schedule node");
}
}
}
}
let cycle_time = clock.cycle_time();
self.yield_driver.poll(graph, scheduler, epoch);
if let Some(ref mut driver) = self.timer_driver {
driver.poll(graph, scheduler, cycle_time.now(), epoch);
}
if scheduler.has_pending_event() || timeout == Some(Duration::ZERO) {
return self.fast_poll(graph, scheduler, cycle_time, epoch);
}
self.slow_poll(graph, scheduler, clock, cycle_time, timeout, epoch)
}
#[inline(always)]
fn fast_poll(
&mut self,
graph: &mut Graph,
scheduler: &mut Scheduler,
cycle_time: CycleTime,
epoch: usize,
) -> io::Result<CycleTime> {
if let Some(ref mut driver) = self.io_driver {
driver.poll(graph, scheduler, Some(Duration::ZERO), epoch)?;
}
Ok(cycle_time)
}
#[cold]
#[inline(never)]
fn slow_poll(
&mut self,
graph: &mut Graph,
scheduler: &mut Scheduler,
clock: &mut impl Clock,
cycle_time: CycleTime,
timeout: Option<Duration>,
epoch: usize,
) -> io::Result<CycleTime> {
let effective_timeout = match (&self.timer_driver, timeout) {
(Some(timer), Some(t)) => timer
.next_timer()
.map(|deadline| {
let d = deadline.saturating_duration_since(cycle_time.now());
t.min(d).max(MINIMUM_TIMER_PRECISION)
})
.or(Some(t)),
(Some(timer), None) => timer
.next_timer()
.map(|deadline| deadline.saturating_duration_since(cycle_time.now())),
(None, t) => t,
};
if let Some(ref mut driver) = self.io_driver {
driver.poll(graph, scheduler, effective_timeout, epoch)?;
if self.mode.is_park() && effective_timeout != Some(Duration::ZERO) {
Ok(clock.cycle_time())
} else {
Ok(cycle_time)
}
} else {
Ok(cycle_time)
}
}
}