use std::rc::Rc;
use std::cell::RefCell;
use std::collections::BinaryHeap;
use std::cmp::Reverse;
use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelyProgressLogger as ProgressLogger;
use crate::scheduling::Schedule;
use crate::scheduling::activate::Activations;
use crate::progress::frontier::{Antichain, MutableAntichain, MutableAntichainFilter};
use crate::progress::{Timestamp, Operate, operate::SharedProgress};
use crate::progress::{Location, Port, Source, Target};
use crate::progress::ChangeBatch;
use crate::progress::broadcast::Progcaster;
use crate::progress::reachability;
use crate::progress::timestamp::Refines;
use crate::worker::ProgressMode;
/// Accumulates the children, edges, inputs and outputs of a nested scope before it is
/// turned into a runnable [`Subgraph`] by `build`.
///
/// `TOuter` is the timestamp of the enclosing scope; `TInner` is the timestamp used
/// inside this scope.
pub struct SubgraphBuilder<TOuter, TInner>
where
    TOuter: Timestamp,
    TInner: Timestamp,
{
    /// Human-readable name; becomes the built subgraph's `Schedule::name`.
    pub name: String,
    /// Address of this scope: the parent's path with `index` appended (see `new_from`).
    pub path: Vec<usize>,
    /// Index of this scope within its parent; used as the node id of the `Target`s and
    /// `Source`s returned by `new_input` / `new_output`.
    index: usize,
    /// Per-child state. Slot 0 is a placeholder for the enclosing scope and is replaced
    /// with a correctly shaped one in `build`.
    children: Vec<PerOperatorState<TInner>>,
    /// Number of child ids handed out so far, including the reserved id 0.
    child_count: usize,
    /// Edges recorded by `connect`; installed into children and the reachability
    /// builder during `build`.
    edge_stash: Vec<(Source, Target)>,
    /// One shared change batch per scope input, registered through `new_input`.
    input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
    /// One antichain per scope output, registered through `new_output`.
    output_capabilities: Vec<MutableAntichain<TOuter>>,
    /// Optional event logger; a clone is handed to each child.
    logging: Option<Logger>,
    /// Optional progress logger, passed to the `Progcaster` in `build`.
    progress_logging: Option<ProgressLogger>,
}
impl<TOuter, TInner> SubgraphBuilder<TOuter, TInner>
where
    TOuter: Timestamp,
    TInner: Timestamp+Refines<TOuter>,
{
    /// Registers a new scope input whose arriving message counts are recorded in
    /// `shared_counts`, and returns the parent-scope `Target` naming that input.
    pub fn new_input(&mut self, shared_counts: Rc<RefCell<ChangeBatch<TInner>>>) -> Target {
        self.input_messages.push(shared_counts);
        Target::new(self.index, self.input_messages.len() - 1)
    }

    /// Registers a new scope output and returns the parent-scope `Source` naming it.
    pub fn new_output(&mut self) -> Source {
        self.output_capabilities.push(MutableAntichain::new());
        Source::new(self.index, self.output_capabilities.len() - 1)
    }

    /// Records an edge from `source` to `target`. Edges are installed in `build`.
    pub fn connect(&mut self, source: Source, target: Target) {
        self.edge_stash.push((source, target));
    }

    /// Creates an empty builder for the scope at position `index` under `path`.
    ///
    /// Child slot 0 is reserved for the enclosing scope, so `child_count` starts at 1.
    pub fn new_from(
        index: usize,
        mut path: Vec<usize>,
        logging: Option<Logger>,
        progress_logging: Option<ProgressLogger>,
        name: &str,
    )
    -> SubgraphBuilder<TOuter, TInner>
    {
        path.push(index);

        // Placeholder for the enclosing scope; reshaped once inputs/outputs are known.
        let children = vec![PerOperatorState::empty(0, 0)];

        SubgraphBuilder {
            name: name.to_owned(),
            path,
            index,
            children,
            child_count: 1,
            edge_stash: Vec::new(),
            input_messages: Vec::new(),
            output_capabilities: Vec::new(),
            logging,
            progress_logging,
        }
    }

    /// Hands out the next unused child index.
    pub fn allocate_child_id(&mut self) -> usize {
        self.child_count += 1;
        self.child_count - 1
    }

    /// Adds `child` at position `index`, logging an `OperatesEvent` when logging is enabled.
    pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
        // Only build the child's address when there is a logger to receive it.
        if let Some(l) = self.logging.as_mut() {
            let mut child_path = self.path.clone();
            child_path.push(index);
            l.log(crate::logging::OperatesEvent {
                id: identifier,
                addr: child_path,
                name: child.name().to_owned(),
            });
        }
        self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone()))
    }

    /// Consumes the builder and produces a runnable `Subgraph`.
    ///
    /// # Panics
    ///
    /// Panics if the children's indices are not exactly `0 .. children.len()`.
    pub fn build<A: crate::worker::AsWorker>(mut self, worker: &mut A) -> Subgraph<TOuter, TInner> {
        // Children must occupy every index densely; sort, then verify.
        self.children.sort_by_key(|child| child.index);
        assert!(self.children.iter().enumerate().all(|(i,x)| i == x.index));

        let inputs = self.input_messages.len();
        let outputs = self.output_capabilities.len();

        // Child 0 is the enclosing scope, seen from the inside: scope outputs are its
        // inputs, and scope inputs are its outputs.
        self.children[0] = PerOperatorState::empty(outputs, inputs);

        let mut builder = reachability::Builder::new();

        // Child 0 has no internal connectivity from its inputs to its outputs.
        builder.add_node(0, outputs, inputs, vec![vec![Antichain::new(); inputs]; outputs]);
        for (index, child) in self.children.iter().enumerate().skip(1) {
            builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
        }

        for (source, target) in self.edge_stash {
            self.children[source.node].edges[source.port].push(target);
            builder.add_edge(source, target);
        }

        let (tracker, scope_summary) = builder.build();

        let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone());

        // Every real child starts out incomplete; child 0 is never counted.
        let mut incomplete = vec![true; self.children.len()];
        incomplete[0] = false;
        let incomplete_count = incomplete.len() - 1;

        // Request an initial activation so the new subgraph gets scheduled.
        let activations = worker.activations().clone();
        activations.borrow_mut().activate(&self.path[..]);

        Subgraph {
            name: self.name,
            path: self.path,
            inputs,
            outputs,
            incomplete,
            incomplete_count,
            activations,
            temp_active: BinaryHeap::new(),
            children: self.children,
            input_messages: self.input_messages,
            output_capabilities: self.output_capabilities,
            local_pointstamp: ChangeBatch::new(),
            final_pointstamp: ChangeBatch::new(),
            progcaster,
            pointstamp_tracker: tracker,
            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
            scope_summary,
            progress_mode: worker.config().progress_mode,
        }
    }
}
/// A nested scope: a collection of child operators sharing the inner timestamp `TInner`.
///
/// To its parent it behaves as a single operator over `TOuter`.
pub struct Subgraph<TOuter, TInner>
where
    TOuter: Timestamp,
    TInner: Timestamp+Refines<TOuter>,
{
    /// Name reported through `Schedule::name`.
    name: String,
    /// Address of this scope; activations for children are looked up under it.
    pub path: Vec<usize>,
    /// Number of scope inputs.
    inputs: usize,
    /// Number of scope outputs.
    outputs: usize,
    /// Per-child state, indexed by child index. Entry 0 represents the enclosing scope.
    children: Vec<PerOperatorState<TInner>>,
    /// Per child: whether its most recent `schedule` call reported it incomplete.
    incomplete: Vec<bool>,
    /// Count of `true` entries in `incomplete`.
    incomplete_count: usize,
    /// The worker's activation registry, cloned from `worker.activations()` in `build`.
    activations: Rc<RefCell<Activations>>,
    /// Min-heap (via `Reverse`) of child indices queued for scheduling.
    temp_active: BinaryHeap<Reverse<usize>>,
    /// Per scope input, counts of arriving messages (drained by `harvest_inputs`).
    input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
    /// Per scope output, the antichain that output frontier changes are filtered
    /// through before being reported upward.
    output_capabilities: Vec<MutableAntichain<TOuter>>,
    /// Progress updates staged for the progcaster (`send_progress`).
    local_pointstamp: ChangeBatch<(Location, TInner)>,
    /// Progress updates ready to be applied by `propagate_pointstamps`.
    final_pointstamp: ChangeBatch<(Location, TInner)>,
    /// Reachability tracker over this scope's children and edges.
    pointstamp_tracker: reachability::Tracker<TInner>,
    /// Sends staged updates (`send`) and receives updates into `final_pointstamp` (`recv`).
    progcaster: Progcaster<TInner>,
    /// Progress shared with the parent scope (returned from `get_internal_summary`).
    shared_progress: Rc<RefCell<SharedProgress<TOuter>>>,
    /// Path summaries from scope inputs to scope outputs, indexed `[input][output]`.
    scope_summary: Vec<Vec<Antichain<TInner::Summary>>>,
    /// Progress mode from the worker config; `Eager` forces every update to be sent.
    progress_mode: ProgressMode,
}
impl<TOuter, TInner> Schedule for Subgraph<TOuter, TInner>
where
    TOuter: Timestamp,
    TInner: Timestamp+Refines<TOuter>,
{
    fn name(&self) -> &str {
        self.name.as_str()
    }

    fn path(&self) -> &[usize] {
        self.path.as_slice()
    }

    /// Runs one round of the scope and reports whether it still has work outstanding.
    ///
    /// A round:
    /// 1. absorbs external frontier changes, arriving input messages and received
    ///    progress updates;
    /// 2. schedules each activated child at most once, in ascending index order;
    /// 3. sends staged progress.
    ///
    /// The return value is `true` if any child is incomplete or the tracker still
    /// tracks pointstamps.
    fn schedule(&mut self) -> bool {
        self.accept_frontier();
        self.harvest_inputs();
        self.progcaster.recv(&mut self.final_pointstamp);
        self.propagate_pointstamps();

        // Queue every child activated under this scope's path.
        let pending = &mut self.temp_active;
        self.activations
            .borrow_mut()
            .for_extensions(&self.path[..], |child| pending.push(Reverse(child)));

        // The heap yields indices in ascending order. Anything not strictly above the
        // last scheduled index is a duplicate, or was re-queued behind us, or is child 0
        // (the enclosing scope), and is skipped.
        let mut last_scheduled = 0;
        while let Some(Reverse(child)) = self.temp_active.pop() {
            if child <= last_scheduled {
                continue;
            }
            self.activate_child(child);
            last_scheduled = child;
        }

        self.send_progress();

        // Updates left unapplied in `final_pointstamp` need another round.
        if !self.final_pointstamp.is_empty() {
            self.activations.borrow_mut().activate(&self.path[..]);
        }

        let has_incomplete_children = self.incomplete_count > 0;
        let has_tracked_pointstamps = self.pointstamp_tracker.tracking_anything();
        has_incomplete_children || has_tracked_pointstamps
    }
}
/// Internal steps used by `Schedule::schedule` and the `Operate` implementation.
impl<TOuter, TInner> Subgraph<TOuter, TInner>
where
    TOuter: Timestamp,
    TInner: Timestamp+Refines<TOuter>,
{
    /// Schedules child `child_index` once and folds the outcome back into the scope.
    ///
    /// Steps:
    /// - Updates `incomplete` / `incomplete_count` from the child's answer.
    /// - Shuts the child down if it reported completion and the tracker shows no
    ///   input-frontier implications and no pointstamps at its ports.
    /// - Extracts the child's progress updates.
    ///
    /// Returns the child's "incomplete" answer.
    fn activate_child(&mut self, child_index: usize) -> bool {
        let child = &mut self.children[child_index];
        let incomplete = child.schedule();
        // Keep `incomplete_count` equal to the number of `true` entries in `incomplete`.
        if incomplete != self.incomplete[child_index] {
            if incomplete { self.incomplete_count += 1; }
            else { self.incomplete_count -= 1; }
            self.incomplete[child_index] = incomplete;
        }
        if !incomplete {
            // The child says it is done. Retire it only if nothing can still reach its
            // inputs and it holds no capabilities at its outputs.
            let child_state = self.pointstamp_tracker.node_state(child_index);
            let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
            let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
            if frontiers_empty && no_capabilities {
                child.shut_down();
            }
        }
        else {
            // Debug builds only: panic if the child reports increments it cannot justify.
            #[cfg(debug_assertions)] {
                child.validate_progress(self.pointstamp_tracker.node_state(child_index));
            }
        }
        // Updates from `local` children go to `local_pointstamp`, which `send_progress`
        // hands to the progcaster. All other children write directly to `final_pointstamp`.
        if child.local {
            child.extract_progress(&mut self.local_pointstamp, &mut self.temp_active);
        }
        else {
            child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
        }
        incomplete
    }

    /// Drains pending changes to the scope's input frontiers from `shared_progress` and
    /// records them as pointstamp changes at child 0's source ports, converting each
    /// outer time with `TInner::to_inner`.
    fn accept_frontier(&mut self) {
        for (port, changes) in self.shared_progress.borrow_mut().frontiers.iter_mut().enumerate() {
            let source = Source::new(0, port);
            for (time, value) in changes.drain() {
                self.pointstamp_tracker.update_source(
                    source,
                    TInner::to_inner(time),
                    value
                );
            }
        }
    }

    /// Drains the message counts that arrived at each scope input. For each count:
    /// - `+delta` is staged at every target connected to child 0's corresponding port;
    /// - `-delta` is staged at child 0's source port.
    ///
    /// `propagate_pointstamps` later reports that negative source entry upward as a
    /// consumed count.
    fn harvest_inputs(&mut self) {
        for input in 0 .. self.inputs {
            let source = Location::new_source(0, input);
            let mut borrowed = self.input_messages[input].borrow_mut();
            for (time, delta) in borrowed.drain() {
                for target in &self.children[0].edges[input] {
                    self.local_pointstamp.update((Location::from(*target), time.clone()), delta);
                }
                self.local_pointstamp.update((source, time), -delta);
            }
        }
    }

    /// Applies `final_pointstamp`, propagates the consequences, and publishes the results.
    ///
    /// Updates at child 0 are not tracked. Instead they are reported to the parent:
    /// - source ports are reported as consumed, with the sign re-negated;
    /// - target ports are reported as produced.
    ///
    /// After propagation:
    /// - frontier changes at children's input ports are written to their shared progress,
    ///   and children with `notify` set are queued;
    /// - changes reaching the scope outputs are reported upward as internal capability
    ///   changes, filtered through `output_capabilities`.
    fn propagate_pointstamps(&mut self) {
        for ((location, timestamp), delta) in self.final_pointstamp.drain() {
            if location.node == 0 {
                match location.port {
                    // Child 0's capabilities mirror messages received at scope inputs
                    // (see `harvest_inputs`); negate back to a positive consumed count.
                    Port::Source(scope_input) => {
                        self.shared_progress
                            .borrow_mut()
                            .consumeds[scope_input]
                            .update(timestamp.to_outer(), -delta);
                    },
                    // Messages sent to child 0's inputs leave the scope: report as produced.
                    Port::Target(scope_output) => {
                        self.shared_progress
                            .borrow_mut()
                            .produceds[scope_output]
                            .update(timestamp.to_outer(), delta);
                    },
                }
            }
            else {
                self.pointstamp_tracker.update(location, timestamp, delta);
            }
        }
        self.pointstamp_tracker.propagate_all();
        // Only target (input) ports have frontiers to deliver.
        for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() {
            if let crate::progress::Port::Target(port) = location.port {
                if self.children[location.node].notify {
                    self.temp_active.push(Reverse(location.node));
                }
                self.children[location.node]
                    .shared_progress
                    .borrow_mut()
                    .frontiers[port]
                    .update(time, diff);
            }
        }
        // `filter_through` presumably yields only the resulting frontier changes of each
        // output antichain. NOTE(review): confirm against `MutableAntichainFilter`.
        for (output, internal) in self.shared_progress.borrow_mut().internals.iter_mut().enumerate() {
            self.pointstamp_tracker
                .pushed_output()[output]
                .drain()
                .map(|(time, diff)| (time.to_outer(), diff))
                .filter_through(&mut self.output_capabilities[output])
                .for_each(|(time, diff)| internal.update(time, diff));
        }
    }

    /// Hands `local_pointstamp` to the progcaster in either of two cases:
    /// - `progress_mode` is `Eager`;
    /// - it holds a negative update at a location the tracker reports as `is_global`.
    ///
    /// Otherwise the updates stay staged.
    fn send_progress(&mut self) {
        let must_send = self.progress_mode == ProgressMode::Eager || {
            let tracker = &mut self.pointstamp_tracker;
            self.local_pointstamp
                .iter()
                .any(|((location, time), diff)|
                    tracker.is_global(*location, time) && *diff < 0
                )
        };
        if must_send {
            self.progcaster.send(&mut self.local_pointstamp);
        }
    }
}
impl<TOuter, TInner> Operate<TOuter> for Subgraph<TOuter, TInner>
where
    TOuter: Timestamp,
    TInner: Timestamp+Refines<TOuter>,
{
    fn local(&self) -> bool {
        false
    }

    fn inputs(&self) -> usize {
        self.inputs
    }

    fn outputs(&self) -> usize {
        self.outputs
    }

    /// Reports the scope's input-to-output summaries in outer time, together with the
    /// progress state shared with the parent.
    ///
    /// As a side effect, collects every child's initially expressed capabilities and
    /// propagates them.
    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<TOuter::Summary>>>, Rc<RefCell<SharedProgress<TOuter>>>) {
        // Child 0 mirrors the scope: its outputs are our inputs, its inputs our outputs.
        assert_eq!(self.children[0].outputs, self.inputs());
        assert_eq!(self.children[0].inputs, self.outputs());

        // Translate each inner path summary into an outer one.
        let mut summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()];
        for (input, per_output) in self.scope_summary.iter().enumerate() {
            for (output, path_summaries) in per_output.iter().enumerate() {
                for path_summary in path_summaries.elements().iter() {
                    summary[input][output].insert(TInner::summarize(path_summary.clone()));
                }
            }
        }

        // Feed the children's initial capabilities into the tracker, then propagate.
        for child in &mut self.children {
            child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
        }
        self.propagate_pointstamps();

        (summary, Rc::clone(&self.shared_progress))
    }

    /// Absorbs the external frontier, propagates it, and forwards the call to every
    /// child that still has an operator.
    fn set_external_summary(&mut self) {
        self.accept_frontier();
        self.propagate_pointstamps();
        for child in &mut self.children {
            if let Some(operator) = child.operator.as_mut() {
                operator.set_external_summary();
            }
        }
    }
}
/// Bookkeeping the subgraph keeps for each child operator (and for child 0, the
/// placeholder representing the enclosing scope).
struct PerOperatorState<T: Timestamp> {
    /// Operator name; suffixed with "(tombstone)" after `shut_down`.
    name: String,
    /// Position of this child within the subgraph; used as its node id in `Location`s.
    index: usize,
    /// Identifier used in log events (`usize::max_value()` for the placeholder).
    id: usize,
    /// Copied from `Operate::local`. Selects which pointstamp buffer the subgraph
    /// extracts this child's progress into.
    local: bool,
    /// Copied from `Operate::notify_me`. When set, frontier changes at this child's inputs
    /// queue it for scheduling.
    notify: bool,
    /// Number of input ports.
    inputs: usize,
    /// Number of output ports.
    outputs: usize,
    /// The operator itself; `None` for the placeholder or once shut down.
    operator: Option<Box<dyn Operate<T>>>,
    /// For each output port, the targets it is connected to.
    edges: Vec<Vec<Target>>,
    /// Progress information exchanged with the operator.
    shared_progress: Rc<RefCell<SharedProgress<T>>>,
    /// Summaries indexed `[input][output]`, as reported by `get_internal_summary`.
    internal_summary: Vec<Vec<Antichain<T::Summary>>>,
    /// Optional event logger.
    logging: Option<Logger>,
}
impl<T: Timestamp> PerOperatorState<T> {

