use progress::Timestamp;
use progress::nested::{Source, Target};
use progress::ChangeBatch;
use progress::frontier::{Antichain, MutableAntichain};
use progress::timestamp::PathSummary;
use order::PartialOrder;
/// Accumulates a description of a graph (nodes with internal summaries, plus edges
/// between node ports) from which `summarize` derives a `Summary`.
///
/// The three vectors are indexed by node index and are grown together by `add_node`.
#[derive(Clone, Debug)]
pub struct Builder<T: Timestamp> {
/// Internal summaries supplied to `add_node`, indexed as `nodes[node][input_port][output_port]`.
nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
/// Outgoing edges, indexed as `edges[node][output_port]`; each entry lists the targets attached to that output.
edges: Vec<Vec<Vec<Target>>>,
/// The `(inputs, outputs)` port counts recorded for each node.
shape: Vec<(usize, usize)>,
}
impl<T: Timestamp> Builder<T> {
    /// Creates a builder describing no nodes and no edges.
    pub fn new() -> Self {
        let (nodes, edges, shape) = (Vec::new(), Vec::new(), Vec::new());
        Builder { nodes, edges, shape }
    }

    /// Registers (or replaces) the node at `index`.
    ///
    /// `summary` is indexed as `summary[input_port][output_port]`; in debug builds its
    /// dimensions are checked against `inputs` and `outputs`. Any indices below `index`
    /// that have not been registered yet are filled with empty placeholder nodes.
    /// Existing outgoing edges of the node are kept only when the number of outputs is
    /// unchanged; otherwise they are reset to empty lists.
    pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {
        debug_assert_eq!(inputs, summary.len());
        for x in summary.iter() { debug_assert_eq!(outputs, x.len()); }

        // Grow the three per-node tables in lock-step until `index` is addressable.
        let missing = index.saturating_add(1).saturating_sub(self.nodes.len());
        for _ in 0 .. missing {
            self.nodes.push(Vec::new());
            self.edges.push(Vec::new());
            self.shape.push((0, 0));
        }

        self.nodes[index] = summary;
        self.shape[index] = (inputs, outputs);

        // A changed output count invalidates the per-output edge lists.
        let outgoing = &mut self.edges[index];
        if outgoing.len() != outputs {
            *outgoing = vec![Vec::new(); outputs];
        }
    }

    /// Connects the output port `source` to the input port `target`.
    ///
    /// In debug builds, asserts that both ports lie within the shapes recorded by `add_node`.
    pub fn add_edge(&mut self, source: Source, target: Target) {
        debug_assert!(source.port < self.shape[source.index].1);
        debug_assert!(target.port < self.shape[target.index].0);
        let attached = &mut self.edges[source.index][source.port];
        attached.push(target);
    }

