use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::cmp::Reverse;
use crate::progress::Timestamp;
use crate::progress::{Source, Target};
use crate::progress::ChangeBatch;
use crate::progress::{Location, Port};
use crate::progress::frontier::{Antichain, MutableAntichain};
use crate::progress::timestamp::PathSummary;
#[derive(Clone, Debug)]
pub struct Builder<T: Timestamp> {
pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
pub edges: Vec<Vec<Vec<Target>>>,
pub shape: Vec<(usize, usize)>,
}
impl<T: Timestamp> Builder<T> {
pub fn new() -> Self {
Builder {
nodes: Vec::new(),
edges: Vec::new(),
shape: Vec::new(),
}
}
pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {
debug_assert_eq!(inputs, summary.len());
for x in summary.iter() { debug_assert_eq!(outputs, x.len()); }
while self.nodes.len() <= index {
self.nodes.push(Vec::new());
self.edges.push(Vec::new());
self.shape.push((0, 0));
}
self.nodes[index] = summary;
if self.edges[index].len() != outputs {
self.edges[index] = vec![Vec::new(); outputs];
}
self.shape[index] = (inputs, outputs);
}
pub fn add_edge(&mut self, source: Source, target: Target) {
debug_assert!(source.port < self.shape[source.index].1);
debug_assert!(target.port < self.shape[target.index].0);
self.edges[source.index][source.port].push(target);
}
pub fn build(&self) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>) {
if !self.is_acyclic() {
println!("Cycle detected without timestamp increment");
println!("{:?}", self);
}
Tracker::allocate_from(self)
}
pub fn is_acyclic(&self) -> bool {
let mut in_degree = HashMap::new();
let mut out_edges = HashMap::new();
for (index, ports) in self.edges.iter().enumerate() {
for (output, targets) in ports.iter().enumerate() {
let source = Location::new_source(index, output);
for &target in targets.iter() {
let target = Location::from(target);
*in_degree.entry(target).or_insert(0) += 1;
out_edges.entry(source).or_insert(Vec::new()).push(target);
}
}
}
for (index, summary) in self.nodes.iter().enumerate() {
for (input, outputs) in summary.iter().enumerate() {
let target = Location::new_target(index, input);
for (output, summaries) in outputs.iter().enumerate() {
let source = Location::new_source(index, output);
for summary in summaries.elements().iter() {
if summary == &Default::default() {
*in_degree.entry(source).or_insert(0) += 1;
out_edges.entry(target).or_insert(Vec::new()).push(source);
}
}
}
}
}
let mut worklist = Vec::new();
for (key, _) in out_edges.iter() {
if !in_degree.contains_key(key) {
worklist.push(*key);
}
}
while let Some(node) = worklist.pop() {
if let Some(edges) = out_edges.remove(&node) {
for dest in edges.into_iter() {
*in_degree.get_mut(&dest).unwrap() -= 1;
if in_degree[&dest] == 0 {
in_degree.remove(&dest);
worklist.push(dest);
}
}
}
}
in_degree.is_empty() && out_edges.is_empty()
}
}
pub struct Tracker<T:Timestamp> {
nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
edges: Vec<Vec<Vec<Target>>>,
per_operator: Vec<PerOperator<T>>,
target_changes: ChangeBatch<(Target, T)>,
source_changes: ChangeBatch<(Source, T)>,
worklist: BinaryHeap<Reverse<(T, Location, i64)>>,
pushed_changes: ChangeBatch<(Location, T)>,
output_changes: Vec<ChangeBatch<T>>,
total_counts: i64,
}
pub struct PerOperator<T: Timestamp> {
pub targets: Vec<PortInformation<T>>,
pub sources: Vec<PortInformation<T>>,
}
impl<T: Timestamp> PerOperator<T> {
fn new(inputs: usize, outputs: usize) -> Self {
PerOperator {
targets: vec![PortInformation::new(); inputs],
sources: vec![PortInformation::new(); outputs],
}
}
}
#[derive(Clone)]
pub struct PortInformation<T: Timestamp> {
pub pointstamps: MutableAntichain<T>,
pub implications: MutableAntichain<T>,
pub output_summaries: Vec<Antichain<T::Summary>>,
}
impl<T: Timestamp> PortInformation<T> {
fn new() -> Self {
PortInformation {
pointstamps: MutableAntichain::new(),
implications: MutableAntichain::new(),
output_summaries: Vec::new(),
}
}
#[inline(always)]
fn is_global(&self, time: &T) -> bool {
let dominated = self.implications.frontier().iter().any(|t| t.less_than(time));
let redundant = self.implications.count_for(time) > 1;
!dominated && !redundant
}
}
impl<T:Timestamp> Tracker<T> {
#[inline]
pub fn update(&mut self, location: Location, time: T, value: i64) {
match location.port {
Port::Target(port) => self.update_target(Target { index: location.node, port }, time, value),
Port::Source(port) => self.update_source(Source { index: location.node, port }, time, value),
};
}
#[inline]
pub fn update_target(&mut self, target: Target, time: T, value: i64) {
self.target_changes.update((target, time), value);
}
#[inline]
pub fn update_source(&mut self, source: Source, time: T, value: i64) {
self.source_changes.update((source, time), value);
}
pub fn tracking_anything(&mut self) -> bool {
!self.source_changes.is_empty() ||
!self.target_changes.is_empty() ||
self.total_counts > 0
}
pub fn allocate_from(builder: &Builder<T>) -> (Self, Vec<Vec<Antichain<T::Summary>>>) {
let mut per_operator =
builder
.shape
.iter()
.map(|&(inputs, outputs)| PerOperator::new(inputs, outputs))
.collect::<Vec<_>>();
let mut builder_summary = vec![vec![]; builder.shape[0].1];
let output_summaries = summarize_outputs::<T>(&builder.nodes, &builder.edges);
for (location, summaries) in output_summaries.into_iter() {
if location.node == 0 {
if let Port::Source(port) = location.port {
builder_summary[port] = summaries;
}
else {
}
}
else {
match location.port {
Port::Target(port) => {
per_operator[location.node].targets[port].output_summaries = summaries;
},
Port::Source(port) => {
per_operator[location.node].sources[port].output_summaries = summaries;
},
}
}
}
let scope_outputs = builder.shape[0].0;
let output_changes = vec![ChangeBatch::new(); scope_outputs];
let tracker =
Tracker {
nodes: builder.nodes.clone(),
edges: builder.edges.clone(),
per_operator,
target_changes: ChangeBatch::new(),
source_changes: ChangeBatch::new(),
worklist: BinaryHeap::new(),
pushed_changes: ChangeBatch::new(),
output_changes,
total_counts: 0,
};
(tracker, builder_summary)
}
pub fn propagate_all(&mut self) {
for ((target, time), diff) in self.target_changes.drain() {
let operator = &mut self.per_operator[target.index].targets[target.port];
let changes = operator.pointstamps.update_iter(Some((time, diff)));
for (time, diff) in changes {
self.total_counts += diff;
for (output, summaries) in operator.output_summaries.iter().enumerate() {
let output_changes = &mut self.output_changes[output];
summaries
.elements()
.iter()
.flat_map(|summary| summary.results_in(&time))
.for_each(|out_time| output_changes.update(out_time, diff));
}
self.worklist.push(Reverse((time, Location::from(target), diff)));
}
}
for ((source, time), diff) in self.source_changes.drain() {
let operator = &mut self.per_operator[source.index].sources[source.port];
let changes = operator.pointstamps.update_iter(Some((time, diff)));
for (time, diff) in changes {
self.total_counts += diff;
for (output, summaries) in operator.output_summaries.iter().enumerate() {
let output_changes = &mut self.output_changes[output];
summaries
.elements()
.iter()
.flat_map(|summary| summary.results_in(&time))
.for_each(|out_time| output_changes.update(out_time, diff));
}
self.worklist.push(Reverse((time, Location::from(source), diff)));
}
}
while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() {
while self.worklist.peek().map(|x| ((x.0).0 == time) && ((x.0).1 == location)).unwrap_or(false) {
diff += (self.worklist.pop().unwrap().0).2;
}
if diff != 0 {
match location.port {
Port::Target(port_index) => {
let changes =
self.per_operator[location.node]
.targets[port_index]
.implications
.update_iter(Some((time, diff)));
for (time, diff) in changes {
let nodes = &self.nodes[location.node][port_index];
for (output_port, summaries) in nodes.iter().enumerate() {
let source = Location { node: location.node, port: Port::Source(output_port) };
for summary in summaries.elements().iter() {
if let Some(new_time) = summary.results_in(&time) {
self.worklist.push(Reverse((new_time, source, diff)));
}
}
}
self.pushed_changes.update((location, time), diff);
}
}
Port::Source(port_index) => {
let changes =
self.per_operator[location.node]
.sources[port_index]
.implications
.update_iter(Some((time, diff)));
for (time, diff) in changes {
for new_target in self.edges[location.node][port_index].iter() {
self.worklist.push(Reverse((
time.clone(),
Location::from(*new_target),
diff,
)));
}
self.pushed_changes.update((location, time), diff);
}
},
};
}
}
}
pub fn pushed_output(&mut self) -> &mut [ChangeBatch<T>] {
&mut self.output_changes[..]
}
pub fn pushed(&mut self) -> &mut ChangeBatch<(Location, T)> {
&mut self.pushed_changes
}
pub fn node_state(&self, index: usize) -> &PerOperator<T> {
&self.per_operator[index]
}
pub fn is_global(&self, location: Location, time: &T) -> bool {
match location.port {
Port::Target(port) => self.per_operator[location.node].targets[port].is_global(time),
Port::Source(port) => self.per_operator[location.node].sources[port].is_global(time),
}
}
}
fn summarize_outputs<T: Timestamp>(
nodes: &Vec<Vec<Vec<Antichain<T::Summary>>>>,
edges: &Vec<Vec<Vec<Target>>>,
) -> HashMap<Location, Vec<Antichain<T::Summary>>>
{
let mut reverse = HashMap::new();
for (node, outputs) in edges.iter().enumerate() {
for (output, targets) in outputs.iter().enumerate() {
for target in targets.iter() {
reverse.insert(
Location::from(*target),
Location { node, port: Port::Source(output) }
);
}
}
}
let mut results = HashMap::new();
let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
let outputs =
edges
.iter()
.flat_map(|x| x.iter())
.flat_map(|x| x.iter())
.filter(|target| target.index == 0);
for output_target in outputs {
worklist.push_back((Location::from(*output_target), output_target.port, Default::default()));
}
while let Some((location, output, summary)) = worklist.pop_front() {
match location.port {
Port::Source(output_port) => {
for (input_port, summaries) in nodes[location.node].iter().enumerate() {
let location = Location { node: location.node, port: Port::Target(input_port) };
let antichains = results.entry(location).or_insert(Vec::new());
while antichains.len() <= output { antichains.push(Antichain::new()); }
for operator_summary in summaries[output_port].elements().iter() {
if let Some(combined) = operator_summary.followed_by(&summary) {
if antichains[output].insert(combined.clone()) {
worklist.push_back((location, output, combined));
}
}
}
}
},
Port::Target(_port) => {
if let Some(source) = reverse.get(&location) {
let antichains = results.entry(*source).or_insert(Vec::new());
while antichains.len() <= output { antichains.push(Antichain::new()); }
if antichains[output].insert(summary.clone()) {
worklist.push_back((*source, output, summary.clone()));
}
}
},
}
}
results
}