1use std::rc::Rc;
9use std::cell::RefCell;
10use std::collections::BinaryHeap;
11use std::cmp::Reverse;
12
13use crate::logging::TimelyLogger as Logger;
14use crate::logging::TimelyProgressLogger as ProgressLogger;
15
16use crate::scheduling::Schedule;
17use crate::scheduling::activate::Activations;
18
19use crate::progress::frontier::{Antichain, MutableAntichain, MutableAntichainFilter};
20use crate::progress::{Timestamp, Operate, operate::SharedProgress};
21use crate::progress::{Location, Port, Source, Target};
22
23use crate::progress::ChangeBatch;
24use crate::progress::broadcast::Progcaster;
25use crate::progress::reachability;
26use crate::progress::timestamp::Refines;
27
28use crate::worker::ProgressMode;
29
30pub struct SubgraphBuilder<TOuter, TInner>
40where
41 TOuter: Timestamp,
42 TInner: Timestamp,
43{
44 pub name: String,
46
47 pub path: Vec<usize>,
49
50 index: usize,
52
53 children: Vec<PerOperatorState<TInner>>,
55 child_count: usize,
56
57 edge_stash: Vec<(Source, Target)>,
58
59 input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
61
62 output_capabilities: Vec<MutableAntichain<TOuter>>,
64
65 logging: Option<Logger>,
67
68 progress_logging: Option<ProgressLogger>,
70}
71
72impl<TOuter, TInner> SubgraphBuilder<TOuter, TInner>
73where
74 TOuter: Timestamp,
75 TInner: Timestamp+Refines<TOuter>,
76{
77 pub fn new_input(&mut self, shared_counts: Rc<RefCell<ChangeBatch<TInner>>>) -> Target {
79 self.input_messages.push(shared_counts);
80 Target::new(self.index, self.input_messages.len() - 1)
81 }
82
83 pub fn new_output(&mut self) -> Source {
85 self.output_capabilities.push(MutableAntichain::new());
86 Source::new(self.index, self.output_capabilities.len() - 1)
87 }
88
89 pub fn connect(&mut self, source: Source, target: Target) {
94 self.edge_stash.push((source, target));
95 }
96
97 pub fn new_from(
99 index: usize,
100 mut path: Vec<usize>,
101 logging: Option<Logger>,
102 progress_logging: Option<ProgressLogger>,
103 name: &str,
104 )
105 -> SubgraphBuilder<TOuter, TInner>
106 {
107 path.push(index);
108
109 let children = vec![PerOperatorState::empty(0, 0)];
111
112 SubgraphBuilder {
113 name: name.to_owned(),
114 path,
115 index,
116 children,
117 child_count: 1,
118 edge_stash: Vec::new(),
119 input_messages: Vec::new(),
120 output_capabilities: Vec::new(),
121 logging,
122 progress_logging,
123 }
124 }
125
126 pub fn allocate_child_id(&mut self) -> usize {
128 self.child_count += 1;
129 self.child_count - 1
130 }
131
132 pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
134 {
135 let mut child_path = self.path.clone();
136 child_path.push(index);
137 self.logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent {
138 id: identifier,
139 addr: child_path,
140 name: child.name().to_owned(),
141 }));
142 }
143 self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone()))
144 }
145
146 pub fn build<A: crate::worker::AsWorker>(mut self, worker: &mut A) -> Subgraph<TOuter, TInner> {
148 self.children.sort_by(|x,y| x.index.cmp(&y.index));
155 assert!(self.children.iter().enumerate().all(|(i,x)| i == x.index));
156
157 let inputs = self.input_messages.len();
158 let outputs = self.output_capabilities.len();
159
160 self.children[0] = PerOperatorState::empty(outputs, inputs);
162
163 let mut builder = reachability::Builder::new();
164
165 builder.add_node(0, outputs, inputs, vec![vec![Antichain::new(); inputs]; outputs]);
167 for (index, child) in self.children.iter().enumerate().skip(1) {
168 builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
169 }
170
171 for (source, target) in self.edge_stash {
172 self.children[source.node].edges[source.port].push(target);
173 builder.add_edge(source, target);
174 }
175
176 let path = self.path.clone();
178 let reachability_logging =
179 worker.log_register()
180 .get::<reachability::logging::TrackerEvent>("timely/reachability")
181 .map(|logger| reachability::logging::TrackerLogger::new(path, logger));
182 let (tracker, scope_summary) = builder.build(reachability_logging);
183
184 let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone());
185
186 let mut incomplete = vec![true; self.children.len()];
187 incomplete[0] = false;
188 let incomplete_count = incomplete.len() - 1;
189
190 let activations = worker.activations();
191
192 activations.borrow_mut().activate(&self.path[..]);
193
194 Subgraph {
195 name: self.name,
196 path: self.path,
197 inputs,
198 outputs,
199 incomplete,
200 incomplete_count,
201 activations,
202 temp_active: BinaryHeap::new(),
203 maybe_shutdown: Vec::new(),
204 children: self.children,
205 input_messages: self.input_messages,
206 output_capabilities: self.output_capabilities,
207
208 local_pointstamp: ChangeBatch::new(),
209 final_pointstamp: ChangeBatch::new(),
210 progcaster,
211 pointstamp_tracker: tracker,
212
213 shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
214 scope_summary,
215
216 progress_mode: worker.config().progress_mode,
217 }
218 }
219}
220
221
222pub struct Subgraph<TOuter, TInner>
227where
228 TOuter: Timestamp,
229 TInner: Timestamp+Refines<TOuter>,
230{
231 name: String, pub path: Vec<usize>,
234 inputs: usize, outputs: usize, children: Vec<PerOperatorState<TInner>>,
239
240 incomplete: Vec<bool>, incomplete_count: usize, activations: Rc<RefCell<Activations>>,
245 temp_active: BinaryHeap<Reverse<usize>>,
246 maybe_shutdown: Vec<usize>,
247
248 input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
250
251 output_capabilities: Vec<MutableAntichain<TOuter>>,
253
254 local_pointstamp: ChangeBatch<(Location, TInner)>,
256 final_pointstamp: ChangeBatch<(Location, TInner)>,
257
258 pointstamp_tracker: reachability::Tracker<TInner>,
261
262 progcaster: Progcaster<TInner>,
264
265 shared_progress: Rc<RefCell<SharedProgress<TOuter>>>,
266 scope_summary: Vec<Vec<Antichain<TInner::Summary>>>,
267
268 progress_mode: ProgressMode,
269}
270
271impl<TOuter, TInner> Schedule for Subgraph<TOuter, TInner>
272where
273 TOuter: Timestamp,
274 TInner: Timestamp+Refines<TOuter>,
275{
276 fn name(&self) -> &str { &self.name }
277
278 fn path(&self) -> &[usize] { &self.path }
279
280 fn schedule(&mut self) -> bool {
281
282 self.accept_frontier(); self.harvest_inputs(); self.progcaster.recv(&mut self.final_pointstamp);
292
293 self.propagate_pointstamps();
295
296 { let temp_active = &mut self.temp_active;
298 self.activations
299 .borrow_mut()
300 .for_extensions(&self.path[..], |index| temp_active.push(Reverse(index)));
301 }
302
303 let mut previous = 0;
308 while let Some(Reverse(index)) = self.temp_active.pop() {
309 if index > previous {
311 self.activate_child(index);
313 previous = index;
314 }
315 }
316
317 self.send_progress();
319
320 if !self.final_pointstamp.is_empty() {
322 self.activations.borrow_mut().activate(&self.path[..]);
323 }
324
325 let incomplete = self.incomplete_count > 0;
327 let tracking = self.pointstamp_tracker.tracking_anything();
328
329 incomplete || tracking
330 }
331}
332
333
334impl<TOuter, TInner> Subgraph<TOuter, TInner>
335where
336 TOuter: Timestamp,
337 TInner: Timestamp+Refines<TOuter>,
338{
339 fn activate_child(&mut self, child_index: usize) -> bool {
343
344 let child = &mut self.children[child_index];
345
346 let incomplete = child.schedule();
347
348 if incomplete != self.incomplete[child_index] {
349 if incomplete { self.incomplete_count += 1; }
350 else { self.incomplete_count -= 1; }
351 self.incomplete[child_index] = incomplete;
352 }
353
354 if !incomplete {
355 let child_state = self.pointstamp_tracker.node_state(child_index);
357 let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
358 let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
359 if frontiers_empty && no_capabilities {
360 child.shut_down();
361 }
362 }
363 else {
364 #[cfg(debug_assertions)] {
366 child.validate_progress(self.pointstamp_tracker.node_state(child_index));
367 }
368 }
369
370 if child.local {
372 child.extract_progress(&mut self.local_pointstamp, &mut self.temp_active);
373 }
374 else {
375 child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
376 }
377
378 incomplete
379 }
380
381 fn accept_frontier(&mut self) {
383 for (port, changes) in self.shared_progress.borrow_mut().frontiers.iter_mut().enumerate() {
384 let source = Source::new(0, port);
385 for (time, value) in changes.drain() {
386 self.pointstamp_tracker.update_source(
387 source,
388 TInner::to_inner(time),
389 value
390 );
391 }
392 }
393 }
394
395 fn harvest_inputs(&mut self) {
402 for input in 0 .. self.inputs {
403 let source = Location::new_source(0, input);
404 let mut borrowed = self.input_messages[input].borrow_mut();
405 for (time, delta) in borrowed.drain() {
406 for target in &self.children[0].edges[input] {
407 self.local_pointstamp.update((Location::from(*target), time.clone()), delta);
408 }
409 self.local_pointstamp.update((source, time), -delta);
410 }
411 }
412 }
413
414 fn propagate_pointstamps(&mut self) {
430
431 for ((location, timestamp), delta) in self.final_pointstamp.drain() {
433
434 if location.node == 0 {
436 match location.port {
437 Port::Source(scope_input) => {
440 self.shared_progress
441 .borrow_mut()
442 .consumeds[scope_input]
443 .update(timestamp.to_outer(), -delta);
444 },
445 Port::Target(scope_output) => {
449 self.shared_progress
450 .borrow_mut()
451 .produceds[scope_output]
452 .update(timestamp.to_outer(), delta);
453 },
454 }
455 }
456 else {
457 self.pointstamp_tracker.update(location, timestamp, delta);
458 }
459 }
460
461 self.pointstamp_tracker.propagate_all();
463
464 for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() {
466 self.maybe_shutdown.push(location.node);
467 if let crate::progress::Port::Target(port) = location.port {
469 if self.children[location.node].notify {
470 self.temp_active.push(Reverse(location.node));
471 }
472 self.children[location.node]
476 .shared_progress
477 .borrow_mut()
478 .frontiers[port]
479 .update(time, diff);
480 }
481 }
482
483 self.maybe_shutdown.sort();
485 self.maybe_shutdown.dedup();
486 for child_index in self.maybe_shutdown.drain(..) {
487 let child_state = self.pointstamp_tracker.node_state(child_index);
488 let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
489 let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
490 if frontiers_empty && no_capabilities {
491 self.temp_active.push(Reverse(child_index));
492 }
493 }
494
495 for (output, internal) in self.shared_progress.borrow_mut().internals.iter_mut().enumerate() {
497 self.pointstamp_tracker
498 .pushed_output()[output]
499 .drain()
500 .map(|(time, diff)| (time.to_outer(), diff))
501 .filter_through(&mut self.output_capabilities[output])
502 .for_each(|(time, diff)| internal.update(time, diff));
503 }
504 }
505
506 fn send_progress(&mut self) {
511
512 let must_send = self.progress_mode == ProgressMode::Eager || {
515 let tracker = &mut self.pointstamp_tracker;
516 self.local_pointstamp
517 .iter()
518 .any(|((location, time), diff)|
519 tracker.is_global(*location, time) && *diff < 0
521 )
522 };
523
524 if must_send {
525 self.progcaster.send(&mut self.local_pointstamp);
526 }
527 }
528}
529
530
531impl<TOuter, TInner> Operate<TOuter> for Subgraph<TOuter, TInner>
532where
533 TOuter: Timestamp,
534 TInner: Timestamp+Refines<TOuter>,
535{
536 fn local(&self) -> bool { false }
537 fn inputs(&self) -> usize { self.inputs }
538 fn outputs(&self) -> usize { self.outputs }
539
540 fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<TOuter::Summary>>>, Rc<RefCell<SharedProgress<TOuter>>>) {
543
544 assert_eq!(self.children[0].outputs, self.inputs());
546 assert_eq!(self.children[0].inputs, self.outputs());
547
548 let mut internal_summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()];
552 for (input_idx, input) in self.scope_summary.iter().enumerate() {
553 for (output_idx, output) in input.iter().enumerate() {
554 let antichain = &mut internal_summary[input_idx][output_idx];
555 antichain.reserve(output.elements().len());
556 antichain.extend(output.elements().iter().cloned().map(TInner::summarize));
557 }
558 }
559
560 debug_assert_eq!(
561 internal_summary.len(),
562 self.inputs(),
563 "the internal summary should have as many elements as there are inputs",
564 );
565 debug_assert!(
566 internal_summary.iter().all(|summary| summary.len() == self.outputs()),
567 "each element of the internal summary should have as many elements as there are outputs",
568 );
569
570 for child in self.children.iter_mut() {
574 child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
575 }
576
577 self.propagate_pointstamps(); (internal_summary, self.shared_progress.clone())
581 }
582
583 fn set_external_summary(&mut self) {
584 self.accept_frontier();
585 self.propagate_pointstamps(); self.children
587 .iter_mut()
588 .flat_map(|child| child.operator.as_mut())
589 .for_each(|op| op.set_external_summary());
590 }
591}
592
593struct PerOperatorState<T: Timestamp> {
594
595 name: String, index: usize, id: usize, local: bool, notify: bool,
601 inputs: usize, outputs: usize, operator: Option<Box<dyn Operate<T>>>,
605
606 edges: Vec<Vec<Target>>, shared_progress: Rc<RefCell<SharedProgress<T>>>,
609
610 internal_summary: Vec<Vec<Antichain<T::Summary>>>, logging: Option<Logger>,
613}
614
615impl<T: Timestamp> PerOperatorState<T> {
616
617 fn empty(inputs: usize, outputs: usize) -> PerOperatorState<T> {
618 PerOperatorState {
619 name: "External".to_owned(),
620 operator: None,
621 index: 0,
622 id: usize::max_value(),
623 local: false,
624 notify: true,
625 inputs,
626 outputs,
627
628 edges: vec![Vec::new(); outputs],
629
630 logging: None,
631
632 shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs,outputs))),
633 internal_summary: Vec::new(),
634 }
635 }
636
637 pub fn new(
638 mut scope: Box<dyn Operate<T>>,
639 index: usize,
640 mut _path: Vec<usize>,
641 identifier: usize,
642 logging: Option<Logger>
643 ) -> PerOperatorState<T>
644 {
645 let local = scope.local();
646 let inputs = scope.inputs();
647 let outputs = scope.outputs();
648 let notify = scope.notify_me();
649
650 let (internal_summary, shared_progress) = scope.get_internal_summary();
651
652 assert_eq!(
653 internal_summary.len(),
654 inputs,
655 "operator summary has {} inputs when {} were expected",
656 internal_summary.len(),
657 inputs,
658 );
659 assert!(
660 !internal_summary.iter().any(|x| x.len() != outputs),
661 "operator summary had too few outputs",
662 );
663
664 PerOperatorState {
665 name: scope.name().to_owned(),
666 operator: Some(scope),
667 index,
668 id: identifier,
669 local,
670 notify,
671 inputs,
672 outputs,
673 edges: vec![vec![]; outputs],
674
675 logging,
676
677 shared_progress,
678 internal_summary,
679 }
680 }
681
682 pub fn schedule(&mut self) -> bool {
683
684 if let Some(ref mut operator) = self.operator {
685
686 if let Some(l) = self.logging.as_mut() {
688 let frontiers = &mut self.shared_progress.borrow_mut().frontiers[..];
692 if frontiers.iter_mut().any(|buffer| !buffer.is_empty()) {
693 l.log(crate::logging::PushProgressEvent { op_id: self.id })
694 }
695
696 l.log(crate::logging::ScheduleEvent::start(self.id));
697 }
698
699 let incomplete = operator.schedule();
700
701 if let Some(l) = self.logging.as_mut() {
703 l.log(crate::logging::ScheduleEvent::stop(self.id));
704 }
705
706 incomplete
707 }
708 else {
709
710 if self.shared_progress.borrow_mut().frontiers.iter_mut().any(|x| !x.is_empty()) {
712 println!("Operator prematurely shut down: {}", self.name);
713 println!(" {:?}", self.notify);
714 println!(" {:?}", self.shared_progress.borrow_mut().frontiers);
715 panic!();
716 }
717
718 false
720 }
721 }
722
723 fn shut_down(&mut self) {
724 if self.operator.is_some() {
725 if let Some(l) = self.logging.as_mut() {
726 l.log(crate::logging::ShutdownEvent{ id: self.id });
727 }
728 self.operator = None;
729 }
730 }
731
732 fn extract_progress(&mut self, pointstamps: &mut ChangeBatch<(Location, T)>, temp_active: &mut BinaryHeap<Reverse<usize>>) {
734
735 let shared_progress = &mut *self.shared_progress.borrow_mut();
736
737 for (input, consumed) in shared_progress.consumeds.iter_mut().enumerate() {
739 let target = Location::new_target(self.index, input);
740 for (time, delta) in consumed.drain() {
741 pointstamps.update((target, time), -delta);
742 }
743 }
744 for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
745 let source = Location::new_source(self.index, output);
746 for (time, delta) in internal.drain() {
747 pointstamps.update((source, time.clone()), delta);
748 }
749 }
750 for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
751 for (time, delta) in produced.drain() {
752 for target in &self.edges[output] {
753 pointstamps.update((Location::from(*target), time.clone()), delta);
754 temp_active.push(Reverse(target.node));
755 }
756 }
757 }
758 }
759
760 #[allow(dead_code)]
765 fn validate_progress(&mut self, child_state: &reachability::PerOperator<T>) {
766
767 let shared_progress = &mut *self.shared_progress.borrow_mut();
768
769 for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
771 for (time, diff) in internal.iter() {
772 if *diff > 0 {
773 let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
774 let internal = child_state.sources[output].implications.less_equal(time);
775 if !consumed && !internal {
776 println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
777 panic!("Progress error; internal {:?}", self.name);
778 }
779 }
780 }
781 }
782 for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
783 for (time, diff) in produced.iter() {
784 if *diff > 0 {
785 let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
786 let internal = child_state.sources[output].implications.less_equal(time);
787 if !consumed && !internal {
788 println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
789 panic!("Progress error; produced {:?}", self.name);
790 }
791 }
792 }
793 }
794 }
795}
796
797impl<T: Timestamp> Drop for PerOperatorState<T> {
799 fn drop(&mut self) {
800 self.shut_down();
801 }
802}