    /// Creates an operator-less placeholder named "External".
    ///
    /// `inputs` and `outputs` are from the placeholder's own point of view. The
    /// subgraph uses this for child 0 with the scope's outputs as inputs and vice versa.
    fn empty(inputs: usize, outputs: usize) -> PerOperatorState<T> {
        PerOperatorState {
            name: "External".to_owned(),
            operator: None,
            index: 0,
            id: usize::max_value(),
            local: false,
            notify: true,
            inputs,
            outputs,
            edges: vec![Vec::new(); outputs],
            logging: None,
            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs,outputs))),
            internal_summary: Vec::new(),
        }
    }

    /// Wraps `scope` as child `index`, querying its shape, notification preference and
    /// internal summary.
    ///
    /// # Panics
    ///
    /// Panics if the reported summary is not `inputs` rows of `outputs` entries each.
    pub fn new(
        mut scope: Box<dyn Operate<T>>,
        index: usize,
        _path: Vec<usize>,
        identifier: usize,
        logging: Option<Logger>
    ) -> PerOperatorState<T>
    {
        let local = scope.local();
        let inputs = scope.inputs();
        let outputs = scope.outputs();
        let notify = scope.notify_me();

        let (internal_summary, shared_progress) = scope.get_internal_summary();

        assert_eq!(internal_summary.len(), inputs);
        assert!(!internal_summary.iter().any(|x| x.len() != outputs));

        PerOperatorState {
            name: scope.name().to_owned(),
            operator: Some(scope),
            index,
            id: identifier,
            local,
            notify,
            inputs,
            outputs,
            edges: vec![vec![]; outputs],
            logging,
            shared_progress,
            internal_summary,
        }
    }

    /// Schedules the operator, if any, and returns whether it reports itself incomplete.
    ///
    /// With logging enabled, it also logs three things:
    /// - a `PushProgressEvent` when frontier changes are pending;
    /// - a start `ScheduleEvent` before the call;
    /// - a stop `ScheduleEvent` after the call.
    ///
    /// # Panics
    ///
    /// Panics if the operator has been shut down but frontier changes are still being
    /// delivered to it.
    pub fn schedule(&mut self) -> bool {
        if let Some(ref mut operator) = self.operator {
            if let Some(l) = self.logging.as_mut() {
                let frontiers = &mut self.shared_progress.borrow_mut().frontiers[..];
                if frontiers.iter_mut().any(|buffer| !buffer.is_empty()) {
                    l.log(crate::logging::PushProgressEvent { op_id: self.id })
                }
                l.log(crate::logging::ScheduleEvent::start(self.id));
            }
            let incomplete = operator.schedule();
            if let Some(l) = self.logging.as_mut() {
                l.log(crate::logging::ScheduleEvent::stop(self.id));
            }
            incomplete
        }
        else {
            // A shut-down operator must not receive frontier changes.
            if self.shared_progress.borrow_mut().frontiers.iter_mut().any(|x| !x.is_empty()) {
                println!("Operator prematurely shut down: {}", self.name);
                println!(" {:?}", self.notify);
                println!(" {:?}", self.shared_progress.borrow_mut().frontiers);
                // Put the cause in the panic payload too, not only on stdout.
                panic!("Operator prematurely shut down: {}", self.name);
            }
            false
        }
    }

    /// Drops the operator, logging a `ShutdownEvent` and marking the name as a tombstone.
    /// Does nothing if there is no operator.
    fn shut_down(&mut self) {
        if self.operator.is_some() {
            if let Some(l) = self.logging.as_mut() {
                l.log(crate::logging::ShutdownEvent{ id: self.id });
            }
            self.operator = None;
            self.name = format!("{}(tombstone)", self.name);
        }
    }

    /// Moves this child's pending progress into `pointstamps`:
    /// - consumed counts become negated target updates;
    /// - internal capability changes become source updates;
    /// - produced counts become target updates on every connected edge, and each
    ///   receiving node is queued in `temp_active`.
    fn extract_progress(&mut self, pointstamps: &mut ChangeBatch<(Location, T)>, temp_active: &mut BinaryHeap<Reverse<usize>>) {
        let shared_progress = &mut *self.shared_progress.borrow_mut();

        for (input, consumed) in shared_progress.consumeds.iter_mut().enumerate() {
            let target = Location::new_target(self.index, input);
            for (time, delta) in consumed.drain() {
                pointstamps.update((target, time), -delta);
            }
        }

        for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
            let source = Location::new_source(self.index, output);
            for (time, delta) in internal.drain() {
                // `time` is owned and used exactly once here, so move it instead of cloning.
                pointstamps.update((source, time), delta);
            }
        }

        for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
            for (time, delta) in produced.drain() {
                for target in &self.edges[output] {
                    pointstamps.update((Location::from(*target), time.clone()), delta);
                    temp_active.push(Reverse(target.node));
                }
            }
        }
    }

    /// Debug check: every positive internal or produced update must be supported by one of:
    /// - a positive consumed update at a time less than or equal to it;
    /// - the tracker's implications at the corresponding output.
    ///
    /// # Panics
    ///
    /// Panics, naming the operator, on the first unsupported update.
    // Only called under `cfg(debug_assertions)`, hence the lint allowance.
    #[allow(dead_code)]
    fn validate_progress(&mut self, child_state: &reachability::PerOperator<T>) {
        let shared_progress = &mut *self.shared_progress.borrow_mut();
        for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
            for (time, diff) in internal.iter() {
                if *diff > 0 {
                    let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
                    let internal = child_state.sources[output].implications.less_equal(time);
                    if !consumed && !internal {
                        println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
                        panic!("Progress error; internal {:?}", self.name);
                    }
                }
            }
        }
        for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
            for (time, diff) in produced.iter() {
                if *diff > 0 {
                    let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
                    let internal = child_state.sources[output].implications.less_equal(time);
                    if !consumed && !internal {
                        println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
                        panic!("Progress error; produced {:?}", self.name);
                    }
                }
            }
        }
    }
}
impl<T> Drop for PerOperatorState<T>
where
    T: Timestamp,
{
    /// Shuts the operator down (logging the shutdown, if enabled) when its state is discarded.
    fn drop(&mut self) {
        Self::shut_down(self)
    }
}