use std::collections::VecDeque;
use zrx_storage::Storages;
use crate::action::options::{Event, Interest};
use crate::scheduler::action::graph::{self, Graph};
use crate::scheduler::action::{context, Context, Result};
use crate::scheduler::signal::{Id, Scope};
use crate::scheduler::step::{Scoped, Steps};
pub mod builder;
mod frontier;
mod shared;
pub use builder::{Builder, Subscriber};
use frontier::{Frontier, Frontiers};
pub use shared::Shared;
/// Schedule.
///
/// Bundles the action graph and the storages its actions read from and write
/// to with the bookkeeping needed to drive execution: the set of frontiers,
/// and for each node a queue holding the identifiers of the frontiers that
/// are waiting on that node.
#[derive(Debug)]
pub struct Schedule<I> {
/// Action graph.
pub graph: Graph<I>,
/// Storages, from which the input and output views of a node are obtained.
pub storages: Storages,
/// Frontiers, addressed by the identifier returned when they're inserted.
frontiers: Frontiers<I>,
/// Queues of frontier identifiers, indexed by node.
queues: Vec<VecDeque<usize>>,
}
impl<I> Schedule<I>
where
    I: Id,
{
    /// Submits the given scope at the given node, creating a new frontier.
    ///
    /// A traversal of the graph starting at `node` is wrapped into a frontier
    /// together with a clone of `scope`, and the frontier is registered. Then,
    /// every node yielded by iterating the graph whose options list
    /// [`Interest::Enter`] has its action executed once, with a context that
    /// carries an [`Event::Insert`] for the scope and no scoped items.
    /// Finally, one node is taken from the new frontier and discarded, and
    /// the identifier of the frontier is returned.
    ///
    /// # Panics
    ///
    /// Panics with the message `invariant` if the frontier can't be inserted,
    /// if a context can't be built, or if the new frontier yields no node.
    #[allow(clippy::missing_panics_doc)]
    pub fn submit(&mut self, node: usize, scope: &Scope<I>) -> usize {
        let traversal = self.graph.traverse([node]);
        let id = self
            .frontiers
            .insert(Frontier::new(scope.clone(), traversal))
            .expect("invariant");

        // Notify all actions that asked to be informed about entering scopes
        for index in &self.graph {
            if !self.graph[index].options().interests.contains(&Interest::Enter) {
                continue;
            }

            // Assemble a context that only carries the insertion event
            let (action, adjacency) = self.graph.adjacent_mut(index);
            let (inputs, output) = self.storages.views(adjacency.incoming, index);
            let cx = Context::builder()
                .inputs(inputs)
                .output(output)
                .events(vec![Event::Insert(scope.clone())])
                .build([])
                .expect("invariant");

            // NOTE(review): the value returned by the action is discarded
            // here, whereas `take` collects it - confirm this is intended
            action.execute(cx);
        }

        // Take and drop the first node of the frontier - presumably this is
        // the node the scope was submitted at, which needs no scheduling
        self.frontiers[id].take().expect("invariant");
        id
    }

    /// Takes all frontiers queued at the given node and executes its action.
    ///
    /// The queue of `node` is drained, and each frontier identifier in it is
    /// paired with a clone of the scope of that frontier. The action of the
    /// node is then executed with a context built from the input and output
    /// views of the node and those scoped items, and everything the action
    /// returns is collected into a vector.
    ///
    /// # Panics
    ///
    /// Panics if `node` is not a valid index into the queues, or with the
    /// message `invariant` if the context can't be built.
    #[allow(clippy::missing_panics_doc)]
    pub fn take(&mut self, node: usize) -> Vec<Result<Steps<I>>> {
        // Empty the queue, resolving each frontier identifier to its scope
        let pending = &mut self.queues[node];
        let mut scopes = Vec::with_capacity(pending.len());
        for id in pending.drain(..) {
            let scope = self.frontiers[id].scope().clone();
            scopes.push(Scoped::new(scope, id));
        }

        // Execute the action on the views of the node with all scoped items
        let (action, adjacency) = self.graph.adjacent_mut(node);
        let (inputs, output) = self.storages.views(adjacency.incoming, node);
        let cx = Context::builder()
            .inputs(inputs)
            .output(output)
            .build(scopes)
            .expect("invariant");
        action.execute(cx).collect()
    }

    /// Marks the given node as completed in the frontier of the scoped item.
    ///
    /// If completing the node succeeds and leaves the frontier empty, the
    /// frontier is removed and an empty vector is returned. In all other
    /// cases - including when completing the node fails - every node the
    /// frontier currently yields is processed: the identifier of the frontier
    /// is appended to the queue of that node, unless it's already queued
    /// there, and the node is included in the returned vector if its queue
    /// was empty beforehand.
    ///
    /// # Panics
    ///
    /// Panics with the message `invariant` if `scoped` carries no identifier.
    #[allow(clippy::missing_panics_doc)]
    pub fn complete(&mut self, node: usize, scoped: &Scoped<I>) -> Vec<usize> {
        let id = scoped.id().expect("invariant");

        // Frontier is exhausted, so there's nothing left to schedule for it
        let completed = self.frontiers[id].complete(node).is_ok();
        if completed && self.frontiers[id].is_empty() {
            self.frontiers.remove(id);
            return Vec::new();
        }

        // Enqueue the frontier at each node it yields, and report the nodes
        // that had nothing queued before, i.e., which just became non-empty
        let mut ready = Vec::new();
        while let Some(next) = self.frontiers[id].take() {
            let queue = &mut self.queues[next];
            if queue.is_empty() {
                ready.push(next);
            }
            if !queue.contains(&id) {
                queue.push_back(id);
            }
        }
        ready
    }

    /// Creates a context builder for the given node.
    ///
    /// The returned builder already has the input and output views of the
    /// node set, which are obtained from the storages using the incoming
    /// adjacency of the node. Everything else is left to the caller.
    pub fn context(&mut self, node: usize) -> context::Builder<'_, I> {
        let (_, adjacency) = self.graph.adjacent_mut(node);
        let (inputs, output) = self.storages.views(adjacency.incoming, node);
        let builder = Context::builder();
        builder.inputs(inputs).output(output)
    }
}
// Both methods are plain getters, so Clippy's suggestion to annotate each of
// them with `#[must_use]` is silenced for the entire block
#[allow(clippy::must_use_candidate)]
impl<I> Schedule<I> {
/// Returns the number of frontiers.
#[inline]
pub fn len(&self) -> usize {
self.frontiers.len()
}
/// Returns whether there are no frontiers.
#[inline]
pub fn is_empty(&self) -> bool {
self.frontiers.is_empty()
}
}
impl<I> Default for Schedule<I> {
/// Creates a schedule by building a builder right after creating it, i.e.,
/// without configuring anything on it.
#[inline]
fn default() -> Self {
// `builder` is not part of this view - presumably provided by the
// `builder` module declared at the top of the file
Self::builder().build()
}
}