use crate::Control;
use crate::prelude::ExecutionMode;
use crate::runtime::clock::CycleTime;
use crate::runtime::event_driver::YieldDriver;
use crate::runtime::event_driver::{EventDriver, IoDriver, IoSource, TimerDriver, TimerSource};
use crate::runtime::garbage_collector::GarbageCollector;
use crate::runtime::graph::Graph;
use crate::runtime::node::Node;
use crate::runtime::scheduler::{Scheduler, SchedulerError};
use crate::runtime::{Clock, EventDriverConfig, NodeHandle, Notifier};
use enum_as_inner::EnumAsInner;
use mio::Interest;
use mio::event::Source;
use petgraph::graph::NodeIndex;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::io;
use std::time::{Duration, Instant};
const BUFFER_CAPACITY: usize = 32;
pub type SpawnFn = Box<dyn FnOnce(&mut Executor) + 'static>;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub enum ExecutorState {
Running,
Terminated,
}
pub struct ExecutionContext<'a> {
event_driver: &'a mut EventDriver,
scheduler: &'a UnsafeCell<Scheduler>,
deferred_spawns: &'a mut VecDeque<SpawnFn>,
current: NodeIndex,
cycle_time: CycleTime,
epoch: usize,
}
impl<'a> ExecutionContext<'a> {
pub fn new(
event_driver: &'a mut EventDriver,
scheduler: &'a UnsafeCell<Scheduler>,
deferred_spawns: &'a mut VecDeque<SpawnFn>,
cycle_time: CycleTime,
epoch: usize,
) -> Self {
Self {
event_driver,
scheduler,
deferred_spawns,
current: NodeIndex::new(0),
cycle_time,
epoch,
}
}
pub const fn driver(&mut self) -> &mut EventDriver {
self.event_driver
}
#[inline(always)]
pub fn register_io<S: Source>(
&mut self,
source: S,
idx: NodeIndex,
interest: Interest,
) -> io::Result<IoSource<S>> {
self.event_driver
.io_driver()
.register_source(source, idx, interest)
}
#[inline(always)]
pub fn deregister_io<S: Source>(&mut self, source: IoSource<S>) -> io::Result<NodeIndex> {
self.event_driver.io_driver().deregister_source(source)
}
#[inline(always)]
pub fn reregister_io<S: Source>(
&mut self,
handle: &mut IoSource<S>,
interest: Interest,
) -> io::Result<()> {
self.event_driver
.io_driver()
.reregister_source(handle, interest)
}
#[inline(always)]
pub fn register_timer(&mut self, node_index: NodeIndex, when: Instant) -> TimerSource {
self.event_driver
.timer_driver()
.register_timer(node_index, when)
}
#[inline(always)]
pub fn deregister_timer(&mut self, source: TimerSource) {
self.event_driver.timer_driver().deregister_timer(source)
}
#[inline(always)]
pub fn yield_now(&mut self, node_index: NodeIndex) {
self.event_driver.yield_driver().yield_now(node_index)
}
pub const fn current(&self) -> NodeIndex {
self.current
}
pub const fn cycle_time(&self) -> &CycleTime {
&self.cycle_time
}
const fn set_current(&mut self, node_index: NodeIndex) {
self.current = node_index;
}
#[inline(always)]
pub fn schedule_node<T>(&mut self, node: &Node<T>) -> Result<(), SchedulerError> {
unsafe { (&mut *self.scheduler.get()).schedule(node.index(), node.depth()) }
}
pub const fn epoch(&self) -> usize {
self.epoch
}
#[inline(always)]
pub fn has_mutated<N: NodeHandle>(&self, parent: &N) -> bool {
parent.mut_epoch() == self.epoch
}
#[inline(always)]
pub fn spawn_subgraph<F>(&mut self, spawn_fn: F)
where
F: FnOnce(&mut Executor) + 'static,
{
self.deferred_spawns.push_back(Box::new(spawn_fn));
}
}
pub struct Executor {
graph: Graph,
scheduler: UnsafeCell<Scheduler>,
event_driver: EventDriver,
edge_buffer: Vec<NodeIndex>,
deferred_spawns: VecDeque<SpawnFn>,
gc: GarbageCollector,
epoch: usize,
}
impl Executor {
pub(crate) fn new(mode: ExecutionMode) -> Self {
Self {
graph: Graph::new(),
scheduler: UnsafeCell::new(Scheduler::new()),
event_driver: EventDriver::new(mode),
edge_buffer: Vec::with_capacity(BUFFER_CAPACITY),
deferred_spawns: VecDeque::new(),
gc: GarbageCollector::new(),
epoch: 0,
}
}
pub(crate) fn with_config(cfg: EventDriverConfig, mode: ExecutionMode) -> Self {
Self {
graph: Graph::new(),
scheduler: UnsafeCell::new(Scheduler::new()),
event_driver: EventDriver::with_config(cfg, mode),
edge_buffer: Vec::with_capacity(BUFFER_CAPACITY),
deferred_spawns: VecDeque::new(),
gc: GarbageCollector::new(),
epoch: 0,
}
}
#[inline(always)]
pub fn has_mutated<T>(&self, node: &Node<T>) -> bool {
node.mut_epoch() == self.epoch
}
pub const fn io_driver(&mut self) -> &mut IoDriver {
self.event_driver.io_driver()
}
pub const fn timer_driver(&mut self) -> &mut TimerDriver {
self.event_driver.timer_driver()
}
pub const fn yield_driver(&mut self) -> &mut YieldDriver {
self.event_driver.yield_driver()
}
#[inline(always)]
pub fn register_notifier(&self, node_index: NodeIndex) -> Notifier {
self.event_driver.register_notifier(node_index)
}
pub(crate) const fn graph(&mut self) -> &mut Graph {
&mut self.graph
}
pub(crate) const fn scheduler(&mut self) -> &mut Scheduler {
unsafe { &mut *self.scheduler.get() }
}
pub fn garbage_collector(&mut self) -> GarbageCollector {
self.gc.clone()
}
pub fn cycle(
&mut self,
clock: &mut impl Clock,
timeout: Option<Duration>,
) -> io::Result<ExecutorState> {
self.epoch = self.epoch.wrapping_add(1);
let cycle_time = self.event_driver.poll(
&mut self.graph,
unsafe { &mut *self.scheduler.get() },
clock,
timeout,
self.epoch,
)?;
let mut ctx = ExecutionContext::new(
&mut self.event_driver,
&self.scheduler,
&mut self.deferred_spawns,
cycle_time,
self.epoch,
);
while let Some(node_idx) = unsafe { (&mut *self.scheduler.get()).pop() } {
ctx.set_current(node_idx);
match self.graph.cycle(&mut ctx, node_idx) {
Control::Broadcast => {
self.edge_buffer
.extend(self.graph.triggering_edges(node_idx));
self.edge_buffer.drain(..).for_each(|child| {
self.graph
.can_schedule(child, self.epoch)
.map(|depth| unsafe {
(&mut *self.scheduler.get()).schedule(child, depth)
});
})
}
Control::Unchanged => {
}
Control::Sweep => {
self.gc.mark_for_sweep(node_idx);
self.edge_buffer.extend(self.graph.edges(node_idx));
self.edge_buffer.drain(..).for_each(|child| {
self.graph.mark_poisoned(child);
self.graph
.can_schedule(child, self.epoch)
.map(|depth| unsafe {
let _ = (&mut *self.scheduler.get()).schedule(child, depth);
});
});
}
Control::Terminate => {
return Ok(ExecutorState::Terminated); }
}
}
while let Some(marked_node) = self.gc.next_to_sweep() {
self.graph.remove_node(marked_node);
}
while let Some(spawn_fn) = self.deferred_spawns.pop_front() {
spawn_fn(self)
}
Ok(ExecutorState::Running)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Relationship;
use crate::runtime::clock::TestClock;
use crate::runtime::node::NodeBuilder;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
#[test]
fn test_executor_creation() {
let executor = Executor::new(ExecutionMode::Spin);
assert_eq!(executor.epoch, 0);
}
#[test]
fn test_basic_cycle_execution() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let node = NodeBuilder::new(0)
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.build(&mut executor, |data, ctx| {
*data += 1;
let current = ctx.current();
ctx.yield_now(current);
Control::Unchanged
});
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(*node.borrow(), 1);
assert_eq!(executor.epoch, 1);
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(*node.borrow(), 2);
assert_eq!(executor.epoch, 2);
}
#[test]
fn test_triggering_relationship() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let parent_node = NodeBuilder::new(0)
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.build(&mut executor, |data, _ctx| {
*data += 1;
Control::Broadcast });
let child_node = NodeBuilder::new(0)
.add_relationship(&parent_node, Relationship::Trigger)
.build(&mut executor, |data, _ctx| {
*data += 1;
Control::Unchanged
});
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(*parent_node.borrow(), 1);
assert_eq!(*child_node.borrow(), 1);
}
#[test]
fn test_observe_relationship_does_not_trigger() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let parent_node = NodeBuilder::new(0)
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.build(&mut executor, |data, _ctx| {
*data += 1;
Control::Broadcast });
let child_node = NodeBuilder::new(0)
.add_relationship(&parent_node, Relationship::Observe)
.build(&mut executor, |data, _ctx| {
*data += 1;
Control::Unchanged
});
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(*parent_node.borrow(), 1);
assert_eq!(*child_node.borrow(), 0); }
#[test]
fn test_notifier_handling() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let (node, notifier) = NodeBuilder::new(0)
.build_with_notifier(&mut executor, |data, _ctx| {
*data += 1;
Control::Unchanged
})
.unwrap();
notifier.notify();
executor
.cycle(&mut clock, Some(Duration::from_millis(10)))
.unwrap();
assert_eq!(*node.borrow(), 1);
let unknown_notifier = executor.register_notifier(NodeIndex::from(100));
unknown_notifier.notify();
let driver = &mut executor.event_driver;
let graph = &mut executor.graph;
let scheduler = unsafe { &mut *executor.scheduler.get() };
let epoch = executor.epoch + 1;
driver
.poll(graph, scheduler, &mut clock, None, epoch)
.unwrap();
let mut count = 0;
while let Some(_) = scheduler.pop() {
count += 1;
}
assert_eq!(count, 0);
}
#[test]
fn test_timer_event_handling() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let node = NodeBuilder::new(0)
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.build(&mut executor, |data, ctx| {
*data += 1;
if *data == 1 {
let timer_time = ctx.cycle_time().now() + Duration::from_millis(100);
let _timer_reg = ctx.register_timer(ctx.current(), timer_time);
}
Control::Unchanged
});
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(*node.borrow(), 1);
clock.advance(Duration::from_millis(150));
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(*node.borrow(), 2);
}
#[test]
fn test_timer_not_expired_yet() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let node = NodeBuilder::new(0)
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.build(&mut executor, |data, ctx| {
*data += 1;
let next_time = ctx.cycle_time().now() + Duration::from_secs(3);
ctx.register_timer(ctx.current(), next_time);
Control::Unchanged
});
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(*node.borrow(), 1);
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(*node.borrow(), 1);
}
#[test]
fn test_multiple_cycles_increment_epoch() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
assert_eq!(executor.epoch, 0);
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(executor.epoch, 1);
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(executor.epoch, 2);
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(executor.epoch, 3);
}
#[test]
fn test_chain_of_triggering_nodes() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let call_order = Rc::new(RefCell::new(Vec::new()));
let call_order_1 = call_order.clone();
let node1 = NodeBuilder::new(0)
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.build(&mut executor, move |data, _ctx| {
*data += 1;
call_order_1.borrow_mut().push(1);
Control::Broadcast });
let call_order_2 = call_order.clone();
let node2 = NodeBuilder::new(0)
.add_relationship(&node1, Relationship::Trigger)
.build(&mut executor, move |data, _ctx| {
*data += 1;
call_order_2.borrow_mut().push(2);
Control::Broadcast });
let call_order_3 = call_order.clone();
let _node3 = NodeBuilder::new(0)
.add_relationship(&node2, Relationship::Trigger)
.build(&mut executor, move |data, _ctx| {
*data += 1;
call_order_3.borrow_mut().push(3);
Control::Unchanged });
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
let order = call_order.borrow();
assert_eq!(*order, vec![1, 2, 3]);
}
#[test]
fn test_yield_driver_integration() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let node = NodeBuilder::new(0)
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.build(&mut executor, |data, _ctx| {
*data += 1;
Control::Unchanged
});
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(*node.borrow(), 1);
}
#[test]
fn test_termination_state() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let _node = NodeBuilder::new(0)
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.build(&mut executor, |_, _ctx| Control::Terminate);
let result = executor.cycle(&mut clock, Some(Duration::ZERO));
assert!(result.is_ok());
let state = result.unwrap();
assert!(state.is_terminated());
}
#[test]
fn test_on_drop() {
let mut executor = Executor::new(ExecutionMode::Spin);
let flag = Rc::new(Cell::new(false));
let node = NodeBuilder::new(flag.clone())
.on_drop(|data| {
println!("setting flag to true");
data.set(true);
println!("flag is now {}", data.get());
})
.build(&mut executor, |_, _| Control::Unchanged);
let idx = node.index();
drop(node);
executor.graph.remove_node(idx);
println!("flag is {} after drop", flag.get());
assert!(flag.get());
}
#[test]
fn test_on_drop_executor_exit() {
let mut executor = Executor::new(ExecutionMode::Spin);
let flag = Rc::new(Cell::new(false));
let node = NodeBuilder::new(flag.clone())
.on_drop(|data| {
println!("setting flag to true");
data.set(true);
println!("flag is now {}", data.get());
})
.build(&mut executor, |_, _| Control::Unchanged);
drop(node);
drop(executor);
println!("flag is {} after drop", flag.get());
assert!(flag.get());
}
#[test]
fn test_garbage_collection() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let gc_count = Rc::new(Cell::new(0));
let node1 = NodeBuilder::new(gc_count.clone())
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.on_drop(|data| {
println!("removing node1");
data.update(|count| count + 1);
})
.build(&mut executor, |_, _| Control::Broadcast);
let node2 = NodeBuilder::new(gc_count.clone())
.triggered_by(&node1)
.on_drop(|data| {
println!("removing node2");
data.update(|count| count + 1);
})
.build(&mut executor, move |_, _| {
println!("node1 data: {}", node1.borrow().get());
Control::Broadcast
});
NodeBuilder::new(gc_count.clone())
.triggered_by(&node2)
.on_drop(|data| {
println!("removing node3");
data.update(|count| count + 1);
})
.spawn(&mut executor, move |_, _| {
println!("node2 data: {}", node2.borrow().get());
Control::Sweep
});
assert_eq!(executor.graph.node_count(), 3);
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(executor.graph.node_count(), 0);
}
#[test]
fn test_node_spawn_with_cleanup() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let spawned = Rc::new(Cell::new(false));
let _root = NodeBuilder::new(spawned.clone())
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.on_drop(|_| {
println!("removing root");
})
.build(&mut executor, |spawned, ctx| {
let flag = spawned.clone();
ctx.spawn_subgraph(move |ex| {
NodeBuilder::new(flag)
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.spawn(ex, |data, _| {
data.set(true);
Control::Sweep
})
});
Control::Broadcast
});
assert_eq!(executor.graph.node_count(), 1);
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(executor.graph.node_count(), 2); assert!(!spawned.get());
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(executor.graph.node_count(), 1); assert!(spawned.get()); }
#[test]
fn test_node_spawn_with_cleanup_on_panic() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let spawned = Rc::new(Cell::new(false));
let _root = NodeBuilder::new(spawned.clone())
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.on_drop(|_| {
println!("removing root");
})
.build(&mut executor, |spawned, ctx| {
let flag = spawned.clone();
ctx.spawn_subgraph(move |ex| {
NodeBuilder::new(flag)
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.spawn(ex, |data, _| {
data.set(true);
panic!("panic!");
#[allow(unreachable_code)]
Control::Unchanged
})
});
Control::Broadcast
});
assert_eq!(executor.graph.node_count(), 1);
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(executor.graph.node_count(), 2); assert!(!spawned.get());
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(executor.graph.node_count(), 1); assert!(spawned.get()); }
#[test]
fn test_garbage_collection_on_poisoned() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let gc_count = Rc::new(Cell::new(0));
let node0 = NodeBuilder::new(0)
.on_init(|ex, _, idx| {
ex.yield_driver().yield_now(idx);
})
.build(&mut executor, |_, _| Control::Broadcast);
let node1 = NodeBuilder::new(gc_count.clone())
.on_init(|executor, _, idx| {
executor.yield_driver().yield_now(idx);
})
.on_drop(|data| {
println!("removing node1");
data.update(|count| count + 1);
})
.build(&mut executor, |_, _| {
panic!("panic!");
#[allow(unreachable_code)]
Control::Broadcast
});
let node2 = NodeBuilder::new(gc_count.clone())
.triggered_by(&node0)
.observer_of(&node1)
.on_drop(|data| {
println!("removing node2");
data.update(|count| count + 1);
})
.build(&mut executor, move |_, _| {
println!("node1 data: {}", node1.borrow().get());
Control::Broadcast
});
NodeBuilder::new(gc_count.clone())
.observer_of(&node2)
.on_drop(|data| {
println!("removing node3");
data.update(|count| count + 1);
})
.spawn(&mut executor, move |_, _| {
println!("node2 data: {}", node2.borrow().get());
Control::Broadcast
});
assert_eq!(executor.graph.node_count(), 4);
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
assert_eq!(executor.graph.node_count(), 1);
}
#[test]
fn test_no_timeout() {
let mut executor = Executor::new(ExecutionMode::Spin);
let mut clock = TestClock::new();
let node = NodeBuilder::new(0)
.on_init(|ex, _, idx| ex.yield_driver().yield_now(idx))
.build(&mut executor, |this, _| {
*this += 1;
Control::Broadcast
});
executor.cycle(&mut clock, None).unwrap();
assert_eq!(executor.graph.node_count(), 1);
std::hint::black_box(node.borrow());
}
}