    /// Computes, for every source and every target, the targets it can reach together
    /// with an antichain of the path summaries leading there.
    ///
    /// Every target additionally records itself under the default summary. In debug
    /// builds this panics if, before that self-entry is added, some target already
    /// reaches itself with summaries that are `less_equal` to the default summary.
    pub fn summarize(&mut self) -> Summary<T> {
        // Worklist of ((source, target), summary) facts whose consequences are unexplored.
        // It starts out holding every edge, each under the default summary.
        let mut pending = ::std::collections::VecDeque::<((Source, Target), T::Summary)>::new();
        for (index, ports) in self.edges.iter().enumerate() {
            for (port, attached) in ports.iter().enumerate() {
                let source = Source { index: index, port: port };
                for &target in attached.iter() {
                    pending.push_back(((source, target), Default::default()));
                }
            }
        }

        // One (initially empty) result list per output port and per input port.
        let mut source_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>> =
            self.shape.iter().map(|&(_, outputs)| vec![Vec::new(); outputs]).collect();
        let mut target_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>> =
            self.shape.iter().map(|&(inputs, _)| vec![Vec::new(); inputs]).collect();

        // Source-to-target summaries: a fact that changes the recorded antichain is
        // pushed through the reached node's internal summaries and along the edges
        // leaving the corresponding outputs.
        while let Some(((source, target), summary)) = pending.pop_front() {
            let changed = add_summary(&mut source_target[source.index][source.port], target, summary.clone());
            if !changed {
                continue;
            }
            let through_node = &self.nodes[target.index][target.port];
            let leaving = &self.edges[target.index];
            for (out_port, internal) in through_node.iter().enumerate() {
                for step in internal.elements() {
                    if let Some(extended) = summary.followed_by(step) {
                        for &next_target in leaving[out_port].iter() {
                            pending.push_back(((source, next_target), extended.clone()));
                        }
                    }
                }
            }
        }

        // Target-to-target summaries: an internal summary (input -> output) followed by
        // whatever that output is known to reach.
        for (index, node) in self.nodes.iter().enumerate() {
            for (input_port, per_output) in node.iter().enumerate() {
                for (output_port, internal) in per_output.iter().enumerate() {
                    for step in internal.elements() {
                        for &(target, ref onward) in source_target[index][output_port].iter() {
                            for tail in onward.elements() {
                                if let Some(combined) = step.followed_by(tail) {
                                    add_summary(&mut target_target[index][input_port], target, combined);
                                }
                            }
                        }
                    }
                }
            }
        }

        // Debug-only check, run before the trivial self-entries are inserted below.
        #[cfg(debug_assertions)]
        {
            for (node, ports) in target_target.iter().enumerate() {
                for (port, reached) in ports.iter().enumerate() {
                    let this_target = Target { index: node, port: port };
                    for &(ref target, ref summary) in reached.iter() {
                        if target == &this_target && summary.less_equal(&Default::default()) {
                            panic!("Default summary found along self-loop: {:?}", target);
                        }
                    }
                }
            }
        }

        // Each input reaches itself under the default summary.
        for (index, node) in self.nodes.iter().enumerate() {
            for input_port in 0 .. node.len() {
                let itself = Target { index: index, port: input_port };
                add_summary(&mut target_target[index][input_port], itself, Default::default());
            }
        }

        Summary {
            source_target: source_target,
            target_target: target_target,
        }
    }
}
/// The result of `Builder::summarize`: which targets each port can reach, and how.
///
/// Both tables are indexed as `[node][port]`; each entry lists reachable targets paired
/// with the antichain of path summaries recorded for that target.
///
/// `Debug` is derived so this public type can be inspected like `Builder` and `Tracker`.
#[derive(Clone, Debug)]
pub struct Summary<T: Timestamp> {
    /// Targets reachable from each source, indexed as `source_target[node][output_port]`.
    pub source_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>>,
    /// Targets reachable from each target, indexed as `target_target[node][input_port]`.
    pub target_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>>,
}
/// Holds per-port timestamp state for every node, together with the reachability tables
/// taken from a `Summary`, and accumulates the changes produced by `propagate_node`.
///
/// All vectors are indexed first by node index, then by port.
#[derive(Default, Debug)]
pub struct Tracker<T:Timestamp> {
/// One `MutableAntichain` per output port, indexed as `sources[node][output_port]`.
sources: Vec<Vec<MutableAntichain<T>>>,
/// One `MutableAntichain` per input port, indexed as `targets[node][input_port]`.
targets: Vec<Vec<MutableAntichain<T>>>,
/// One `ChangeBatch` per input port, indexed as `pusheds[node][input_port]`; written by `propagate_node`.
pusheds: Vec<Vec<ChangeBatch<T>>>,
/// Copied from `Summary::source_target` by `allocate_from`.
source_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>>,
/// Copied from `Summary::target_target` by `allocate_from`.
target_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>>,
}
impl<T:Timestamp> Tracker<T> {
#[inline]
pub fn update_target(&mut self, target: Target, time: T, value: i64) {
self.targets[target.index][target.port].update_dirty(time, value);
}
#[inline]
pub fn update_source(&mut self, source: Source, time: T, value: i64) {
self.sources[source.index][source.port].update_dirty(time, value);
}
pub fn node_state(&mut self, index: usize) -> (&[MutableAntichain<T>], &[MutableAntichain<T>], &mut [ChangeBatch<T>]) {
(&self.targets[index], &self.sources[index], &mut self.pusheds[index][..])
}
pub fn target(&mut self, index: usize) -> &mut [MutableAntichain<T>] {
&mut self.targets[index]
}
pub fn source(&mut self, index: usize) -> &mut [MutableAntichain<T>] {
&mut self.sources[index]
}
pub fn clear(&mut self) {
for vec in &mut self.sources { for map in vec.iter_mut() { map.clear(); } }
for vec in &mut self.targets { for map in vec.iter_mut() { map.clear(); } }
for vec in &mut self.pusheds { for map in vec.iter_mut() { map.clear(); } }
}
pub fn is_empty(&mut self) -> bool {
self.pusheds.iter_mut().all(|x| x.iter_mut().all(|y| y.is_empty()))
}
pub fn tracking_anything(&mut self) -> bool {
self.sources.iter_mut().any(|x| x.iter_mut().any(|y| !y.is_empty())) ||
self.targets.iter_mut().any(|x| x.iter_mut().any(|y| !y.is_empty()))
}
pub fn allocate_from(summary: Summary<T>) -> Self {
let source_target = summary.source_target;
let target_target = summary.target_target;
debug_assert_eq!(source_target.len(), target_target.len());
let mut sources = Vec::with_capacity(source_target.len());
let mut targets = Vec::with_capacity(target_target.len());
let mut pusheds = Vec::with_capacity(target_target.len());
for index in 0 .. source_target.len() {
let source_count = source_target[index].len();
sources.push(vec![MutableAntichain::new(); source_count]);
}
for index in 0 .. target_target.len() {
let target_count = target_target[index].len();
targets.push(vec![MutableAntichain::new(); target_count]);
pusheds.push(vec![ChangeBatch::new(); target_count]);
}
Tracker {
sources,
targets,
pusheds,
source_target,
target_target,
}
}
pub fn propagate_node(&mut self, index: usize) {
for input in 0..self.targets[index].len() {
let target_target = &self.target_target[index][input];
let pusheds = &mut self.pusheds;
self.targets[index][input].update_iter_and(None, |time, value| {
for &(target, ref antichain) in target_target.iter() {
let pusheds = &mut pusheds[target.index][target.port];
for summary in antichain.elements().iter() {
if let Some(new_time) = summary.results_in(&time) {
pusheds.update(new_time, value);
}
}
}
});
}
for output in 0..self.sources[index].len() {
let source_target = &self.source_target[index][output];
let pusheds = &mut self.pusheds;
self.sources[index][output].update_iter_and(None, |time, value| {
for &(target, ref antichain) in source_target.iter() {
let pusheds = &mut pusheds[target.index][target.port];
for summary in antichain.elements().iter() {
if let Some(new_time) = summary.results_in(&time) {
pusheds.update(new_time, value);
}
}
}
});
}
}
pub fn propagate_all(&mut self) {
for index in 0..self.targets.len() {
self.propagate_node(index);
}
}
#[inline(always)]
pub fn pushed_mut(&mut self, node: usize) -> &mut [ChangeBatch<T>] {
&mut self.pusheds[node][..]
}
}
/// Records `summary` as a way of reaching `target` in `vector`.
///
/// If `target` already has an entry, the summary is offered to that entry's antichain
/// and the result of `Antichain::insert` is returned. Otherwise a new entry holding
/// just `summary` is appended and `true` is returned.
fn add_summary<S: PartialOrder+Eq>(vector: &mut Vec<(Target, Antichain<S>)>, target: Target, summary: S) -> bool {
    let existing = vector.iter().position(|entry| target.eq(&entry.0));
    match existing {
        Some(slot) => vector[slot].1.insert(summary),
        None => {
            vector.push((target, Antichain::from_elem(summary)));
            true
        }
    }
}