use std::collections::BinaryHeap;
use progress::Timestamp;
use progress::nested::{Source, Target};
use progress::ChangeBatch;
use progress::frontier::{Antichain, MutableAntichain};
use progress::timestamp::PathSummary;
#[derive(Clone, Debug)]
pub struct Builder<T: Timestamp> {
pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
pub edges: Vec<Vec<Vec<Target>>>,
pub shape: Vec<(usize, usize)>,
}
impl<T: Timestamp> Builder<T> {
pub fn new() -> Self {
Builder {
nodes: Vec::new(),
edges: Vec::new(),
shape: Vec::new(),
}
}
pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {
debug_assert_eq!(inputs, summary.len());
for x in summary.iter() { debug_assert_eq!(outputs, x.len()); }
while self.nodes.len() <= index {
self.nodes.push(Vec::new());
self.edges.push(Vec::new());
self.shape.push((0, 0));
}
self.nodes[index] = summary;
if self.edges[index].len() != outputs {
self.edges[index] = vec![Vec::new(); outputs];
}
self.shape[index] = (inputs, outputs);
}
pub fn add_edge(&mut self, source: Source, target: Target) {
debug_assert!(source.port < self.shape[source.index].1);
debug_assert!(target.port < self.shape[target.index].0);
self.edges[source.index][source.port].push(target);
}
pub fn build(&self) -> Tracker<T> {
Tracker::allocate_from(self)
}
}
#[derive(Debug)]
pub struct Tracker<T:Timestamp> {
sources: Vec<Vec<MutableAntichain<T>>>,
targets: Vec<Vec<MutableAntichain<T>>>,
pusheds: Vec<Vec<MutableAntichain<T>>>,
source_changes: ChangeBatch<(Source, T)>,
target_changes: ChangeBatch<(Target, T)>,
target_worklist: BinaryHeap<OrderReversed<(T, Target, i64)>>,
pushed_changes: ChangeBatch<(Target, T)>,
nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
edges: Vec<Vec<Vec<Target>>>,
compiled: Vec<Vec<Vec<(Target, T::Summary)>>>,
}
impl<T:Timestamp> Tracker<T> {
#[inline]
pub fn update_target(&mut self, target: Target, time: T, value: i64) {
self.target_changes.update((target, time), value);
}
#[inline]
pub fn update_source(&mut self, source: Source, time: T, value: i64) {
self.source_changes.update((source, time), value);
}
pub fn allocate_from(builder: &Builder<T>) -> Self {
let mut sources = Vec::with_capacity(builder.shape.len());
let mut targets = Vec::with_capacity(builder.shape.len());
let mut pusheds = Vec::with_capacity(builder.shape.len());
let mut compiled = Vec::with_capacity(builder.shape.len());
for (node, &(inputs, outputs)) in builder.shape.iter().enumerate() {
sources.push(vec![MutableAntichain::new(); outputs]);
targets.push(vec![MutableAntichain::new(); inputs]);
pusheds.push(vec![MutableAntichain::new(); inputs]);
let mut compiled_node = vec![Vec::new(); inputs];
for input in 0 .. inputs {
for output in 0 .. outputs {
for summary in builder.nodes[node][input][output].elements().iter() {
for &target in builder.edges[node][output].iter() {
compiled_node[input].push((target, summary.clone()));
}
}
}
}
compiled.push(compiled_node);
}
Tracker {
sources,
targets,
pusheds,
source_changes: ChangeBatch::new(),
target_changes: ChangeBatch::new(),
target_worklist: BinaryHeap::new(),
pushed_changes: ChangeBatch::new(),
nodes: builder.nodes.clone(),
edges: builder.edges.clone(),
compiled,
}
}
pub fn propagate_all(&mut self) {
for ((target, time), diff) in self.target_changes.drain() {
let target_worklist = &mut self.target_worklist;
self.targets[target.index][target.port].update_iter_and(Some((time, diff)), |time, diff| {
target_worklist.push(OrderReversed::new((time.clone(), target, diff)))
})
}
for ((source, time), diff) in self.source_changes.drain() {
let target_worklist = &mut self.target_worklist;
let edges = &self.edges[source.index][source.port];
self.sources[source.index][source.port].update_iter_and(Some((time, diff)), |time, diff| {
for &target in edges.iter() {
target_worklist.push(OrderReversed::new((time.clone(), target, diff)))
}
})
}
while !self.target_worklist.is_empty() {
let (time, target, mut diff) = self.target_worklist.pop().unwrap().element;
while self.target_worklist.peek().map(|x| (x.element.0 == time) && (x.element.1 == target)).unwrap_or(false) {
diff += self.target_worklist.pop().unwrap().element.2;
}
if diff != 0 {
let pushed_changes = &mut self.pushed_changes;
let target_worklist = &mut self.target_worklist;
let _edges = &self.edges[target.index];
let _nodes = &self.nodes[target.index][target.port];
let _compiled = &self.compiled[target.index][target.port];
self.pusheds[target.index][target.port].update_iter_and(Some((time, diff)), |time, diff| {
pushed_changes.update((target, time.clone()), diff);
for &(new_target, ref summary) in _compiled.iter() {
if let Some(new_time) = summary.results_in(time) {
target_worklist.push(OrderReversed::new((new_time.clone(), new_target, diff)));
}
}
})
}
}
}
pub fn pushed(&mut self) -> &mut ChangeBatch<(Target, T)> {
&mut self.pushed_changes
}
}
#[derive(PartialEq, Eq, Debug)]
struct OrderReversed<T> {
pub element: T,
}
impl<T> OrderReversed<T> {
fn new(element: T) -> Self { OrderReversed { element } }
}
impl<T: PartialOrd> PartialOrd for OrderReversed<T> {
fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
other.element.partial_cmp(&self.element)
}
}
impl<T: Ord> Ord for OrderReversed<T> {
fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
other.element.cmp(&self.element)
}
}