use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::cmp::Reverse;
use progress::Timestamp;
use progress::{Source, Target};
use progress::ChangeBatch;
use progress::{Location, Port};
use progress::frontier::{Antichain, MutableAntichain};
use progress::timestamp::PathSummary;
/// Collects the nodes and edges of a dataflow graph, from which a `Tracker` can be built.
///
/// The three vectors are parallel and indexed by node identifier; `add_node` grows
/// all of them together so that each has an entry for every registered node.
#[derive(Clone, Debug)]
pub struct Builder<T: Timestamp> {
/// Path summaries internal to each node, indexed as `nodes[node][input][output]`.
pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
/// Outgoing edges, indexed as `edges[node][output]`; each entry lists the targets fed by that output.
pub edges: Vec<Vec<Vec<Target>>>,
/// The `(inputs, outputs)` port counts of each node.
pub shape: Vec<(usize, usize)>,
}
impl<T: Timestamp> Builder<T> {
    /// Creates a builder that describes no nodes and no edges.
    pub fn new() -> Self {
        Builder {
            nodes: vec![],
            edges: vec![],
            shape: vec![],
        }
    }

    /// Registers, or replaces, the node at `index`.
    ///
    /// `summary` is indexed as `summary[input][output]` and must therefore have
    /// `inputs` rows of `outputs` entries each (checked in debug builds only).
    /// Any gap below `index` is filled with empty placeholder nodes. Edges already
    /// recorded for the node are kept unless its number of outputs changes, in
    /// which case they are reset to empty lists.
    pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {
        // Every input must supply exactly one antichain of summaries per output.
        debug_assert_eq!(inputs, summary.len());
        for per_input in &summary {
            debug_assert_eq!(outputs, per_input.len());
        }

        // Pad the three parallel vectors until `index` is addressable.
        loop {
            if self.nodes.len() > index {
                break;
            }
            self.nodes.push(Vec::new());
            self.edges.push(Vec::new());
            self.shape.push((0, 0));
        }

        self.nodes[index] = summary;

        // Only discard previously recorded edges when the output count changed.
        let recorded_outputs = self.edges[index].len();
        if recorded_outputs != outputs {
            self.edges[index] = (0..outputs).map(|_| Vec::new()).collect();
        }

        self.shape[index] = (inputs, outputs);
    }

    /// Connects the output port `source` to the input port `target`.
    ///
    /// Both ports must belong to nodes already registered with `add_node`; the
    /// port bounds are checked in debug builds only.
    pub fn add_edge(&mut self, source: Source, target: Target) {
        debug_assert!(source.port < self.shape[source.index].1);
        debug_assert!(target.port < self.shape[target.index].0);

        let fan_out = &mut self.edges[source.index][source.port];
        fan_out.push(target);
    }

    /// Produces a `Tracker` for the graph described so far.
    pub fn build(&self) -> Tracker<T> {
        Tracker::<T>::allocate_from(self)
    }
}
/// Tracks pointstamp counts at the ports of a dataflow graph and propagates their
/// consequences through operator summaries and along edges.
pub struct Tracker<T:Timestamp> {
/// Per-node internal path summaries (`nodes[node][input][output]`), cloned from the `Builder`.
nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
/// Per-node outgoing edges (`edges[node][output]`), cloned from the `Builder`.
edges: Vec<Vec<Vec<Target>>>,
/// Per-node port state, one entry for each entry of the builder's `shape`.
per_operator: Vec<PerOperator<T>>,
/// Buffered updates at input ports, recorded by `update_target` and drained by `propagate_all`.
target_changes: ChangeBatch<(Target, T)>,
/// Buffered updates at output ports, recorded by `update_source` and drained by `propagate_all`.
source_changes: ChangeBatch<(Source, T)>,
/// Pending `(time, location, diff)` updates; `Reverse` makes the heap yield the least entry first.
worklist: BinaryHeap<Reverse<(T, Location, i64)>>,
/// Changes reported while updating port `implications` in `propagate_all`, exposed through `pushed`.
pushed_changes: ChangeBatch<(Location, T)>,
/// One change batch per input of node 0, updated with summarized times as pointstamps change.
output_changes: Vec<ChangeBatch<T>>,
}
/// Port-level tracking state for a single node of the graph.
struct PerOperator<T: Timestamp> {
/// State for each input port of the node, indexed by port number.
targets: Vec<PortInformation<T>>,
/// State for each output port of the node, indexed by port number.
sources: Vec<PortInformation<T>>,
}
impl<T: Timestamp> PerOperator<T> {
fn new(inputs: usize, outputs: usize) -> Self {
let mut targets = Vec::with_capacity(inputs);
for _input in 0 .. inputs {
targets.push(PortInformation {
pointstamps: MutableAntichain::new(),
implications: MutableAntichain::new(),
output_summaries: Vec::new(),
})
}
let mut sources = Vec::with_capacity(outputs);
for _output in 0 .. outputs {
sources.push(PortInformation {
pointstamps: MutableAntichain::new(),
implications: MutableAntichain::new(),
output_summaries: Vec::new(),
})
}
PerOperator { targets, sources }
}
}
/// Tracking state for a single input or output port.
struct PortInformation<T: Timestamp> {
/// Counts of pointstamps reported at this port itself; updated by `update_into`.
pointstamps: MutableAntichain<T>,
/// Counts of times propagated to this port through the worklist in `Tracker::propagate_all`.
implications: MutableAntichain<T>,
/// Path summaries from this port to each output batch, indexed by output; filled in by `Tracker::allocate_from`.
output_summaries: Vec<Antichain<T::Summary>>,
}
impl<T: Timestamp> PortInformation<T> {
    /// Reports whether `time` has a positive pointstamp count at this port and an
    /// implication count of exactly one.
    #[inline(always)]
    fn is_global(&self, time: &T) -> bool {
        // Without an outstanding pointstamp at `time` there is nothing to report.
        if self.pointstamps.count_for(time) <= 0 {
            return false;
        }
        self.implications.count_for(time) == 1
    }

