use slab::Slab;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::ops::{Index, IndexMut};
use zrx_store::StoreMutRef;
use crate::scheduler::action::graph::Node;
use crate::scheduler::action::Result;
use crate::scheduler::engine::queue::Token;
use crate::scheduler::schedule::Schedule;
use crate::scheduler::signal::Id;
use crate::scheduler::step::effect::Then;
use crate::scheduler::step::{Scope, Steps};
#[derive(Debug)]
pub struct Actions<I> {
schedules: Slab<Schedule<I>>,
dependencies: BTreeMap<Token, Vec<Token>>,
queue: VecDeque<Token>,
}
impl<I> Actions<I>
where
I: Id,
{
pub fn new() -> Self {
Self {
schedules: Slab::new(),
dependencies: BTreeMap::new(),
queue: VecDeque::new(),
}
}
pub fn attach(&mut self, schedule: Schedule<I>) -> usize {
let m = self.schedules.insert(schedule);
self.recompute();
m
}
pub fn detach(&mut self, index: usize) -> Option<Schedule<I>> {
self.schedules.try_remove(index).inspect(|_| {
self.recompute();
})
}
pub fn submit(&mut self, token: Token) {
if !self.queue.contains(&token) {
self.queue.push_back(token);
}
}
pub fn take(&mut self) -> Option<Item<I>> {
self.queue.pop_front().map(|token| {
let schedule = &mut self.schedules[token.module];
(token, schedule.take(token.node))
})
}
pub fn ensure(&mut self, token: Token, scope: &mut Scope<I>) -> Scope<I> {
let schedule = &mut self.schedules[token.module];
match scope.id() {
Some(_) => scope.take(),
None => Scope::new(
scope.key().clone(),
schedule.submit(token.node, scope.key()),
),
}
}
#[inline]
pub fn resume(&mut self, token: Token, then: Then<I>) -> Item<I> {
let schedule = &mut self.schedules[token.module];
(
token,
vec![then
.execute(schedule.context(token.node).build([]).unwrap())
.map_err(Into::into)],
)
}
pub fn complete(&mut self, token: Token, scope: &Scope<I>) {
let schedule = &mut self.schedules[token.module];
for node in schedule.complete(token.node, scope) {
let token = Token { module: token.module, node };
if !self.queue.contains(&token) {
self.queue.push_back(token);
}
}
let prior = token;
if let Some(deps) = self.dependencies.get(&prior) {
for token in deps {
let (prev, next) = self
.schedules
.get2_mut(prior.module, token.module)
.expect("invariant");
let graph = &mut next.graph;
let storages = &prev.storages;
let source = graph[token.node].as_source_mut().unwrap();
source.sender().forward(&storages[prior.node], scope.key());
if !self.queue.contains(token) {
self.queue.push_back(*token);
}
}
}
}
fn recompute(&mut self) {
self.dependencies.clear();
let mut sources = vec![BTreeMap::new(); self.schedules.len()];
let mut targets = vec![BTreeMap::new(); self.schedules.len()];
for (m, schedule) in &self.schedules {
targets[m] = schedule.graph.group_sinks(Node::descriptor).collect();
sources[m] = schedule.graph.sources().fold(
BTreeMap::<_, Vec<usize>>::new(),
|mut map, n| {
map.entry(schedule.graph[n].descriptor())
.or_default()
.push(n);
map
},
);
}
for (consumer, _) in &self.schedules {
for (descriptor, nodes) in &sources[consumer] {
for (producer, _) in &self.schedules {
if producer == consumer {
continue;
}
if sources[producer].contains_key(descriptor) {
continue;
}
let Some(exit_nodes) = targets[producer].get(descriptor)
else {
continue;
};
for &exit in exit_nodes {
for &entry in nodes {
self.dependencies
.get_or_insert_default(&Token {
module: producer,
node: exit,
})
.push(Token { module: consumer, node: entry });
}
}
}
}
}
}
}
#[allow(clippy::must_use_candidate)]
impl<I> Actions<I> {
#[inline]
pub fn len(&self) -> usize {
self.queue.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
}
impl<I> Index<usize> for Actions<I> {
type Output = Schedule<I>;
#[inline]
fn index(&self, token: usize) -> &Self::Output {
&self.schedules[token]
}
}
impl<I> IndexMut<usize> for Actions<I> {
#[inline]
fn index_mut(&mut self, token: usize) -> &mut Self::Output {
&mut self.schedules[token]
}
}
impl<I> Default for Actions<I>
where
I: Id,
{
#[inline]
fn default() -> Self {
Self::new()
}
}
pub type Item<I> = (Token, Vec<Result<Steps<I>>>);