use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::cmp::Reverse;
use columnar::{Vecs, Index as ColumnarIndex};
use crate::progress::Timestamp;
use crate::progress::{Source, Target};
use crate::progress::ChangeBatch;
use crate::progress::{Location, Port};
use crate::progress::operate::{Connectivity, PortConnectivity};
use crate::progress::frontier::MutableAntichain;
use crate::progress::timestamp::PathSummary;
/// Assembles a two-level columnar `Vecs` from a three-deep iterator.
///
/// Each outer item ("node") becomes one outer group; each middle item ("port")
/// becomes one inner list within that group; the innermost items are the values.
fn build_nested_vecs<S>(nodes: impl Iterator<Item = impl Iterator<Item = impl Iterator<Item = S>>>) -> Vecs<Vecs<Vec<S>>> {
    let mut result: Vecs<Vecs<Vec<S>>> = Default::default();
    for node in nodes {
        for port in node {
            // Append this port's items as one inner list of the inner `Vecs`.
            result.values.push_iter(port);
        }
        // Close the node's group at the current number of inner lists.
        // NOTE(review): relies on `Vecs` exposing `values`/`bounds` directly;
        // bounds are recorded as cumulative offsets.
        result.bounds.push(result.values.bounds.len() as u64);
    }
    result
}
/// A builder for the static topology over which pointstamps are tracked.
#[derive(Clone, Debug)]
pub struct Builder<T: Timestamp> {
    /// Internal connectivity for each node: for each input port, the path
    /// summaries to each reachable output port of the same node.
    pub nodes: Vec<Connectivity<T::Summary>>,
    /// Direct connections from each node's output ports to target ports:
    /// `edges[node][output]` lists the targets fed by that output.
    pub edges: Vec<Vec<Vec<Target>>>,
    /// The number of (input, output) ports for each node.
    pub shape: Vec<(usize, usize)>,
}
impl<T: Timestamp> Builder<T> {
    /// Create a new empty topology builder.
    pub fn new() -> Self {
        Builder {
            nodes: Vec::new(),
            edges: Vec::new(),
            shape: Vec::new(),
        }
    }
    /// Registers the node at `index` with `inputs` input ports, `outputs`
    /// output ports, and `summary` giving, for each input port, the internal
    /// path summaries to each reachable output port.
    pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Connectivity<T::Summary>) {
        // Sanity-check the summary: one entry per input, and only valid output ports.
        debug_assert_eq!(inputs, summary.len());
        debug_assert!(summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)));
        // Grow the collections so `index` is addressable; nodes added out of
        // order leave empty placeholders to be filled by later calls.
        while self.nodes.len() <= index {
            self.nodes.push(Vec::new());
            self.edges.push(Vec::new());
            self.shape.push((0, 0));
        }
        self.nodes[index] = summary;
        // Only re-allocate edge storage if the output arity changed.
        if self.edges[index].len() != outputs {
            self.edges[index] = vec![Vec::new(); outputs];
        }
        self.shape[index] = (inputs, outputs);
    }
    /// Adds an edge from `source` to `target`; both nodes must already have
    /// been registered via `add_node` (debug-asserted).
    pub fn add_edge(&mut self, source: Source, target: Target) {
        debug_assert!(source.port < self.shape[source.node].1);
        debug_assert!(target.port < self.shape[target.node].0);
        self.edges[source.node][source.port].push(target);
    }
    /// Compiles the builder into a `Tracker`, also returning summaries for the
    /// node-0 source ports.
    ///
    /// Prints a diagnostic (but does not fail) if the graph contains a cycle
    /// along which no summary increments the timestamp.
    pub fn build(self, logger: Option<logging::TrackerLogger<T>>) -> (Tracker<T>, Connectivity<T::Summary>) {
        if !self.is_acyclic() {
            println!("Cycle detected without timestamp increment");
            println!("{:?}", self);
        }
        Tracker::allocate_from(self, logger)
    }
    /// Tests whether the graph is acyclic when restricted to graph edges and
    /// *default* (non-incrementing) internal summaries, via Kahn's algorithm.
    pub fn is_acyclic(&self) -> bool {
        // One location per port, counting both targets and sources.
        let locations = self.shape.iter().map(|(targets, sources)| targets + sources).sum();
        let mut in_degree = HashMap::with_capacity(locations);
        // Each graph edge contributes one unit of in-degree to its target;
        // sources are seeded so every location appears in the map.
        for (index, ports) in self.edges.iter().enumerate() {
            for (output, targets) in ports.iter().enumerate() {
                let source = Location::new_source(index, output);
                in_degree.entry(source).or_insert(0);
                for &target in targets.iter() {
                    let target = Location::from(target);
                    *in_degree.entry(target).or_insert(0) += 1;
                }
            }
        }
        // Each *default* internal summary contributes one unit of in-degree to
        // its output port; non-default summaries advance time and break cycles.
        for (index, summary) in self.nodes.iter().enumerate() {
            for (input, outputs) in summary.iter().enumerate() {
                let target = Location::new_target(index, input);
                in_degree.entry(target).or_insert(0);
                for (output, summaries) in outputs.iter_ports() {
                    let source = Location::new_source(index, output);
                    for summary in summaries.elements().iter() {
                        if summary == &Default::default() {
                            *in_degree.entry(source).or_insert(0) += 1;
                        }
                    }
                }
            }
        }
        // Seed the worklist with zero-in-degree locations, then keep only the
        // strictly positive counts in `in_degree`.
        let mut worklist = Vec::with_capacity(in_degree.len());
        for (key, val) in in_degree.iter() {
            if *val == 0 {
                worklist.push(*key);
            }
        }
        in_degree.retain(|_key, val| val != &0);
        // Repeatedly retire zero-in-degree locations, decrementing successors.
        while let Some(Location { node, port }) = worklist.pop() {
            match port {
                // A retired source decrements each target it feeds.
                Port::Source(port) => {
                    for target in self.edges[node][port].iter() {
                        let target = Location::from(*target);
                        *in_degree.get_mut(&target).unwrap() -= 1;
                        if in_degree[&target] == 0 {
                            in_degree.remove(&target);
                            worklist.push(target);
                        }
                    }
                },
                // A retired target decrements each output it reaches through a
                // default internal summary (once per default summary element).
                Port::Target(port) => {
                    for (output, summaries) in self.nodes[node][port].iter_ports() {
                        let source = Location::new_source(node, output);
                        for summary in summaries.elements().iter() {
                            if summary == &Default::default() {
                                *in_degree.get_mut(&source).unwrap() -= 1;
                                if in_degree[&source] == 0 {
                                    in_degree.remove(&source);
                                    worklist.push(source);
                                }
                            }
                        }
                    }
                },
            }
        }
        // Any residual positive in-degree witnesses a cycle.
        in_degree.is_empty()
    }
}
impl<T: Timestamp> Default for Builder<T> {
fn default() -> Self {
Self::new()
}
}
/// Propagates pointstamp counts through a static dataflow topology.
pub struct Tracker<T:Timestamp> {
    /// Internal summaries, flattened: `nodes[node][input]` lists `(output, summary)` pairs.
    nodes: Vecs<Vecs<Vec<(usize, T::Summary)>>>,
    /// Direct connections, flattened: `edges[node][output]` lists the targets fed.
    edges: Vecs<Vecs<Vec<Target>>>,
    /// For each target port, `(output, summary)` pairs to each scope output.
    target_summaries: Vecs<Vecs<Vec<(usize, T::Summary)>>>,
    /// For each source port, `(output, summary)` pairs to each scope output.
    source_summaries: Vecs<Vecs<Vec<(usize, T::Summary)>>>,
    /// Per-operator pointstamp counts and implied frontiers, indexed by node.
    per_operator: Vec<PerOperator<T>>,
    /// Buffered updates at targets, applied by the next `propagate_all`.
    target_changes: ChangeBatch<(Target, T)>,
    /// Buffered updates at sources, applied by the next `propagate_all`.
    source_changes: ChangeBatch<(Source, T)>,
    /// Pending propagation steps; a min-heap (via `Reverse`) ordered by time.
    worklist: BinaryHeap<Reverse<(T, Location, i64)>>,
    /// Implication changes accumulated for the consumer; exposed via `pushed`.
    pushed_changes: ChangeBatch<(Location, T)>,
    /// Changes at scope outputs, one batch per node-0 input port; exposed via `pushed_output`.
    output_changes: Vec<ChangeBatch<T>>,
    /// Net sum of all pointstamp diffs currently held; used by `tracking_anything`.
    total_counts: i64,
    /// Optional logging of source and target update events.
    logger: Option<logging::TrackerLogger<T>>,
}
/// Pointstamp-tracking state for one operator: one entry per port.
pub struct PerOperator<T: Timestamp> {
    /// State for each input (target) port of the operator.
    pub targets: Vec<PortInformation<T>>,
    /// State for each output (source) port of the operator.
    pub sources: Vec<PortInformation<T>>,
    /// Net sum of pointstamp diffs applied at this operator's source ports.
    pub cap_counts: i64,
}
impl<T: Timestamp> PerOperator<T> {
    /// Allocates empty per-port state for an operator with `inputs` input
    /// ports and `outputs` output ports, with a zero capability count.
    pub fn new(inputs: usize, outputs: usize) -> Self {
        let targets = (0..inputs).map(|_| PortInformation::new()).collect();
        let sources = (0..outputs).map(|_| PortInformation::new()).collect();
        PerOperator { targets, sources, cap_counts: 0 }
    }
}
/// Pointstamp-tracking state for a single port.
#[derive(Clone)]
pub struct PortInformation<T: Timestamp> {
    /// Pointstamps registered directly at this port.
    pub pointstamps: MutableAntichain<T>,
    /// Pointstamps implied at this port by propagation (including the
    /// consequences of this port's own pointstamps).
    pub implications: MutableAntichain<T>,
}
impl<T: Timestamp> PortInformation<T> {
    /// Creates empty tracking state: no pointstamps, no implications.
    pub fn new() -> Self {
        Self {
            pointstamps: MutableAntichain::new(),
            implications: MutableAntichain::new(),
        }
    }
    /// True when `time` is not dominated here: no implied pointstamp is
    /// strictly less than `time`, and `time` itself is implied at most once.
    #[inline]
    pub fn is_global(&self, time: &T) -> bool {
        // More than one implication of `time` means something besides the
        // queried pointstamp implies it.
        if self.implications.count_for(time) > 1 {
            return false;
        }
        // Otherwise, global exactly when nothing strictly earlier is implied.
        !self.implications.frontier().iter().any(|t| t.less_than(time))
    }
}
impl<T: Timestamp> Default for PortInformation<T> {
fn default() -> Self {
Self::new()
}
}
impl<T:Timestamp> Tracker<T> {
    /// Buffers a count update for `time` at a `location` (target or source).
    #[inline]
    pub fn update(&mut self, location: Location, time: T, value: i64) {
        match location.port {
            Port::Target(port) => self.update_target(Target::new(location.node, port), time, value),
            Port::Source(port) => self.update_source(Source::new(location.node, port), time, value),
        };
    }
    /// Buffers a count update for `time` at a target port.
    #[inline]
    pub fn update_target(&mut self, target: Target, time: T, value: i64) {
        self.target_changes.update((target, time), value);
    }
    /// Buffers a count update for `time` at a source port.
    #[inline]
    pub fn update_source(&mut self, source: Source, time: T, value: i64) {
        self.source_changes.update((source, time), value);
    }
    /// True if any updates are buffered or any net counts are outstanding.
    pub fn tracking_anything(&mut self) -> bool {
        !self.source_changes.is_empty() ||
        !self.target_changes.is_empty() ||
        self.total_counts > 0
    }
    /// Allocates a fresh tracker from a completed builder, also returning the
    /// summaries from the node-0 source ports to the scope outputs.
    pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Connectivity<T::Summary>) {
        // Allocate per-operator state matching each node's port counts.
        let per_operator =
            builder
                .shape
                .iter()
                .map(|&(inputs, outputs)| PerOperator::new(inputs, outputs))
                .collect::<Vec<_>>();
        // Summaries reported back to the caller, one per node-0 source port.
        // NOTE(review): node 0 appears to stand for the enclosing scope, with
        // its sources as scope inputs and its targets as scope outputs —
        // confirm against callers.
        let mut builder_summary = vec![PortConnectivity::default(); builder.shape[0].1];
        // For every location, compute summaries to each scope output.
        let output_summaries = summarize_outputs::<T>(&builder.nodes, &builder.edges);
        // Split those summaries by location kind, per node and per port.
        let mut target_sum: Vec<Vec<PortConnectivity<T::Summary>>> = builder.shape.iter()
            .map(|&(inputs, _)| vec![PortConnectivity::default(); inputs])
            .collect();
        let mut source_sum: Vec<Vec<PortConnectivity<T::Summary>>> = builder.shape.iter()
            .map(|&(_, outputs)| vec![PortConnectivity::default(); outputs])
            .collect();
        for (location, summaries) in output_summaries.into_iter() {
            if location.node == 0 {
                if let Some(port) = match location.port { Port::Source(port) => Some(port), _ => None } {
                    builder_summary[port] = summaries;
                }
            }
            else {
                match location.port {
                    Port::Target(port) => {
                        target_sum[location.node][port] = summaries;
                    },
                    Port::Source(port) => {
                        source_sum[location.node][port] = summaries;
                    },
                }
            }
        }
        // Flatten the nested structures into columnar `Vecs`, expanding each
        // `PortConnectivity` into `(output, summary)` pairs.
        let nodes = build_nested_vecs(builder.nodes.iter().map(|connectivity| {
            connectivity.iter().map(|port_conn| {
                port_conn.iter_ports().flat_map(|(port, antichain)| {
                    antichain.elements().iter().map(move |s| (port, s.clone()))
                })
            })
        }));
        let edges = build_nested_vecs(builder.edges.iter().map(|node_edges| {
            node_edges.iter().map(|port_edges| port_edges.iter().cloned())
        }));
        let target_summaries = build_nested_vecs(target_sum.iter().map(|ports| {
            ports.iter().map(|port_conn| {
                port_conn.iter_ports().flat_map(|(port, antichain)| {
                    antichain.elements().iter().map(move |s| (port, s.clone()))
                })
            })
        }));
        let source_summaries = build_nested_vecs(source_sum.iter().map(|ports| {
            ports.iter().map(|port_conn| {
                port_conn.iter_ports().flat_map(|(port, antichain)| {
                    antichain.elements().iter().map(move |s| (port, s.clone()))
                })
            })
        }));
        // One output change batch per scope output (node-0 input port).
        let scope_outputs = builder.shape[0].0;
        let output_changes = vec![ChangeBatch::new(); scope_outputs];
        let tracker =
            Tracker {
                nodes,
                edges,
                target_summaries,
                source_summaries,
                per_operator,
                target_changes: ChangeBatch::new(),
                source_changes: ChangeBatch::new(),
                worklist: BinaryHeap::new(),
                pushed_changes: ChangeBatch::new(),
                output_changes,
                total_counts: 0,
                logger,
            };
        (tracker, builder_summary)
    }
    /// Applies all buffered updates and propagates their implications,
    /// accumulating consequences in `pushed_changes` and `output_changes`.
    pub fn propagate_all(&mut self) {
        // First, log the batches of updates about to be processed.
        if let Some(logger) = &mut self.logger {
            let target_changes =
                self.target_changes
                    .iter()
                    .map(|((target, time), diff)| (target.node, target.port, time, *diff));
            logger.log_target_updates(target_changes);
            let source_changes =
                self.source_changes
                    .iter()
                    .map(|((source, time), diff)| (source.node, source.port, time, *diff));
            logger.log_source_updates(source_changes);
        }
        use itertools::Itertools;
        // Drain buffered target updates, grouped by target port.
        // NOTE(review): grouping relies on `ChangeBatch::drain` yielding
        // updates with equal keys adjacent — confirm.
        let mut target_changes = self.target_changes.drain().peekable();
        while let Some(((target, _), _)) = target_changes.peek() {
            let target = *target;
            let operator = &mut self.per_operator[target.node].targets[target.port];
            let target_updates = target_changes.peeking_take_while(|((t, _),_)| t == &target).map(|((_,time),diff)| (time,diff));
            // Apply to the port's pointstamps; `changes` is the net frontier effect.
            let changes = operator.pointstamps.update_iter(target_updates);
            for (time, diff) in changes {
                self.total_counts += diff;
                // Record consequences at each scope output reachable from this target.
                for &(output, ref summary) in (&self.target_summaries).get(target.node).get(target.port).into_index_iter() {
                    if let Some(out_time) = summary.results_in(&time) {
                        self.output_changes[output].update(out_time, diff);
                    }
                }
                // Queue the change for implication propagation below.
                self.worklist.push(Reverse((time, Location::from(target), diff)));
            }
        }
        // Drain buffered source updates, grouped by source port, analogously.
        let mut source_changes = self.source_changes.drain().peekable();
        while let Some(((source, _), _)) = source_changes.peek() {
            let source = *source;
            let operator = &mut self.per_operator[source.node];
            let op_source = &mut operator.sources[source.port];
            let source_updates = source_changes.peeking_take_while(|((s, _),_)| s == &source).map(|((_,time),diff)| (time,diff));
            let changes = op_source.pointstamps.update_iter(source_updates);
            for (time, diff) in changes {
                self.total_counts += diff;
                // Source-side changes also adjust the operator's capability count.
                operator.cap_counts += diff;
                for &(output, ref summary) in (&self.source_summaries).get(source.node).get(source.port).into_index_iter() {
                    if let Some(out_time) = summary.results_in(&time) {
                        self.output_changes[output].update(out_time, diff);
                    }
                }
                self.worklist.push(Reverse((time, Location::from(source), diff)));
            }
        }
        // Drain the worklist in increasing time order (min-heap via `Reverse`).
        while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() {
            // Coalesce adjacent entries for the same (time, location).
            while self.worklist.peek().map(|x| ((x.0).0 == time) && ((x.0).1 == location)).unwrap_or(false) {
                diff += (self.worklist.pop().unwrap().0).2;
            }
            // Net-zero diffs have no effect; skip them.
            if diff != 0 {
                match location.port {
                    // Targets propagate forward through the operator's internal
                    // summaries to its source ports.
                    Port::Target(port_index) => {
                        let changes =
                            self.per_operator[location.node]
                                .targets[port_index]
                                .implications
                                .update_iter(Some((time, diff)));
                        for (time, diff) in changes {
                            for &(output_port, ref summary) in (&self.nodes).get(location.node).get(port_index).into_index_iter() {
                                if let Some(new_time) = summary.results_in(&time) {
                                    let source = Location { node: location.node, port: Port::Source(output_port) };
                                    self.worklist.push(Reverse((new_time, source, diff)));
                                }
                            }
                            self.pushed_changes.update((location, time), diff);
                        }
                    }
                    // Sources propagate along each outgoing edge, with time unchanged.
                    Port::Source(port_index) => {
                        let changes =
                            self.per_operator[location.node]
                                .sources[port_index]
                                .implications
                                .update_iter(Some((time, diff)));
                        for (time, diff) in changes {
                            for new_target in (&self.edges).get(location.node).get(port_index).into_index_iter() {
                                self.worklist.push(Reverse((
                                    time.clone(),
                                    Location::from(*new_target),
                                    diff,
                                )));
                            }
                            self.pushed_changes.update((location, time), diff);
                        }
                    },
                };
            }
        }
    }
    /// Mutable access to the per-scope-output change batches; the caller is
    /// expected to drain them.
    pub fn pushed_output(&mut self) -> &mut [ChangeBatch<T>] {
        &mut self.output_changes[..]
    }
    /// Mutable access to the accumulated implication changes, alongside the
    /// per-operator state needed to interpret them.
    pub fn pushed(&mut self) -> (&mut ChangeBatch<(Location, T)>, &[PerOperator<T>]) {
        (&mut self.pushed_changes, &self.per_operator)
    }
    /// The tracked state for the operator at `index`.
    pub fn node_state(&self, index: usize) -> &PerOperator<T> {
        &self.per_operator[index]
    }
    /// True when `time` at `location` is not dominated by other implications;
    /// see `PortInformation::is_global`.
    pub fn is_global(&self, location: Location, time: &T) -> bool {
        match location.port {
            Port::Target(port) => self.per_operator[location.node].targets[port].is_global(time),
            Port::Source(port) => self.per_operator[location.node].sources[port].is_global(time),
        }
    }
}
/// Computes, for each location, the path summaries to each scope output,
/// where scope outputs are the target ports on node zero.
///
/// Performs a backwards reachability search from node-0 targets, composing
/// operator-internal summaries with (summary-free) edge traversals.
fn summarize_outputs<T: Timestamp>(
    nodes: &[Connectivity<T::Summary>],
    edges: &[Vec<Vec<Target>>],
) -> HashMap<Location, PortConnectivity<T::Summary>>
{
    // Invert the edges: for each target, the source that feeds it.
    // NOTE(review): a target fed by more than one source would have earlier
    // insertions silently overwritten — presumably each target has a unique
    // feeder; confirm this invariant with the builder.
    let mut reverse = HashMap::new();
    for (node, outputs) in edges.iter().enumerate() {
        for (output, targets) in outputs.iter().enumerate() {
            for target in targets.iter() {
                reverse.insert(
                    Location::from(*target),
                    Location { node, port: Port::Source(output) }
                );
            }
        }
    }
    // Invert the internal summaries: for each source, the (input, summaries)
    // pairs that reach it within its operator.
    let mut reverse_internal: HashMap<_, Vec<_>> = HashMap::new();
    for (node, connectivity) in nodes.iter().enumerate() {
        for (input, outputs) in connectivity.iter().enumerate() {
            for (output, summary) in outputs.iter_ports() {
                reverse_internal
                    .entry(Location::new_source(node, output))
                    .or_default()
                    .push((input, summary));
            }
        }
    }
    let mut results: HashMap<Location, PortConnectivity<T::Summary>> = HashMap::new();
    let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
    // Seed the search at every target on node zero (the scope outputs), with
    // the default summary to its own output port.
    let outputs =
        edges
            .iter()
            .flat_map(|x| x.iter())
            .flat_map(|x| x.iter())
            .filter(|target| target.node == 0);
    for output_target in outputs {
        worklist.push_back((Location::from(*output_target), output_target.port, Default::default()));
    }
    // Process entries `(location, output, summary)`: `summary` is a path
    // summary from `location` to scope output `output`.
    while let Some((location, output, summary)) = worklist.pop_front() {
        match location.port {
            // Sources: step backwards through the operator's internal summaries.
            Port::Source(_output_port) => {
                if let Some(inputs) = reverse_internal.get(&location) {
                    for (input_port, operator_summary) in inputs.iter() {
                        let new_location = Location::new_target(location.node, *input_port);
                        for op_summary in operator_summary.elements().iter() {
                            // Compose: first through the operator, then the known tail.
                            if let Some(combined) = op_summary.followed_by(&summary) {
                                // `insert_ref` presumably reports whether the
                                // summary was newly retained — TODO confirm;
                                // only then is further exploration needed.
                                if results.entry(new_location).or_default().insert_ref(output, &combined) {
                                    worklist.push_back((new_location, output, combined));
                                }
                            }
                        }
                    }
                }
            }
            // Targets: step backwards across the edge that feeds this target,
            // which leaves the summary unchanged.
            Port::Target(_port) => {
                if let Some(&source) = reverse.get(&location) {
                    if results.entry(source).or_default().insert_ref(output, &summary) {
                        worklist.push_back((source, output, summary));
                    }
                }
            },
        }
    }
    results
}
/// Logging of the reachability tracker's update batches.
pub mod logging {
    use std::time::Duration;
    use timely_container::CapacityContainerBuilder;
    use timely_logging::TypedLogger;
    use crate::logging_core::Logger;
    /// Container builder for batches of timestamped tracker events.
    pub type TrackerEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TrackerEvent<T>)>>;
    /// A logger for tracker events, tagging each batch with a tracker identifier.
    pub struct TrackerLogger<T: Clone + 'static> {
        /// Identifier attached to every logged batch.
        identifier: usize,
        /// The underlying typed event logger.
        logger: TypedLogger<TrackerEventBuilder<T>, TrackerEvent<T>>,
    }
    impl<T: Clone + 'static> TrackerLogger<T> {
        /// Creates a tracker logger with the given identifier.
        pub fn new(identifier: usize, logger: Logger<TrackerEventBuilder<T>>) -> Self {
            Self { identifier, logger: logger.into() }
        }
        /// Logs a batch of `(node, port, time, diff)` source updates, if any.
        pub fn log_source_updates<'a, I>(&mut self, updates: I)
        where
            I: IntoIterator<Item = (usize, usize, &'a T, i64)>
        {
            // Clone the borrowed times into an owned batch before logging.
            let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
            if !updates.is_empty() {
                self.logger.log({
                    SourceUpdate {
                        tracker_id: self.identifier,
                        updates
                    }
                });
            }
        }
        /// Logs a batch of `(node, port, time, diff)` target updates, if any.
        pub fn log_target_updates<'a, I>(&mut self, updates: I)
        where
            I: IntoIterator<Item = (usize, usize, &'a T, i64)>
        {
            // Clone the borrowed times into an owned batch before logging.
            let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
            if !updates.is_empty() {
                self.logger.log({
                    TargetUpdate {
                        tracker_id: self.identifier,
                        updates
                    }
                });
            }
        }
    }
    /// The events a tracker logs.
    #[derive(Debug, Clone)]
    pub enum TrackerEvent<T> {
        /// A batch of updates at source ports.
        SourceUpdate(SourceUpdate<T>),
        /// A batch of updates at target ports.
        TargetUpdate(TargetUpdate<T>),
    }
    /// A batch of source-port updates from one tracker.
    #[derive(Debug, Clone)]
    pub struct SourceUpdate<T> {
        /// Identifier of the tracker that produced the batch.
        pub tracker_id: usize,
        /// The updates, as `(node, port, time, diff)` tuples.
        pub updates: Vec<(usize, usize, T, i64)>,
    }
    /// A batch of target-port updates from one tracker.
    #[derive(Debug, Clone)]
    pub struct TargetUpdate<T> {
        /// Identifier of the tracker that produced the batch.
        pub tracker_id: usize,
        /// The updates, as `(node, port, time, diff)` tuples.
        pub updates: Vec<(usize, usize, T, i64)>,
    }
    impl<T> From<SourceUpdate<T>> for TrackerEvent<T> {
        fn from(v: SourceUpdate<T>) -> TrackerEvent<T> { TrackerEvent::SourceUpdate(v) }
    }
    impl<T> From<TargetUpdate<T>> for TrackerEvent<T> {
        fn from(v: TargetUpdate<T>) -> TrackerEvent<T> { TrackerEvent::TargetUpdate(v) }
    }
}
impl<T: Timestamp> Drop for Tracker<T> {
    /// On drop, logs the retraction (negated diff) of every pointstamp still
    /// registered at any port, so log consumers see balanced update counts.
    fn drop(&mut self) {
        // Nothing to do unless logging is enabled.
        if let Some(logger) = self.logger.as_mut() {
            for (node, state) in self.per_operator.iter_mut().enumerate() {
                // Negate all outstanding target-side pointstamps.
                let retractions = state.targets
                    .iter_mut()
                    .enumerate()
                    .flat_map(|(port, info)| {
                        info.pointstamps
                            .updates()
                            .map(move |(time, diff)| (node, port, time, -diff))
                    });
                logger.log_target_updates(retractions);
                // Negate all outstanding source-side pointstamps.
                let retractions = state.sources
                    .iter_mut()
                    .enumerate()
                    .flat_map(|(port, info)| {
                        info.pointstamps
                            .updates()
                            .map(move |(time, diff)| (node, port, time, -diff))
                    });
                logger.log_source_updates(retractions);
            }
        }
    }
}