    /// Applies `iterator`'s `(time, diff)` updates to this port's pointstamps.
    ///
    /// For each change handed back by `update_iter_and`, the change is queued on
    /// `worklist` at `location`, and every time reachable through this port's
    /// output summaries is recorded in the matching entry of `output_changes`.
    #[inline(always)]
    fn update_into<I: IntoIterator<Item=(T, i64)>>(
        &mut self,
        iterator: I,
        location: Location,
        worklist: &mut BinaryHeap<Reverse<(T, Location, i64)>>,
        output_changes: &mut Vec<ChangeBatch<T>>)
    {
        // Borrow the summaries separately so the closure does not need all of `self`.
        let per_output = &self.output_summaries;
        self.pointstamps.update_iter_and(iterator, |time, diff| {
            worklist.push(Reverse((time.clone(), location, diff)));

            for (output, antichain) in per_output.iter().enumerate() {
                for summary in antichain.elements().iter() {
                    // A summary may yield no time at all, in which case nothing is recorded.
                    if let Some(out_time) = summary.results_in(time) {
                        output_changes[output].update(out_time, diff);
                    }
                }
            }
        });
    }
}
impl<T:Timestamp> Tracker<T> {
#[inline]
pub fn update(&mut self, location: Location, time: T, value: i64) {
match location.port {
Port::Target(port) => self.update_target(Target { index: location.node, port }, time, value),
Port::Source(port) => self.update_source(Source { index: location.node, port }, time, value),
};
}
#[inline]
pub fn update_target(&mut self, target: Target, time: T, value: i64) {
self.target_changes.update((target, time), value);
}
#[inline]
pub fn update_source(&mut self, source: Source, time: T, value: i64) {
self.source_changes.update((source, time), value);
}
pub fn allocate_from(builder: &Builder<T>) -> Self {
let mut per_operator =
builder
.shape
.iter()
.map(|&(inputs, outputs)| PerOperator::new(inputs, outputs))
.collect::<Vec<_>>();
let output_summaries = summarize_outputs::<T>(&builder.nodes, &builder.edges);
for (location, summaries) in output_summaries.into_iter() {
match location.port {
Port::Target(port) => {
per_operator[location.node].targets[port].output_summaries = summaries;
},
Port::Source(port) => {
per_operator[location.node].sources[port].output_summaries = summaries;
},
}
}
let scope_outputs = builder.shape[0].0;
let output_changes = vec![ChangeBatch::new(); scope_outputs];
Tracker {
nodes: builder.nodes.clone(),
edges: builder.edges.clone(),
per_operator,
target_changes: ChangeBatch::new(),
source_changes: ChangeBatch::new(),
worklist: BinaryHeap::new(),
pushed_changes: ChangeBatch::new(),
output_changes,
}
}
pub fn propagate_all(&mut self) {
let output_changes = &mut self.output_changes;
for ((target, time), diff) in self.target_changes.drain() {
let worklist = &mut self.worklist;
self.per_operator[target.index]
.targets[target.port]
.update_into(Some((time, diff)), Location::from(target), worklist, output_changes);
}
for ((source, time), diff) in self.source_changes.drain() {
let worklist = &mut self.worklist;
self.per_operator[source.index]
.sources[source.port]
.update_into(Some((time, diff)), Location::from(source), worklist, output_changes);
}
while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() {
while self.worklist.peek().map(|x| ((x.0).0 == time) && ((x.0).1 == location)).unwrap_or(false) {
diff += (self.worklist.pop().unwrap().0).2;
}
if diff != 0 {
let pushed_changes = &mut self.pushed_changes;
let worklist = &mut self.worklist;
match location.port {
Port::Target(port_index) => {
let nodes = &self.nodes[location.node][port_index];
self.per_operator[location.node]
.targets[port_index]
.implications
.update_iter_and(Some((time, diff)), |time, diff| {
pushed_changes.update((location, time.clone()), diff);
for (output_port, summaries) in nodes.iter().enumerate() {
for summary in summaries.elements().iter() {
if let Some(new_time) = summary.results_in(time) {
worklist.push(Reverse((
new_time,
Location {
node: location.node,
port: Port::Source(output_port)
},
diff
)));
}
}
}
});
}
Port::Source(port_index) => {
let edges = &self.edges[location.node][port_index];
self.per_operator[location.node]
.sources[port_index]
.implications
.update_iter_and(Some((time, diff)), |time, diff| {
pushed_changes.update((location, time.clone()), diff);
for &new_target in edges.iter() {
worklist.push(Reverse((
time.clone(),
Location::from(new_target),
diff
)));
}
});
},
};
}
}
}
pub fn pushed(&mut self) -> &mut ChangeBatch<(Location, T)> {
&mut self.pushed_changes
}
pub fn is_global(&self, location: Location, time: &T) -> bool {
match location.port {
Port::Target(port) => self.per_operator[location.node].targets[port].is_global(time),
Port::Source(port) => self.per_operator[location.node].sources[port].is_global(time),
}
}
}
/// Computes, for each port, the path summaries leading to each target of node 0.
///
/// `nodes` is indexed as `nodes[node][input][output]` and `edges` as
/// `edges[node][output]`. The search starts at every edge target whose node index
/// is 0 and walks the graph backwards, combining summaries as it crosses nodes.
/// The result maps a location to a vector indexed by the port of the node-0 target
/// it reaches; entries keyed by node 0 itself are removed before returning.
fn summarize_outputs<T: Timestamp>(
    nodes: &Vec<Vec<Vec<Antichain<T::Summary>>>>,
    edges: &Vec<Vec<Vec<Target>>>,
) -> HashMap<Location, Vec<Antichain<T::Summary>>>
{
    // Map every edge target back to the source feeding it. A target that appears
    // under several sources keeps only the last one visited.
    let mut upstream: HashMap<Location, Location> = HashMap::new();
    for (node, ports) in edges.iter().enumerate() {
        for (port, targets) in ports.iter().enumerate() {
            let source = Location { node, port: Port::Source(port) };
            for target in targets.iter() {
                upstream.insert(Location::from(*target), source);
            }
        }
    }

    let mut results: HashMap<Location, Vec<Antichain<T::Summary>>> = HashMap::new();
    let mut pending: VecDeque<(Location, usize, T::Summary)> = VecDeque::new();

    // Seed the search with every edge that ends at node 0, using the default summary.
    for ports in edges.iter() {
        for targets in ports.iter() {
            for target in targets.iter() {
                if target.index == 0 {
                    pending.push_back((Location::from(*target), target.port, Default::default()));
                }
            }
        }
    }

    // Keep going for as long as new summaries are being discovered.
    while let Some((location, output, summary)) = pending.pop_front() {
        match location.port {
            // An input port: step back along the edge to the source that feeds it.
            Port::Target(_) => {
                if let Some(&source) = upstream.get(&location) {
                    let antichains = results.entry(source).or_insert_with(Vec::new);
                    while output >= antichains.len() {
                        antichains.push(Antichain::new());
                    }
                    if antichains[output].insert(summary.clone()) {
                        pending.push_back((source, output, summary));
                    }
                }
            },
            // An output port: step back across the node to each of its input ports.
            Port::Source(output_port) => {
                let node = location.node;
                for (input_port, summaries) in nodes[node].iter().enumerate() {
                    let input = Location { node, port: Port::Target(input_port) };
                    let antichains = results.entry(input).or_insert_with(Vec::new);
                    while output >= antichains.len() {
                        antichains.push(Antichain::new());
                    }

                    // Extend each internal summary with the path found so far; only
                    // combinations that are new to the antichain are explored further.
                    let combinations =
                        summaries[output_port]
                            .elements()
                            .iter()
                            .filter_map(|internal| internal.followed_by(&summary));
                    for combined in combinations {
                        if antichains[output].insert(combined.clone()) {
                            pending.push_back((input, output, combined));
                        }
                    }
                }
            },
        }
    }

    // Drop whatever was recorded for the ports of node 0 itself.
    results.retain(|location, _| location.node != 0);
    results
}