use slab::Slab;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::ops::{Index, IndexMut};
use zrx_graph::Graph;
use zrx_store::StoreMutRef;
use crate::scheduler::action::Result;
use crate::scheduler::engine::queue::Token;
use crate::scheduler::schedule::Schedule;
use crate::scheduler::signal::Id;
use crate::scheduler::step::effect::Then;
use crate::scheduler::step::{Scoped, Steps};
/// Set of attached schedules together with the queue of tokens that are
/// waiting to be taken for execution.
#[derive(Debug)]
pub struct Actions<I> {
/// Attached schedules, addressed by the slab key handed out by `attach`.
schedules: Slab<Schedule<I>>,
/// Maps the token of a node to the tokens of source nodes in other
/// schedules that its stored output is forwarded to on completion.
dependencies: BTreeMap<Token, Vec<Token>>,
/// Tokens waiting to be taken, front first; the methods of this type only
/// enqueue a token when it is not already present.
queue: VecDeque<Token>,
}
impl<I> Actions<I>
where
I: Id,
{
/// Creates an empty instance: no schedules attached, no dependencies
/// recorded and nothing queued.
pub fn new() -> Self {
    let schedules = Slab::new();
    let dependencies = BTreeMap::new();
    let queue = VecDeque::new();
    Self {
        schedules,
        dependencies,
        queue,
    }
}
/// Attaches a schedule and returns the slab key it is stored under.
///
/// The cross-schedule dependency table is rebuilt afterwards so that the
/// new schedule is taken into account.
pub fn attach(&mut self, schedule: Schedule<I>) -> usize {
    let key = self.schedules.insert(schedule);
    self.recompute();
    key
}
/// Detaches the schedule stored under `index` and returns it, or returns
/// `None` (leaving everything untouched) if no schedule is attached there.
///
/// Tokens still queued for the detached schedule are discarded, and the
/// cross-schedule dependency table is rebuilt without it.
pub fn detach(&mut self, index: usize) -> Option<Schedule<I>> {
    let schedule = self.schedules.try_remove(index)?;
    // Queued tokens of the removed module must not survive: `take` indexes
    // the slab with `token.module`, which would panic on the now-vacant
    // key, and once the slab reuses the key for a later `attach` a stale
    // token would be applied to an unrelated schedule.
    self.queue.retain(|token| token.module != index);
    self.recompute();
    Some(schedule)
}
/// Enqueues `token` at the back of the queue unless it is already queued.
pub fn submit(&mut self, token: Token) {
    let already_queued = self.queue.contains(&token);
    if already_queued {
        return;
    }
    self.queue.push_back(token);
}
/// Pops the token at the front of the queue and pairs it with whatever the
/// owning schedule's `take` yields for the token's node.
///
/// Returns `None` when the queue is empty.
///
/// # Panics
///
/// Panics if no schedule is attached under the popped token's module.
pub fn take(&mut self) -> Option<Item<I>> {
    let token = self.queue.pop_front()?;
    let schedule = &mut self.schedules[token.module];
    Some((token, schedule.take(token.node)))
}
/// Returns a scoped value that carries an id.
///
/// If `scoped` already has an id, a clone of it is returned. Otherwise it
/// is submitted to the token's schedule for the token's node, and a new
/// `Scoped` is built from a clone of the inner value and the value that
/// `submit` returned (presumably the newly assigned id — confirm against
/// `Schedule::submit`).
///
/// # Panics
///
/// Panics if no schedule is attached under `token.module`, regardless of
/// whether `scoped` already has an id.
pub fn ensure(&mut self, token: Token, scoped: &Scoped<I>) -> Scoped<I> {
    // Looked up before the id check so a missing module is always reported.
    let schedule = &mut self.schedules[token.module];
    if scoped.id().is_some() {
        return scoped.clone();
    }
    let inner = (**scoped).clone();
    let id = schedule.submit(token.node, scoped);
    Scoped::new(inner, id)
}
/// Executes the continuation `then` for `token` and returns the token
/// paired with the single resulting outcome.
///
/// The continuation receives a context built from the token's schedule for
/// the token's node, using an empty input list; its error, if any, is
/// converted via `Into`.
///
/// # Panics
///
/// Panics if no schedule is attached under `token.module`, or if building
/// the context fails.
#[inline]
pub fn resume(&mut self, token: Token, then: Then<I>) -> Item<I> {
    let schedule = &mut self.schedules[token.module];
    let context = schedule.context(token.node).build([]).unwrap();
    let outcome = then.execute(context).map_err(Into::into);
    (token, vec![outcome])
}
/// Marks `token` as completed for `scope` and enqueues the work that
/// follows from it.
///
/// First, every node the token's schedule reports from `complete` is
/// enqueued (within the same module). Then, for each dependent source node
/// recorded for `token` in another schedule, the stored output of the
/// completed node is forwarded through that source's sender and the
/// dependent token is enqueued. Tokens already in the queue are not added
/// a second time.
///
/// # Panics
///
/// Panics if no schedule is attached under `token.module`, if a recorded
/// dependent refers to a schedule that cannot be borrowed alongside it, or
/// if a recorded dependent node is not a source.
pub fn complete(&mut self, token: Token, scope: &Scoped<I>) {
    let schedule = &mut self.schedules[token.module];
    for node in schedule.complete(token.node, scope) {
        let ready = Token { module: token.module, node };
        if self.queue.contains(&ready) {
            continue;
        }
        self.queue.push_back(ready);
    }

    // Nothing in other schedules depends on this token.
    let Some(dependents) = self.dependencies.get(&token) else {
        return;
    };
    for &dependent in dependents {
        // Both schedules are needed at once: the completed one is read,
        // the dependent one is mutated.
        let (producer, consumer) = self
            .schedules
            .get2_mut(token.module, dependent.module)
            .expect("invariant");
        let source = consumer.graph[dependent.node].as_source_mut().unwrap();
        source
            .sender()
            .forward(&producer.storages[token.node], scope);
        if !self.queue.contains(&dependent) {
            self.queue.push_back(dependent);
        }
    }
}
/// Rebuilds the cross-schedule dependency table from scratch.
///
/// Each schedule is compared with the modules registered before it in the
/// local builder. For such a pair, a source node of one graph and a
/// non-source node of the other graph with equal descriptors are linked:
/// the non-source node's token becomes the key, and the source node's
/// token is appended to its list of dependents. Pairs whose node indices
/// happen to be equal are skipped as well.
///
/// NOTE(review): the values obtained by iterating the builder are used as
/// slab keys. This presumably relies on builder node indices coinciding
/// with slab keys, which may stop holding once a schedule has been
/// detached and the keys are no longer contiguous — confirm.
fn recompute(&mut self) {
    self.dependencies.clear();
    let mut modules = Graph::builder();
    for (module, schedule) in &self.schedules {
        modules.add_node(module);
        let graph = &schedule.graph;
        for other in &modules {
            // The current module was just registered; never pair it with itself.
            if module == other {
                continue;
            }
            let peer = &self.schedules[other].graph;

            // Sources of the current module fed by nodes of the other module.
            for source in graph.sources() {
                for target in peer {
                    let skip = source == target || peer.is_source(target);
                    if skip || graph[source].descriptor() != peer[target].descriptor() {
                        continue;
                    }
                    let producer = Token { module: other, node: target };
                    let consumer = Token { module, node: source };
                    self.dependencies
                        .get_or_insert_default(&producer)
                        .push(consumer);
                }
            }

            // Sources of the other module fed by nodes of the current module.
            for source in peer.sources() {
                for target in graph {
                    let skip = source == target || graph.is_source(target);
                    if skip || peer[source].descriptor() != graph[target].descriptor() {
                        continue;
                    }
                    let producer = Token { module, node: target };
                    let consumer = Token { module: other, node: source };
                    self.dependencies
                        .get_or_insert_default(&producer)
                        .push(consumer);
                }
            }
        }
    }
}
}
// Suppresses clippy's `must_use_candidate` lint for the accessors below.
#[allow(clippy::must_use_candidate)]
impl<I> Actions<I> {
    /// Returns the number of tokens currently in the queue.
    #[inline]
    pub fn len(&self) -> usize {
        let Self { queue, .. } = self;
        queue.len()
    }

    /// Returns `true` when no token is queued.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl<I> Index<usize> for Actions<I> {
    type Output = Schedule<I>;

    /// Returns the schedule attached under the given slab key (the
    /// parameter is a module key, despite being named `token`).
    ///
    /// # Panics
    ///
    /// Panics if no schedule is attached under that key.
    #[inline]
    fn index(&self, token: usize) -> &Self::Output {
        self.schedules.index(token)
    }
}
impl<I> IndexMut<usize> for Actions<I> {
    /// Returns the schedule attached under the given slab key mutably (the
    /// parameter is a module key, despite being named `token`).
    ///
    /// # Panics
    ///
    /// Panics if no schedule is attached under that key.
    #[inline]
    fn index_mut(&mut self, token: usize) -> &mut Self::Output {
        self.schedules.index_mut(token)
    }
}
impl<I> Default for Actions<I>
where
I: Id,
{
#[inline]
fn default() -> Self {
Self::new()
}
}
/// A token paired with the step results produced for it, as returned by
/// `Actions::take` and `Actions::resume`.
pub type Item<I> = (Token, Vec<Result<Steps<I>>>);