timely/progress/
subgraph.rs

1//! A dataflow subgraph
2//!
//! Timely dataflow graphs can be nested hierarchically: some region of the
//! graph is grouped, and is presented upwards as a single operator. This grouping
//! needs some care, to make sure that the presented operator reflects the behavior
//! of the grouped operators.
7
8use std::rc::Rc;
9use std::cell::RefCell;
10use std::collections::BinaryHeap;
11use std::cmp::Reverse;
12
13use crate::logging::TimelyLogger as Logger;
14use crate::logging::TimelyProgressLogger as ProgressLogger;
15
16use crate::scheduling::Schedule;
17use crate::scheduling::activate::Activations;
18
19use crate::progress::frontier::{Antichain, MutableAntichain, MutableAntichainFilter};
20use crate::progress::{Timestamp, Operate, operate::SharedProgress};
21use crate::progress::{Location, Port, Source, Target};
22
23use crate::progress::ChangeBatch;
24use crate::progress::broadcast::Progcaster;
25use crate::progress::reachability;
26use crate::progress::timestamp::Refines;
27
28use crate::worker::ProgressMode;
29
// IMPORTANT : by convention, a child index of zero is used to indicate inputs and outputs of
// the Subgraph itself. An index greater than zero corresponds to an actual child, which can
// be found at that same position in the `children` field of the Subgraph (position zero holds
// a placeholder standing in for the Subgraph itself).
33
34/// A builder for interactively initializing a `Subgraph`.
35///
36/// This collects all the information necessary to get a `Subgraph` up and
37/// running, and is important largely through its `build` method which
38/// actually creates a `Subgraph`.
39pub struct SubgraphBuilder<TOuter, TInner>
40where
41    TOuter: Timestamp,
42    TInner: Timestamp,
43{
44    /// The name of this subgraph.
45    pub name: String,
46
47    /// A sequence of integers uniquely identifying the subgraph.
48    pub path: Vec<usize>,
49
50    /// The index assigned to the subgraph by its parent.
51    index: usize,
52
53    // handles to the children of the scope. index i corresponds to entry i-1, unless things change.
54    children: Vec<PerOperatorState<TInner>>,
55    child_count: usize,
56
57    edge_stash: Vec<(Source, Target)>,
58
59    // shared state written to by the datapath, counting records entering this subgraph instance.
60    input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
61
62    // expressed capabilities, used to filter changes against.
63    output_capabilities: Vec<MutableAntichain<TOuter>>,
64
65    /// Logging handle
66    logging: Option<Logger>,
67
68    /// Progress logging handle
69    progress_logging: Option<ProgressLogger>,
70}
71
72impl<TOuter, TInner> SubgraphBuilder<TOuter, TInner>
73where
74    TOuter: Timestamp,
75    TInner: Timestamp+Refines<TOuter>,
76{
77    /// Allocates a new input to the subgraph and returns the target to that input in the outer graph.
78    pub fn new_input(&mut self, shared_counts: Rc<RefCell<ChangeBatch<TInner>>>) -> Target {
79        self.input_messages.push(shared_counts);
80        Target::new(self.index, self.input_messages.len() - 1)
81    }
82
83    /// Allocates a new output from the subgraph and returns the source of that output in the outer graph.
84    pub fn new_output(&mut self) -> Source {
85        self.output_capabilities.push(MutableAntichain::new());
86        Source::new(self.index, self.output_capabilities.len() - 1)
87    }
88
89    /// Introduces a dependence from the source to the target.
90    ///
91    /// This method does not effect data movement, but rather reveals to the progress tracking infrastructure
92    /// that messages produced by `source` should be expected to be consumed at `target`.
93    pub fn connect(&mut self, source: Source, target: Target) {
94        self.edge_stash.push((source, target));
95    }
96
97    /// Creates a new Subgraph from a channel allocator and "descriptive" indices.
98    pub fn new_from(
99        index: usize,
100        mut path: Vec<usize>,
101        logging: Option<Logger>,
102        progress_logging: Option<ProgressLogger>,
103        name: &str,
104    )
105        -> SubgraphBuilder<TOuter, TInner>
106    {
107        path.push(index);
108
109        // Put an empty placeholder for "outer scope" representative.
110        let children = vec![PerOperatorState::empty(0, 0)];
111
112        SubgraphBuilder {
113            name: name.to_owned(),
114            path,
115            index,
116            children,
117            child_count: 1,
118            edge_stash: Vec::new(),
119            input_messages: Vec::new(),
120            output_capabilities: Vec::new(),
121            logging,
122            progress_logging,
123        }
124    }
125
126    /// Allocates a new child identifier, for later use.
127    pub fn allocate_child_id(&mut self) -> usize {
128        self.child_count += 1;
129        self.child_count - 1
130    }
131
132    /// Adds a new child to the subgraph.
133    pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
134        {
135            let mut child_path = self.path.clone();
136            child_path.push(index);
137            self.logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent {
138                id: identifier,
139                addr: child_path,
140                name: child.name().to_owned(),
141            }));
142        }
143        self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone()))
144    }
145
146    /// Now that initialization is complete, actually build a subgraph.
147    pub fn build<A: crate::worker::AsWorker>(mut self, worker: &mut A) -> Subgraph<TOuter, TInner> {
148        // at this point, the subgraph is frozen. we should initialize any internal state which
149        // may have been determined after construction (e.g. the numbers of inputs and outputs).
150        // we also need to determine what to return as a summary and initial capabilities, which
151        // will depend on child summaries and capabilities, as well as edges in the subgraph.
152
153        // perhaps first check that the children are sanely identified
154        self.children.sort_by(|x,y| x.index.cmp(&y.index));
155        assert!(self.children.iter().enumerate().all(|(i,x)| i == x.index));
156
157        let inputs = self.input_messages.len();
158        let outputs = self.output_capabilities.len();
159
160        // Create empty child zero represenative.
161        self.children[0] = PerOperatorState::empty(outputs, inputs);
162
163        let mut builder = reachability::Builder::new();
164
165        // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected.
166        builder.add_node(0, outputs, inputs, vec![vec![Antichain::new(); inputs]; outputs]);
167        for (index, child) in self.children.iter().enumerate().skip(1) {
168            builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
169        }
170
171        for (source, target) in self.edge_stash {
172            self.children[source.node].edges[source.port].push(target);
173            builder.add_edge(source, target);
174        }
175
176        // The `None` argument is optional logging infrastructure.
177        let path = self.path.clone();
178        let reachability_logging =
179        worker.log_register()
180            .get::<reachability::logging::TrackerEvent>("timely/reachability")
181            .map(|logger| reachability::logging::TrackerLogger::new(path, logger));
182        let (tracker, scope_summary) = builder.build(reachability_logging);
183
184        let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone());
185
186        let mut incomplete = vec![true; self.children.len()];
187        incomplete[0] = false;
188        let incomplete_count = incomplete.len() - 1;
189
190        let activations = worker.activations();
191
192        activations.borrow_mut().activate(&self.path[..]);
193
194        Subgraph {
195            name: self.name,
196            path: self.path,
197            inputs,
198            outputs,
199            incomplete,
200            incomplete_count,
201            activations,
202            temp_active: BinaryHeap::new(),
203            maybe_shutdown: Vec::new(),
204            children: self.children,
205            input_messages: self.input_messages,
206            output_capabilities: self.output_capabilities,
207
208            local_pointstamp: ChangeBatch::new(),
209            final_pointstamp: ChangeBatch::new(),
210            progcaster,
211            pointstamp_tracker: tracker,
212
213            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
214            scope_summary,
215
216            progress_mode: worker.config().progress_mode,
217        }
218    }
219}
220
221
222/// A dataflow subgraph.
223///
224/// The subgraph type contains the infrastructure required to describe the topology of and track
225/// progress within a dataflow subgraph.
226pub struct Subgraph<TOuter, TInner>
227where
228    TOuter: Timestamp,
229    TInner: Timestamp+Refines<TOuter>,
230{
231    name: String,           // an informative name.
232    /// Path of identifiers from the root.
233    pub path: Vec<usize>,
234    inputs: usize,          // number of inputs.
235    outputs: usize,         // number of outputs.
236
237    // handles to the children of the scope. index i corresponds to entry i-1, unless things change.
238    children: Vec<PerOperatorState<TInner>>,
239
240    incomplete: Vec<bool>,   // the incompletion status of each child.
241    incomplete_count: usize, // the number of incomplete children.
242
243    // shared activations (including children).
244    activations: Rc<RefCell<Activations>>,
245    temp_active: BinaryHeap<Reverse<usize>>,
246    maybe_shutdown: Vec<usize>,
247
248    // shared state written to by the datapath, counting records entering this subgraph instance.
249    input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
250
251    // expressed capabilities, used to filter changes against.
252    output_capabilities: Vec<MutableAntichain<TOuter>>,
253
254    // pointstamp messages to exchange. ultimately destined for `messages` or `internal`.
255    local_pointstamp: ChangeBatch<(Location, TInner)>,
256    final_pointstamp: ChangeBatch<(Location, TInner)>,
257
258    // Graph structure and pointstamp tracker.
259    // pointstamp_builder: reachability::Builder<TInner>,
260    pointstamp_tracker: reachability::Tracker<TInner>,
261
262    // channel / whatever used to communicate pointstamp updates to peers.
263    progcaster: Progcaster<TInner>,
264
265    shared_progress: Rc<RefCell<SharedProgress<TOuter>>>,
266    scope_summary: Vec<Vec<Antichain<TInner::Summary>>>,
267
268    progress_mode: ProgressMode,
269}
270
271impl<TOuter, TInner> Schedule for Subgraph<TOuter, TInner>
272where
273    TOuter: Timestamp,
274    TInner: Timestamp+Refines<TOuter>,
275{
276    fn name(&self) -> &str { &self.name }
277
278    fn path(&self) -> &[usize] { &self.path }
279
280    fn schedule(&mut self) -> bool {
281
282        // This method performs several actions related to progress tracking
283        // and child operator scheduling. The actions have been broken apart
284        // into atomic actions that should be able to be safely executed in
285        // isolation, by a potentially clueless user (yours truly).
286
287        self.accept_frontier();         // Accept supplied frontier changes.
288        self.harvest_inputs();          // Count records entering the scope.
289
290        // Receive post-exchange progress updates.
291        self.progcaster.recv(&mut self.final_pointstamp);
292
293        // Commit and propagate final pointstamps.
294        self.propagate_pointstamps();
295
296        {   // Enqueue active children; scoped to let borrow drop.
297            let temp_active = &mut self.temp_active;
298            self.activations
299                .borrow_mut()
300                .for_extensions(&self.path[..], |index| temp_active.push(Reverse(index)));
301        }
302
303        // Schedule child operators.
304        //
305        // We should be able to schedule arbitrary subsets of children, as
306        // long as we eventually schedule all children that need to do work.
307        let mut previous = 0;
308        while let Some(Reverse(index)) = self.temp_active.pop() {
309            // De-duplicate, and don't revisit.
310            if index > previous {
311                // TODO: This is a moment where a scheduling decision happens.
312                self.activate_child(index);
313                previous = index;
314            }
315        }
316
317        // Transmit produced progress updates.
318        self.send_progress();
319
320        // If child scopes surface more final pointstamp updates we must re-execute.
321        if !self.final_pointstamp.is_empty() {
322            self.activations.borrow_mut().activate(&self.path[..]);
323        }
324
325        // A subgraph is incomplete if any child is incomplete, or there are outstanding messages.
326        let incomplete = self.incomplete_count > 0;
327        let tracking = self.pointstamp_tracker.tracking_anything();
328
329        incomplete || tracking
330    }
331}
332
333
334impl<TOuter, TInner> Subgraph<TOuter, TInner>
335where
336    TOuter: Timestamp,
337    TInner: Timestamp+Refines<TOuter>,
338{
339    /// Schedules a child operator and collects progress statements.
340    ///
341    /// The return value indicates that the child task cannot yet shut down.
342    fn activate_child(&mut self, child_index: usize) -> bool {
343
344        let child = &mut self.children[child_index];
345
346        let incomplete = child.schedule();
347
348        if incomplete != self.incomplete[child_index] {
349            if incomplete { self.incomplete_count += 1; }
350            else          { self.incomplete_count -= 1; }
351            self.incomplete[child_index] = incomplete;
352        }
353
354        if !incomplete {
355            // Consider shutting down the child, if neither capabilities nor input frontier.
356            let child_state = self.pointstamp_tracker.node_state(child_index);
357            let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
358            let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
359            if frontiers_empty && no_capabilities {
360                child.shut_down();
361            }
362        }
363        else {
364            // In debug mode, check that the progress statements do not violate invariants.
365            #[cfg(debug_assertions)] {
366                child.validate_progress(self.pointstamp_tracker.node_state(child_index));
367            }
368        }
369
370        // Extract progress statements into either pre- or post-exchange buffers.
371        if child.local {
372            child.extract_progress(&mut self.local_pointstamp, &mut self.temp_active);
373        }
374        else {
375            child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
376        }
377
378        incomplete
379    }
380
381    /// Move frontier changes from parent into progress statements.
382    fn accept_frontier(&mut self) {
383        for (port, changes) in self.shared_progress.borrow_mut().frontiers.iter_mut().enumerate() {
384            let source = Source::new(0, port);
385            for (time, value) in changes.drain() {
386                self.pointstamp_tracker.update_source(
387                    source,
388                    TInner::to_inner(time),
389                    value
390                );
391            }
392        }
393    }
394
395    /// Collects counts of records entering the scope.
396    ///
397    /// This method moves message counts from the output of child zero to the inputs to
398    /// attached operators. This is a bit of a hack, because normally one finds capabilities
399    /// at an operator output, rather than message counts. These counts are used only at
400    /// mark [XXX] where they are reported upwards to the parent scope.
401    fn harvest_inputs(&mut self) {
402        for input in 0 .. self.inputs {
403            let source = Location::new_source(0, input);
404            let mut borrowed = self.input_messages[input].borrow_mut();
405            for (time, delta) in borrowed.drain() {
406                for target in &self.children[0].edges[input] {
407                    self.local_pointstamp.update((Location::from(*target), time.clone()), delta);
408                }
409                self.local_pointstamp.update((source, time), -delta);
410            }
411        }
412    }
413
414    /// Commits pointstamps in `self.final_pointstamp`.
415    ///
416    /// This method performs several steps that for reasons of correctness must
417    /// be performed atomically, before control is returned. These are:
418    ///
419    /// 1. Changes to child zero's outputs are reported as consumed messages.
420    /// 2. Changes to child zero's inputs are reported as produced messages.
421    /// 3. Frontiers for child zero's inputs are reported as internal capabilities.
422    ///
423    /// Perhaps importantly, the frontiers for child zero are determined *without*
424    /// the messages that are produced for child zero inputs, as we only want to
425    /// report retained internal capabilities, and not now-external messages.
426    ///
427    /// In the course of propagating progress changes, we also propagate progress
428    /// changes for all of the managed child operators.
429    fn propagate_pointstamps(&mut self) {
430
431        // Process exchanged pointstamps. Handle child 0 statements carefully.
432        for ((location, timestamp), delta) in self.final_pointstamp.drain() {
433
434            // Child 0 corresponds to the parent scope and has special handling.
435            if location.node == 0 {
436                match location.port {
437                    // [XXX] Report child 0's capabilities as consumed messages.
438                    //       Note the re-negation of delta, to make counts positive.
439                    Port::Source(scope_input) => {
440                        self.shared_progress
441                            .borrow_mut()
442                            .consumeds[scope_input]
443                            .update(timestamp.to_outer(), -delta);
444                    },
445                    // [YYY] Report child 0's input messages as produced messages.
446                    //       Do not otherwise record, as we will not see subtractions,
447                    //       and we do not want to present their implications upward.
448                    Port::Target(scope_output) => {
449                        self.shared_progress
450                            .borrow_mut()
451                            .produceds[scope_output]
452                            .update(timestamp.to_outer(), delta);
453                    },
454                }
455            }
456            else {
457                self.pointstamp_tracker.update(location, timestamp, delta);
458            }
459        }
460
461        // Propagate implications of progress changes.
462        self.pointstamp_tracker.propagate_all();
463
464        // Drain propagated information into shared progress structure.
465        for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() {
466            self.maybe_shutdown.push(location.node);
467            // Targets are actionable, sources are not.
468            if let crate::progress::Port::Target(port) = location.port {
469                if self.children[location.node].notify {
470                    self.temp_active.push(Reverse(location.node));
471                }
472                // TODO: This logic could also be guarded by `.notify`, but
473                // we want to be a bit careful to make sure all related logic
474                // agrees with this (e.g. initialization, operator logic, etc.)
475                self.children[location.node]
476                    .shared_progress
477                    .borrow_mut()
478                    .frontiers[port]
479                    .update(time, diff);
480            }
481        }
482
483        // Consider scheduling each recipient of progress information to shut down.
484        self.maybe_shutdown.sort();
485        self.maybe_shutdown.dedup();
486        for child_index in self.maybe_shutdown.drain(..) {
487            let child_state = self.pointstamp_tracker.node_state(child_index);
488            let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
489            let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
490            if frontiers_empty && no_capabilities {
491                self.temp_active.push(Reverse(child_index));
492            }
493        }
494
495        // Extract child zero frontier changes and report as internal capability changes.
496        for (output, internal) in self.shared_progress.borrow_mut().internals.iter_mut().enumerate() {
497            self.pointstamp_tracker
498                .pushed_output()[output]
499                .drain()
500                .map(|(time, diff)| (time.to_outer(), diff))
501                .filter_through(&mut self.output_capabilities[output])
502                .for_each(|(time, diff)| internal.update(time, diff));
503        }
504    }
505
506    /// Sends local progress updates to all workers.
507    ///
508    /// This method does not guarantee that all of `self.local_pointstamps` are
509    /// sent, but that no blocking pointstamps remain
510    fn send_progress(&mut self) {
511
512        // If we are requested to eagerly send progress updates, or if there are
513        // updates visible in the scope-wide frontier, we must send all updates.
514        let must_send = self.progress_mode == ProgressMode::Eager || {
515            let tracker = &mut self.pointstamp_tracker;
516            self.local_pointstamp
517                .iter()
518                .any(|((location, time), diff)|
519                    // Must publish scope-wide visible subtractions.
520                    tracker.is_global(*location, time) && *diff < 0
521                )
522        };
523
524        if must_send {
525            self.progcaster.send(&mut self.local_pointstamp);
526        }
527    }
528}
529
530
531impl<TOuter, TInner> Operate<TOuter> for Subgraph<TOuter, TInner>
532where
533    TOuter: Timestamp,
534    TInner: Timestamp+Refines<TOuter>,
535{
536    fn local(&self) -> bool { false }
537    fn inputs(&self)  -> usize { self.inputs }
538    fn outputs(&self) -> usize { self.outputs }
539
540    // produces connectivity summaries from inputs to outputs, and reports initial internal
541    // capabilities on each of the outputs (projecting capabilities from contained scopes).
542    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<TOuter::Summary>>>, Rc<RefCell<SharedProgress<TOuter>>>) {
543
544        // double-check that child 0 (the outside world) is correctly shaped.
545        assert_eq!(self.children[0].outputs, self.inputs());
546        assert_eq!(self.children[0].inputs, self.outputs());
547
548        // Note that we need to have `self.inputs()` elements in the summary
549        // with each element containing `self.outputs()` antichains regardless
550        // of how long `self.scope_summary` is
551        let mut internal_summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()];
552        for (input_idx, input) in self.scope_summary.iter().enumerate() {
553            for (output_idx, output) in input.iter().enumerate() {
554                let antichain = &mut internal_summary[input_idx][output_idx];
555                antichain.reserve(output.elements().len());
556                antichain.extend(output.elements().iter().cloned().map(TInner::summarize));
557            }
558        }
559
560        debug_assert_eq!(
561            internal_summary.len(),
562            self.inputs(),
563            "the internal summary should have as many elements as there are inputs",
564        );
565        debug_assert!(
566            internal_summary.iter().all(|summary| summary.len() == self.outputs()),
567            "each element of the internal summary should have as many elements as there are outputs",
568        );
569
570        // Each child has expressed initial capabilities (their `shared_progress.internals`).
571        // We introduce these into the progress tracker to determine the scope's initial
572        // internal capabilities.
573        for child in self.children.iter_mut() {
574            child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
575        }
576
577        self.propagate_pointstamps();  // Propagate expressed capabilities to output frontiers.
578
579        // Return summaries and shared progress information.
580        (internal_summary, self.shared_progress.clone())
581    }
582
583    fn set_external_summary(&mut self) {
584        self.accept_frontier();
585        self.propagate_pointstamps();  // ensure propagation of input frontiers.
586        self.children
587            .iter_mut()
588            .flat_map(|child| child.operator.as_mut())
589            .for_each(|op| op.set_external_summary());
590    }
591}
592
593struct PerOperatorState<T: Timestamp> {
594
595    name: String,       // name of the operator
596    index: usize,       // index of the operator within its parent scope
597    id: usize,          // worker-unique identifier
598
599    local: bool,        // indicates whether the operator will exchange data or not
600    notify: bool,
601    inputs: usize,      // number of inputs to the operator
602    outputs: usize,     // number of outputs from the operator
603
604    operator: Option<Box<dyn Operate<T>>>,
605
606    edges: Vec<Vec<Target>>,    // edges from the outputs of the operator
607
608    shared_progress: Rc<RefCell<SharedProgress<T>>>,
609
610    internal_summary: Vec<Vec<Antichain<T::Summary>>>,   // cached result from get_internal_summary.
611
612    logging: Option<Logger>,
613}
614
615impl<T: Timestamp> PerOperatorState<T> {
616
617    fn empty(inputs: usize, outputs: usize) -> PerOperatorState<T> {
618        PerOperatorState {
619            name:       "External".to_owned(),
620            operator:   None,
621            index:      0,
622            id:         usize::max_value(),
623            local:      false,
624            notify:     true,
625            inputs,
626            outputs,
627
628            edges: vec![Vec::new(); outputs],
629
630            logging: None,
631
632            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs,outputs))),
633            internal_summary: Vec::new(),
634        }
635    }
636
637    pub fn new(
638        mut scope: Box<dyn Operate<T>>,
639        index: usize,
640        mut _path: Vec<usize>,
641        identifier: usize,
642        logging: Option<Logger>
643    ) -> PerOperatorState<T>
644    {
645        let local = scope.local();
646        let inputs = scope.inputs();
647        let outputs = scope.outputs();
648        let notify = scope.notify_me();
649
650        let (internal_summary, shared_progress) = scope.get_internal_summary();
651
652        assert_eq!(
653            internal_summary.len(),
654            inputs,
655            "operator summary has {} inputs when {} were expected",
656            internal_summary.len(),
657            inputs,
658        );
659        assert!(
660            !internal_summary.iter().any(|x| x.len() != outputs),
661            "operator summary had too few outputs",
662        );
663
664        PerOperatorState {
665            name:               scope.name().to_owned(),
666            operator:           Some(scope),
667            index,
668            id:                 identifier,
669            local,
670            notify,
671            inputs,
672            outputs,
673            edges:              vec![vec![]; outputs],
674
675            logging,
676
677            shared_progress,
678            internal_summary,
679        }
680    }
681
682    pub fn schedule(&mut self) -> bool {
683
684        if let Some(ref mut operator) = self.operator {
685
686            // Perhaps log information about the start of the schedule call.
687            if let Some(l) = self.logging.as_mut() {
688                // FIXME: There is no contract that the operator must consume frontier changes.
689                //        This report could be spurious.
690                // TODO:  Perhaps fold this in to `ScheduleEvent::start()` as a "reason"?
691                let frontiers = &mut self.shared_progress.borrow_mut().frontiers[..];
692                if frontiers.iter_mut().any(|buffer| !buffer.is_empty()) {
693                    l.log(crate::logging::PushProgressEvent { op_id: self.id })
694                }
695
696                l.log(crate::logging::ScheduleEvent::start(self.id));
697            }
698
699            let incomplete = operator.schedule();
700
701            // Perhaps log information about the stop of the schedule call.
702            if let Some(l) = self.logging.as_mut() {
703                l.log(crate::logging::ScheduleEvent::stop(self.id));
704            }
705
706            incomplete
707        }
708        else {
709
710            // If the operator is closed and we are reporting progress at it, something has surely gone wrong.
711            if self.shared_progress.borrow_mut().frontiers.iter_mut().any(|x| !x.is_empty()) {
712                println!("Operator prematurely shut down: {}", self.name);
713                println!("  {:?}", self.notify);
714                println!("  {:?}", self.shared_progress.borrow_mut().frontiers);
715                panic!();
716            }
717
718            // A closed operator shouldn't keep anything open.
719            false
720        }
721    }
722
723    fn shut_down(&mut self) {
724        if self.operator.is_some() {
725            if let Some(l) = self.logging.as_mut() {
726                l.log(crate::logging::ShutdownEvent{ id: self.id });
727            }
728            self.operator = None;
729        }
730    }
731
732    /// Extracts shared progress information and converts to pointstamp changes.
733    fn extract_progress(&mut self, pointstamps: &mut ChangeBatch<(Location, T)>, temp_active: &mut BinaryHeap<Reverse<usize>>) {
734
735        let shared_progress = &mut *self.shared_progress.borrow_mut();
736
737        // Migrate consumeds, internals, produceds into progress statements.
738        for (input, consumed) in shared_progress.consumeds.iter_mut().enumerate() {
739            let target = Location::new_target(self.index, input);
740            for (time, delta) in consumed.drain() {
741                pointstamps.update((target, time), -delta);
742            }
743        }
744        for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
745            let source = Location::new_source(self.index, output);
746            for (time, delta) in internal.drain() {
747                pointstamps.update((source, time.clone()), delta);
748            }
749        }
750        for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
751            for (time, delta) in produced.drain() {
752                for target in &self.edges[output] {
753                    pointstamps.update((Location::from(*target), time.clone()), delta);
754                    temp_active.push(Reverse(target.node));
755                }
756            }
757        }
758    }
759
760    /// Test the validity of `self.shared_progress`.
761    ///
762    /// The validity of shared progress information depends on both the external frontiers and the
763    /// internal capabilities, as events can occur that cannot be explained locally otherwise.
764    #[allow(dead_code)]
765    fn validate_progress(&mut self, child_state: &reachability::PerOperator<T>) {
766
767        let shared_progress = &mut *self.shared_progress.borrow_mut();
768
769        // Increments to internal capabilities require a consumed input message, a
770        for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
771            for (time, diff) in internal.iter() {
772                if *diff > 0 {
773                    let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
774                    let internal = child_state.sources[output].implications.less_equal(time);
775                    if !consumed && !internal {
776                        println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
777                        panic!("Progress error; internal {:?}", self.name);
778                    }
779                }
780            }
781        }
782        for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
783            for (time, diff) in produced.iter() {
784                if *diff > 0 {
785                    let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
786                    let internal = child_state.sources[output].implications.less_equal(time);
787                    if !consumed && !internal {
788                        println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
789                        panic!("Progress error; produced {:?}", self.name);
790                    }
791                }
792            }
793        }
794    }
795}
796
797// Explicitly shut down the operator to get logged information.
798impl<T: Timestamp> Drop for PerOperatorState<T> {
799    fn drop(&mut self) {
800        self.shut_down();
801    }
802}