use std::default::Default;
use progress::frontier::{MutableAntichain, Antichain};
use progress::{Timestamp, Operate};
use progress::nested::Target;
use progress::nested::subgraph::Target::{GraphOutput, ChildInput};
use progress::count_map::CountMap;
pub struct ChildWrapper<T: Timestamp> {
pub name: String,
pub addr: Vec<usize>,
pub scope: Option<Box<Operate<T>>>,
index: usize,
pub inputs: usize, pub outputs: usize,
pub edges: Vec<Vec<Target>>,
pub local: bool, pub notify: bool,
pub summary: Vec<Vec<Antichain<T::Summary>>>,
pub guarantees: Vec<MutableAntichain<T>>, pub capabilities: Vec<MutableAntichain<T>>, pub outstanding_messages: Vec<MutableAntichain<T>>,
internal_progress: Vec<CountMap<T>>, consumed_messages: Vec<CountMap<T>>, produced_messages: Vec<CountMap<T>>,
pub guarantee_changes: Vec<CountMap<T>>, }
impl<T: Timestamp> ChildWrapper<T> {
pub fn new(mut scope: Box<Operate<T>>, index: usize, mut path: Vec<usize>) -> ChildWrapper<T> {
path.push(index);
::logging::log(&::logging::OPERATES, ::logging::OperatesEvent { addr: path.clone(), name: scope.name().to_owned() });
let local = scope.local();
let inputs = scope.inputs();
let outputs = scope.outputs();
let notify = scope.notify_me();
let (summary, work) = scope.get_internal_summary();
assert!(summary.len() == inputs);
assert!(!summary.iter().any(|x| x.len() != outputs));
let mut result = ChildWrapper {
name: format!("{}[{}]", scope.name(), index),
addr: path,
scope: Some(scope),
index: index,
local: local,
inputs: inputs,
outputs: outputs,
edges: vec![Default::default(); outputs],
notify: notify,
summary: summary,
guarantees: vec![Default::default(); inputs],
capabilities: vec![Default::default(); outputs],
outstanding_messages: vec![Default::default(); inputs],
internal_progress: vec![CountMap::new(); outputs],
consumed_messages: vec![CountMap::new(); inputs],
produced_messages: vec![CountMap::new(); outputs],
guarantee_changes: vec![CountMap::new(); inputs],
};
for (index, capability) in result.capabilities.iter_mut().enumerate() {
capability.update_iter_and(work[index].elements().iter().map(|x|x.clone()), |_, _| {});
}
return result;
}
pub fn set_external_summary(&mut self, summaries: Vec<Vec<Antichain<T::Summary>>>, frontier: &mut [CountMap<T>]) {
self.scope.as_mut().map(|scope| scope.set_external_summary(summaries, frontier));
}
pub fn push_pointstamps(&mut self, external_progress: &[CountMap<T>]) {
assert!(self.scope.is_some() || external_progress.iter().all(|x| x.len() == 0));
if self.notify && external_progress.iter().any(|x| x.len() > 0) {
for input_port in (0..self.inputs) {
self.guarantees[input_port]
.update_into_cm(&external_progress[input_port], &mut self.guarantee_changes[input_port]);
}
if self.guarantee_changes.iter().any(|x| x.len() > 0) {
let changes = &mut self.guarantee_changes;
self.scope.as_mut().map(|scope| scope.push_external_progress(changes));
debug_assert!(!changes.iter().any(|x| x.len() > 0));
}
}
}
pub fn pull_pointstamps(&mut self, pointstamp_messages: &mut CountMap<(usize, usize, T)>,
pointstamp_internal: &mut CountMap<(usize, usize, T)>) -> bool {
let active = {
::logging::log(&::logging::SCHEDULE, ::logging::ScheduleEvent { addr: self.addr.clone(), is_start: true });
let result = if let &mut Some(ref mut scope) = &mut self.scope {
scope.pull_internal_progress(&mut self.internal_progress,
&mut self.consumed_messages,
&mut self.produced_messages)
}
else { false };
::logging::log(&::logging::SCHEDULE, ::logging::ScheduleEvent { addr: self.addr.clone(), is_start: false });
result
};
if self.scope.is_some() &&
!active &&
self.notify && self.guarantees.iter().all(|guarantee| guarantee.empty()) &&
self.capabilities.iter().all(|capability| capability.empty()) {
self.scope = None;
self.name = format!("{}(tombstone)", self.name);
}
for output in (0..self.outputs) {
while let Some((time, delta)) = self.produced_messages[output].pop() {
for &(target, t_port) in self.edges[output].iter() {
pointstamp_messages.update((target, t_port, time), delta);
}
}
while let Some((time, delta)) = self.internal_progress[output].pop() {
pointstamp_internal.update(&(self.index, output, time), delta);
}
}
for input in (0..self.inputs) {
while let Some((time, delta)) = self.consumed_messages[input].pop() {
pointstamp_messages.update(&(self.index, input, time), -delta);
}
}
return active;
}
pub fn add_edge(&mut self, output: usize, target: Target) { self.edges[output].push(target); }
pub fn name(&self) -> String { self.name.clone() }
}