use ahash::HashMap;
use slab::Slab;
use std::collections::VecDeque;
use zrx_graph::Graph;
use super::action::output::OutputItem;
use super::action::{Action, Error, Input, Outputs, Result};
use super::effect::{Item, Signal};
use super::executor::graph::{Frontier, Node};
use super::id::Id;
use super::value::{self, Value};
use crate::action::descriptor::{Interest, Property};
pub mod graph;
pub mod queue;
pub use queue::{ToReceiver, Token};
/// Executor that schedules items through a graph of actions.
///
/// Items enter through `submit`, ready work is handed out by `take`, and the
/// outputs of that work are reported back through `update`.
#[derive(Debug)]
pub struct Executor<I>
where
I: Id,
{
/// Graph whose nodes hold the boxed actions to execute.
graph: Graph<Box<dyn Action<I>>>,
/// One FIFO queue per graph node (indexed by node), holding the keys of
/// the entries in `frontiers` that are waiting to run on that node.
queues: Vec<VecDeque<usize>>,
/// Slab of in-progress entries, addressed by the keys stored in `queues`
/// and in tokens.
frontiers: Slab<Entry<I>>,
/// Nodes grouped by the interests their descriptors declare.
interests: HashMap<Interest, Vec<usize>>,
/// Per-node count of jobs handed out by `take` that have not yet been
/// reported back through `update`.
concurrency: Vec<usize>,
}
/// Slab entry pairing an item identifier with its frontier.
#[derive(Debug)]
struct Entry<I> {
/// Identifier of the item this entry belongs to.
id: I,
/// Frontier of the item, or `None` once `do_complete` has cleared it;
/// entries without a frontier are skipped by `take` and are not counted
/// by `is_empty`.
frontier: Option<Frontier>,
}
impl<I> Executor<I>
where
I: Id,
{
pub fn new(graph: Graph<Box<dyn Action<I>>>) -> Self {
let mut interests = HashMap::default();
for n in &graph {
for interest in graph[n].descriptor().interests() {
interests.entry(*interest).or_insert_with(Vec::new).push(n);
}
}
let len = graph.len();
Self {
queues: vec![VecDeque::new(); graph.len()],
graph,
frontiers: Slab::default(),
interests,
concurrency: vec![0; len],
}
}
/// Submits an item to the executor.
///
/// A new frontier is created from the graph topology and the `initial` node
/// indices. The first node the frontier yields is completed right away with
/// the item's data, the frontier is stored under the item's identifier, and
/// all nodes that became available are enqueued.
///
/// Afterwards, every action that declared `Interest::Submit` is sent a
/// `Signal::Submit` carrying the item's identifier. Errors returned by these
/// actions are deliberately discarded.
///
/// # Panics
///
/// Panics if the new frontier yields no node to take, or if completing that
/// first node fails.
pub fn submit<T>(&mut self, item: OutputItem<I>, initial: T)
where
    T: IntoIterator<Item = usize>,
{
    // The collected nodes are not needed afterwards, so they are moved into
    // the frontier rather than cloned
    let initial = initial.into_iter().collect::<Vec<_>>();
    let mut frontier = Frontier::new(self.graph.topology(), initial);

    // Seed the frontier by completing its first node with the item's data
    let n = frontier.take().unwrap();
    frontier.complete(Node { id: n, data: item.data }).unwrap();

    // Register the frontier and enqueue whatever became available
    let id = item.id;
    let f = self.frontiers.insert(Entry {
        id: id.clone(),
        frontier: Some(frontier),
    });
    self.do_take(Token { frontier: f, node: n });

    // Notify interested actions - this is best effort, errors are ignored
    if let Some(indices) = self.interests.get(&Interest::Submit) {
        for &node in indices {
            let action = &mut self.graph[node];
            let _ = action.execute(Input::Signal(Signal::Submit(&id)));
        }
    }
}
/// Reports the output items of the job identified by `token`.
///
/// The in-flight counter of `token.node` is decremented first. Each item is
/// then routed to a frontier:
///
/// - If the frontier currently tracked is no longer in the slab, a new one
///   seeded at `token.node` is registered under the item's identifier and
///   tracked from then on.
/// - If the tracked entry carries the item's identifier, the item completes
///   the node in that frontier.
/// - Otherwise the item gets a frontier of its own, seeded at `token.node`.
///
/// If no item was routed to a tracked frontier, which includes the case of
/// `items` being empty, the node is completed without data.
///
/// # Panics
///
/// Panics if a newly created frontier yields no node to take.
pub fn update(&mut self, token: Token, items: Vec<OutputItem<I>>) {
    let origin = token.node;
    let mut current = token.frontier;
    let mut completed = false;

    // The job is no longer in flight
    self.concurrency[origin] -= 1;

    for item in items {
        let mut node = origin;

        // The tracked frontier is gone - start over with a new one that is
        // registered under the identifier of this item
        if !self.frontiers.contains(current) {
            let mut frontier =
                Frontier::new(self.graph.topology(), [origin]);
            node = frontier.take().unwrap();
            current = self.frontiers.insert(Entry {
                id: item.id.clone(),
                frontier: Some(frontier),
            });
        }

        // Items with the tracked identifier continue in the tracked
        // frontier, all other items get a frontier of their own
        let target = if self.frontiers[current].id == item.id {
            completed = true;
            current
        } else {
            let mut frontier =
                Frontier::new(self.graph.topology(), [origin]);
            node = frontier.take().unwrap();
            self.frontiers.insert(Entry {
                id: item.id.clone(),
                frontier: Some(frontier),
            })
        };
        self.do_complete(Token { frontier: target, node }, item.data);
    }

    // Nothing was routed to a tracked frontier - complete without data
    if !completed {
        self.do_complete(Token { frontier: current, node: origin }, None);
    }
}
pub fn take(&mut self) -> Vec<Job<I>> {
let mut opt = None;
let mut max = 8;
for (n, queue) in self.queues.iter_mut().enumerate() {
let value = self.graph[n]
.descriptor()
.properties()
.iter()
.find_map(|p| {
if let Property::Concurrency(val) = p {
Some(*val)
} else {
None
}
})
.unwrap_or(8);
if !queue.is_empty() && self.concurrency[n] < value {
opt = Some(n);
max = value;
break;
}
}
let Some(n) = opt else { return Vec::new() };
let mut results = Vec::new();
while let Some(f) = self.queues[n].pop_front() {
let entry = &mut self.frontiers[f];
let Some(frontier) = &mut entry.frontier else {
continue;
};
let values = frontier.select(n);
let id = values.id;
let input = Input::Item(Item::new(&entry.id, values.data));
let (data, res) = match self.graph[n].execute(input) {
Err(Error::Value(value::Error::Presence)) => (Some(None), None),
res => (None, Some(res)),
};
if let Some(data) = data {
self.do_complete(Token { frontier: f, node: n }, data);
continue;
}
self.concurrency[id] += 1;
results.push((Token { frontier: f, node: id }, res.unwrap()));
max -= 1;
if max == 0 {
break;
}
}
results
}
/// Completes `token.node` in the frontier `token.frontier` with `data`.
///
/// Entries without a frontier are left untouched. If the frontier rejects
/// the completion and hands the data back, a new frontier seeded at the same
/// node is created, completed with that data, and registered under the same
/// identifier. If the node carries `Property::Flush`, the frontier of the
/// original entry is cleared in that case. Nodes that became available are
/// enqueued for the original and, if one was created, the new frontier.
///
/// # Panics
///
/// Panics if `token.frontier` is not in the slab, or if a newly created
/// frontier yields no node or cannot be completed.
fn do_complete(&mut self, token: Token, data: Option<Box<dyn Value>>) {
    let origin = token.node;
    let entry = &mut self.frontiers[token.frontier];
    let id = entry.id.clone();
    let Some(frontier) = &mut entry.frontier else {
        return;
    };

    // A rejected completion continues in a frontier of its own
    let mut spawned = None;
    if let Err(data) = frontier.complete(Node { id: origin, data }) {
        let mut fresh = Frontier::new(self.graph.topology(), [origin]);
        let node = fresh.take().unwrap();
        fresh.complete(Node { id: node, data }).unwrap();
        let flush = self.graph[node]
            .descriptor()
            .properties()
            .contains(&Property::Flush);
        if flush {
            entry.frontier = None;
        }
        spawned = Some((fresh, node));
    }
    self.do_take(token);

    // Register the new frontier and enqueue whatever became available
    if let Some((fresh, node)) = spawned {
        let key = self.frontiers.insert(Entry {
            id,
            frontier: Some(fresh),
        });
        self.do_take(Token { frontier: key, node });
    }
}
/// Enqueues all nodes that the frontier `token.frontier` can currently take.
///
/// Entries without a frontier are left untouched. A frontier that reports
/// itself as empty is removed from the slab, otherwise its key is appended
/// to the queue of every node it yields.
///
/// # Panics
///
/// Panics if `token.frontier` is not in the slab.
fn do_take(&mut self, token: Token) {
    let key = token.frontier;
    let Some(frontier) = &mut self.frontiers[key].frontier else {
        return;
    };

    // Nothing left to do for this frontier, so the entry can be dropped
    if frontier.is_empty() {
        self.frontiers.remove(key);
        return;
    }
    while let Some(node) = frontier.take() {
        self.queues[node].push_back(key);
    }
}
}
#[allow(clippy::must_use_candidate)]
impl<I> Executor<I>
where
I: Id,
{
/// Returns the number of entries in the slab of frontiers.
///
/// Entries whose frontier has been cleared are included in the count, so
/// this can be non-zero while `is_empty` returns `true`.
#[inline]
pub fn len(&self) -> usize {
self.frontiers.len()
}
/// Returns whether no entry holds a frontier anymore.
///
/// This is the case when the slab is empty, or when the frontier of every
/// remaining entry has been cleared.
#[inline]
pub fn is_empty(&self) -> bool {
    !self
        .frontiers
        .iter()
        .any(|(_, entry)| entry.frontier.is_some())
}
/// Returns whether at least one node has queued work.
///
/// Concurrency limits are not taken into account, so `take` may still
/// return no jobs when this returns `true`.
pub fn can_make_progress(&self) -> bool {
    self.queues.iter().any(|queue| !queue.is_empty())
}
}
/// Job returned by `Executor::take`: the token identifying the frontier and
/// node, paired with the result of executing the node's action.
pub type Job<I> = (Token, Result<Outputs<I>>);