timely/progress/
reachability.rs

1//! Manages pointstamp reachability within a timely dataflow graph.
2//!
3//! Timely dataflow is concerned with understanding and communicating the potential
4//! for capabilities to reach nodes in a directed graph, by following paths through
5//! the graph (along edges and through nodes). This module contains one abstraction
6//! for managing this information.
7//!
8//! # Examples
9//!
10//! ```rust
11//! use timely::progress::{Location, Port};
12//! use timely::progress::frontier::Antichain;
13//! use timely::progress::{Source, Target};
14//! use timely::progress::reachability::{Builder, Tracker};
15//!
16//! // allocate a new empty topology builder.
17//! let mut builder = Builder::<usize>::new();
18//!
19//! // Each node with one input connected to one output.
20//! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
21//! builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
22//! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
23//!
24//! // Connect nodes in sequence, looping around to the first from the last.
25//! builder.add_edge(Source::new(0, 0), Target::new(1, 0));
26//! builder.add_edge(Source::new(1, 0), Target::new(2, 0));
27//! builder.add_edge(Source::new(2, 0), Target::new(0, 0));
28//!
29//! // Construct a reachability tracker.
30//! let (mut tracker, _) = builder.build(None);
31//!
32//! // Introduce a pointstamp at the output of the first node.
33//! tracker.update_source(Source::new(0, 0), 17, 1);
34//!
35//! // Propagate changes; until this call updates are simply buffered.
36//! tracker.propagate_all();
37//!
38//! let mut results =
39//! tracker
40//!     .pushed()
41//!     .drain()
42//!     .filter(|((location, time), delta)| location.is_target())
43//!     .collect::<Vec<_>>();
44//!
45//! results.sort();
46//!
47//! println!("{:?}", results);
48//!
49//! assert_eq!(results.len(), 3);
50//! assert_eq!(results[0], ((Location::new_target(0, 0), 18), 1));
51//! assert_eq!(results[1], ((Location::new_target(1, 0), 17), 1));
52//! assert_eq!(results[2], ((Location::new_target(2, 0), 17), 1));
53//!
//! // Remove the pointstamp from the output of the first node.
55//! tracker.update_source(Source::new(0, 0), 17, -1);
56//!
57//! // Propagate changes; until this call updates are simply buffered.
58//! tracker.propagate_all();
59//!
60//! let mut results =
61//! tracker
62//!     .pushed()
63//!     .drain()
64//!     .filter(|((location, time), delta)| location.is_target())
65//!     .collect::<Vec<_>>();
66//!
67//! results.sort();
68//!
69//! assert_eq!(results.len(), 3);
70//! assert_eq!(results[0], ((Location::new_target(0, 0), 18), -1));
71//! assert_eq!(results[1], ((Location::new_target(1, 0), 17), -1));
72//! assert_eq!(results[2], ((Location::new_target(2, 0), 17), -1));
73//! ```
74
75use std::collections::{BinaryHeap, HashMap, VecDeque};
76use std::cmp::Reverse;
77
78use crate::progress::Timestamp;
79use crate::progress::{Source, Target};
80use crate::progress::ChangeBatch;
81use crate::progress::{Location, Port};
82
83use crate::progress::frontier::{Antichain, MutableAntichain};
84use crate::progress::timestamp::PathSummary;
85
86
87/// A topology builder, which can summarize reachability along paths.
88///
89/// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles
90/// a static summary of the minimal actions a timestamp must endure going from any
91/// input or output port to a destination input port.
92///
93/// A graph is provides as (i) several indexed nodes, each with some number of input
94/// and output ports, and each with a summary of the internal paths connecting each
95/// input to each output, and (ii) a set of edges connecting output ports to input
96/// ports. Edges do not adjust timestamps; only nodes do this.
97///
98/// The resulting summary describes, for each origin port in the graph and destination
99/// input port, a set of incomparable path summaries, each describing what happens to
100/// a timestamp as it moves along the path. There may be multiple summaries for each
101/// part of origin and destination due to the fact that the actions on timestamps may
102/// not be totally ordered (e.g., "increment the timestamp" and "take the maximum of
103/// the timestamp and seven").
104///
105/// # Examples
106///
107/// ```rust
108/// use timely::progress::frontier::Antichain;
109/// use timely::progress::{Source, Target};
110/// use timely::progress::reachability::Builder;
111///
112/// // allocate a new empty topology builder.
113/// let mut builder = Builder::<usize>::new();
114///
115/// // Each node with one input connected to one output.
116/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
117/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
118/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
119///
120/// // Connect nodes in sequence, looping around to the first from the last.
121/// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
122/// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
123/// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
124///
125/// // Summarize reachability information.
126/// let (tracker, _) = builder.build(None);
127/// ```
128#[derive(Clone, Debug)]
129pub struct Builder<T: Timestamp> {
130    /// Internal connections within hosted operators.
131    ///
132    /// Indexed by operator index, then input port, then output port. This is the
133    /// same format returned by `get_internal_summary`, as if we simply appended
134    /// all of the summaries for the hosted nodes.
135    pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
136    /// Direct connections from sources to targets.
137    ///
138    /// Edges do not affect timestamps, so we only need to know the connectivity.
139    /// Indexed by operator index then output port.
140    pub edges: Vec<Vec<Vec<Target>>>,
141    /// Numbers of inputs and outputs for each node.
142    pub shape: Vec<(usize, usize)>,
143}
144
145impl<T: Timestamp> Builder<T> {
146
147    /// Create a new empty topology builder.
148    pub fn new() -> Self {
149        Builder {
150            nodes: Vec::new(),
151            edges: Vec::new(),
152            shape: Vec::new(),
153        }
154    }
155
156    /// Add links internal to operators.
157    ///
158    /// This method overwrites any existing summary, instead of anything more sophisticated.
159    pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {
160
161        // Assert that all summaries exist.
162        debug_assert_eq!(inputs, summary.len());
163        for x in summary.iter() { debug_assert_eq!(outputs, x.len()); }
164
165        while self.nodes.len() <= index {
166            self.nodes.push(Vec::new());
167            self.edges.push(Vec::new());
168            self.shape.push((0, 0));
169        }
170
171        self.nodes[index] = summary;
172        if self.edges[index].len() != outputs {
173            self.edges[index] = vec![Vec::new(); outputs];
174        }
175        self.shape[index] = (inputs, outputs);
176    }
177
178    /// Add links between operators.
179    ///
180    /// This method does not check that the associated nodes and ports exist. References to
181    /// missing nodes or ports are discovered in `build`.
182    pub fn add_edge(&mut self, source: Source, target: Target) {
183
184        // Assert that the edge is between existing ports.
185        debug_assert!(source.port < self.shape[source.node].1);
186        debug_assert!(target.port < self.shape[target.node].0);
187
188        self.edges[source.node][source.port].push(target);
189    }
190
191    /// Compiles the current nodes and edges into immutable path summaries.
192    ///
193    /// This method has the opportunity to perform some error checking that the path summaries
194    /// are valid, including references to undefined nodes and ports, as well as self-loops with
195    /// default summaries (a serious liveness issue).
196    ///
197    /// The optional logger information is baked into the resulting tracker.
198    pub fn build(self, logger: Option<logging::TrackerLogger>) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>) {
199
200        if !self.is_acyclic() {
201            println!("Cycle detected without timestamp increment");
202            println!("{:?}", self);
203        }
204
205        Tracker::allocate_from(self, logger)
206    }
207
208    /// Tests whether the graph a cycle of default path summaries.
209    ///
210    /// Graphs containing cycles of default path summaries will most likely
211    /// not work well with progress tracking, as a timestamp can result in
212    /// itself. Such computations can still *run*, but one should not block
213    /// on frontier information before yielding results, as you many never
214    /// unblock.
215    ///
216    /// # Examples
217    ///
218    /// ```rust
219    /// use timely::progress::frontier::Antichain;
220    /// use timely::progress::{Source, Target};
221    /// use timely::progress::reachability::Builder;
222    ///
223    /// // allocate a new empty topology builder.
224    /// let mut builder = Builder::<usize>::new();
225    ///
226    /// // Each node with one input connected to one output.
227    /// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
228    /// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
229    /// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)]]);
230    ///
231    /// // Connect nodes in sequence, looping around to the first from the last.
232    /// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
233    /// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
234    ///
235    /// assert!(builder.is_acyclic());
236    ///
237    /// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
238    ///
239    /// assert!(!builder.is_acyclic());
240    /// ```
241    ///
242    /// This test exists because it is possible to describe dataflow graphs that
243    /// do not contain non-incrementing cycles, but without feedback nodes that
244    /// strictly increment timestamps. For example,
245    ///
246    /// ```rust
247    /// use timely::progress::frontier::Antichain;
248    /// use timely::progress::{Source, Target};
249    /// use timely::progress::reachability::Builder;
250    ///
251    /// // allocate a new empty topology builder.
252    /// let mut builder = Builder::<usize>::new();
253    ///
254    /// // Two inputs and outputs, only one of which advances.
255    /// builder.add_node(0, 2, 2, vec![
256    ///     vec![Antichain::from_elem(0),Antichain::new(),],
257    ///     vec![Antichain::new(),Antichain::from_elem(1),],
258    /// ]);
259    ///
260    /// // Connect each output to the opposite input.
261    /// builder.add_edge(Source::new(0, 0), Target::new(0, 1));
262    /// builder.add_edge(Source::new(0, 1), Target::new(0, 0));
263    ///
264    /// assert!(builder.is_acyclic());
265    /// ```
266    pub fn is_acyclic(&self) -> bool {
267
268        let locations = self.shape.iter().map(|(targets, sources)| targets + sources).sum();
269        let mut in_degree = HashMap::with_capacity(locations);
270
271        // Load edges as default summaries.
272        for (index, ports) in self.edges.iter().enumerate() {
273            for (output, targets) in ports.iter().enumerate() {
274                let source = Location::new_source(index, output);
275                in_degree.entry(source).or_insert(0);
276                for &target in targets.iter() {
277                    let target = Location::from(target);
278                    *in_degree.entry(target).or_insert(0) += 1;
279                }
280            }
281        }
282
283        // Load default intra-node summaries.
284        for (index, summary) in self.nodes.iter().enumerate() {
285            for (input, outputs) in summary.iter().enumerate() {
286                let target = Location::new_target(index, input);
287                in_degree.entry(target).or_insert(0);
288                for (output, summaries) in outputs.iter().enumerate() {
289                    let source = Location::new_source(index, output);
290                    for summary in summaries.elements().iter() {
291                        if summary == &Default::default() {
292                            *in_degree.entry(source).or_insert(0) += 1;
293                        }
294                    }
295                }
296            }
297        }
298
299        // A worklist of nodes that cannot be reached from the whole graph.
300        // Initially this list contains observed locations with no incoming
301        // edges, but as the algorithm develops we add to it any locations
302        // that can only be reached by nodes that have been on this list.
303        let mut worklist = Vec::with_capacity(in_degree.len());
304        for (key, val) in in_degree.iter() {
305            if *val == 0 {
306                worklist.push(*key);
307            }
308        }
309        in_degree.retain(|_key, val| val != &0);
310
311        // Repeatedly remove nodes and update adjacent in-edges.
312        while let Some(Location { node, port }) = worklist.pop() {
313            match port {
314                Port::Source(port) => {
315                    for target in self.edges[node][port].iter() {
316                        let target = Location::from(*target);
317                        *in_degree.get_mut(&target).unwrap() -= 1;
318                        if in_degree[&target] == 0 {
319                            in_degree.remove(&target);
320                            worklist.push(target);
321                        }
322                    }
323                },
324                Port::Target(port) => {
325                    for (output, summaries) in self.nodes[node][port].iter().enumerate() {
326                        let source = Location::new_source(node, output);
327                        for summary in summaries.elements().iter() {
328                            if summary == &Default::default() {
329                                *in_degree.get_mut(&source).unwrap() -= 1;
330                                if in_degree[&source] == 0 {
331                                    in_degree.remove(&source);
332                                    worklist.push(source);
333                                }
334                            }
335                        }
336                    }
337                },
338            }
339        }
340
341        // Acyclic graphs should reduce to empty collections.
342        in_degree.is_empty()
343    }
344}
345
346impl<T: Timestamp> Default for Builder<T> {
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
352/// An interactive tracker of propagated reachability information.
353///
354/// A `Tracker` tracks, for a fixed graph topology, the implications of
355/// pointstamp changes at various node input and output ports. These changes may
356/// alter the potential pointstamps that could arrive at downstream input ports.
357pub struct Tracker<T:Timestamp> {
358
359    /// Internal connections within hosted operators.
360    ///
361    /// Indexed by operator index, then input port, then output port. This is the
362    /// same format returned by `get_internal_summary`, as if we simply appended
363    /// all of the summaries for the hosted nodes.
364    nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
365    /// Direct connections from sources to targets.
366    ///
367    /// Edges do not affect timestamps, so we only need to know the connectivity.
368    /// Indexed by operator index then output port.
369    edges: Vec<Vec<Vec<Target>>>,
370
371    // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
372    //       It seems we should be able to flatten most of these so that there are a few allocations
373    //       independent of the numbers of nodes and ports and such.
374    //
375    // TODO: We could also change the internal representation to be a graph of targets, using usize
376    //       identifiers for each, so that internally we needn't use multiple levels of indirection.
377    //       This may make more sense once we commit to topologically ordering the targets.
378
379    /// Each source and target has a mutable antichain to ensure that we track their discrete frontiers,
380    /// rather than their multiplicities. We separately track the frontiers resulting from propagated
381    /// frontiers, to protect them from transient negativity in inbound target updates.
382    per_operator: Vec<PerOperator<T>>,
383
384    /// Source and target changes are buffered, which allows us to delay processing until propagation,
385    /// and so consolidate updates, but to leap directly to those frontiers that may have changed.
386    target_changes: ChangeBatch<(Target, T)>,
387    source_changes: ChangeBatch<(Source, T)>,
388
389    /// Worklist of updates to perform, ordered by increasing timestamp and target.
390    worklist: BinaryHeap<Reverse<(T, Location, i64)>>,
391
392    /// Buffer of consequent changes.
393    pushed_changes: ChangeBatch<(Location, T)>,
394
395    /// Compiled summaries from each internal location (not scope inputs) to each scope output.
396    output_changes: Vec<ChangeBatch<T>>,
397
398    /// A non-negative sum of post-filtration input changes.
399    ///
400    /// This sum should be zero exactly when the accumulated input changes are zero,
401    /// indicating that the progress tracker is currently tracking nothing. It should
402    /// always be exactly equal to the sum across all operators of the frontier sizes
403    /// of the target and source `pointstamps` member.
404    total_counts: i64,
405
406    /// Optionally, a unique logging identifier and logging for tracking events.
407    logger: Option<logging::TrackerLogger>,
408}
409
410/// Target and source information for each operator.
411pub struct PerOperator<T: Timestamp> {
412    /// Port information for each target.
413    pub targets: Vec<PortInformation<T>>,
414    /// Port information for each source.
415    pub sources: Vec<PortInformation<T>>,
416}
417
418impl<T: Timestamp> PerOperator<T> {
419    /// A new PerOperator bundle from numbers of input and output ports.
420    pub fn new(inputs: usize, outputs: usize) -> Self {
421        PerOperator {
422            targets: vec![PortInformation::new(); inputs],
423            sources: vec![PortInformation::new(); outputs],
424        }
425    }
426}
427
428/// Per-port progress-tracking information.
429#[derive(Clone)]
430pub struct PortInformation<T: Timestamp> {
431    /// Current counts of active pointstamps.
432    pub pointstamps: MutableAntichain<T>,
433    /// Current implications of active pointstamps across the dataflow.
434    pub implications: MutableAntichain<T>,
435    /// Path summaries to each of the scope outputs.
436    pub output_summaries: Vec<Antichain<T::Summary>>,
437}
438
439impl<T: Timestamp> PortInformation<T> {
440    /// Creates empty port information.
441    pub fn new() -> Self {
442        PortInformation {
443            pointstamps: MutableAntichain::new(),
444            implications: MutableAntichain::new(),
445            output_summaries: Vec::new(),
446        }
447    }
448
449    /// True if updates at this pointstamp uniquely block progress.
450    ///
451    /// This method returns true if the currently maintained pointstamp
452    /// counts are such that zeroing out outstanding updates at *this*
453    /// pointstamp would change the frontiers at this operator. When the
454    /// method returns false it means that, temporarily at least, there
455    /// are outstanding pointstamp updates that are strictly less than
456    /// this pointstamp.
457    #[inline]
458    pub fn is_global(&self, time: &T) -> bool {
459        let dominated = self.implications.frontier().iter().any(|t| t.less_than(time));
460        let redundant = self.implications.count_for(time) > 1;
461        !dominated && !redundant
462    }
463}
464
465impl<T: Timestamp> Default for PortInformation<T> {
466    fn default() -> Self {
467        Self::new()
468    }
469}
470
471impl<T:Timestamp> Tracker<T> {
472
473    /// Updates the count for a time at a location.
474    #[inline]
475    pub fn update(&mut self, location: Location, time: T, value: i64) {
476        match location.port {
477            Port::Target(port) => self.update_target(Target::new(location.node, port), time, value),
478            Port::Source(port) => self.update_source(Source::new(location.node, port), time, value),
479        };
480    }
481
482    /// Updates the count for a time at a target (operator input, scope output).
483    #[inline]
484    pub fn update_target(&mut self, target: Target, time: T, value: i64) {
485        self.target_changes.update((target, time), value);
486    }
487    /// Updates the count for a time at a source (operator output, scope input).
488    #[inline]
489    pub fn update_source(&mut self, source: Source, time: T, value: i64) {
490        self.source_changes.update((source, time), value);
491    }
492
493    /// Indicates if any pointstamps have positive count.
494    pub fn tracking_anything(&mut self) -> bool {
495        !self.source_changes.is_empty() ||
496        !self.target_changes.is_empty() ||
497        self.total_counts > 0
498    }
499
500    /// Allocate a new `Tracker` using the shape from `summaries`.
501    ///
502    /// The result is a pair of tracker, and the summaries from each input port to each
503    /// output port.
504    ///
505    /// If the optional logger is provided, it will be used to log various tracker events.
506    pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger>) -> (Self, Vec<Vec<Antichain<T::Summary>>>) {
507
508        // Allocate buffer space for each input and input port.
509        let mut per_operator =
510        builder
511            .shape
512            .iter()
513            .map(|&(inputs, outputs)| PerOperator::new(inputs, outputs))
514            .collect::<Vec<_>>();
515
516        // Summary of scope inputs to scope outputs.
517        let mut builder_summary = vec![vec![]; builder.shape[0].1];
518
519        // Compile summaries from each location to each scope output.
520        let output_summaries = summarize_outputs::<T>(&builder.nodes, &builder.edges);
521        for (location, summaries) in output_summaries.into_iter() {
522            // Summaries from scope inputs are useful in summarizing the scope.
523            if location.node == 0 {
524                if let Port::Source(port) = location.port {
525                    builder_summary[port] = summaries;
526                }
527                else {
528                    // Ignore (ideally trivial) output to output summaries.
529                }
530            }
531            // Summaries from internal nodes are important for projecting capabilities.
532            else {
533                match location.port {
534                    Port::Target(port) => {
535                        per_operator[location.node].targets[port].output_summaries = summaries;
536                    },
537                    Port::Source(port) => {
538                        per_operator[location.node].sources[port].output_summaries = summaries;
539                    },
540                }
541            }
542        }
543
544        let scope_outputs = builder.shape[0].0;
545        let output_changes = vec![ChangeBatch::new(); scope_outputs];
546
547        let tracker =
548        Tracker {
549            nodes: builder.nodes,
550            edges: builder.edges,
551            per_operator,
552            target_changes: ChangeBatch::new(),
553            source_changes: ChangeBatch::new(),
554            worklist: BinaryHeap::new(),
555            pushed_changes: ChangeBatch::new(),
556            output_changes,
557            total_counts: 0,
558            logger,
559        };
560
561        (tracker, builder_summary)
562    }
563
564    /// Propagates all pending updates.
565    ///
566    /// The method drains `self.input_changes` and circulates their implications
567    /// until we cease deriving new implications.
568    pub fn propagate_all(&mut self) {
569
570        // Step 0: If logging is enabled, construct and log inbound changes.
571        if let Some(logger) = &mut self.logger {
572
573            let target_changes =
574            self.target_changes
575                .iter()
576                .map(|((target, time), diff)| (target.node, target.port, time.clone(), *diff))
577                .collect::<Vec<_>>();
578
579            if !target_changes.is_empty() {
580                logger.log_target_updates(Box::new(target_changes));
581            }
582
583            let source_changes =
584            self.source_changes
585                .iter()
586                .map(|((source, time), diff)| (source.node, source.port, time.clone(), *diff))
587                .collect::<Vec<_>>();
588
589            if !source_changes.is_empty() {
590                logger.log_source_updates(Box::new(source_changes));
591            }
592        }
593
594        // Step 1: Drain `self.input_changes` and determine actual frontier changes.
595        //
596        // Not all changes in `self.input_changes` may alter the frontier at a location.
597        // By filtering the changes through `self.pointstamps` we react only to discrete
598        // changes in the frontier, rather than changes in the pointstamp counts that
599        // witness that frontier.
600        for ((target, time), diff) in self.target_changes.drain() {
601
602            let operator = &mut self.per_operator[target.node].targets[target.port];
603            let changes = operator.pointstamps.update_iter(Some((time, diff)));
604
605            for (time, diff) in changes {
606                self.total_counts += diff;
607                for (output, summaries) in operator.output_summaries.iter().enumerate() {
608                    let output_changes = &mut self.output_changes[output];
609                    summaries
610                        .elements()
611                        .iter()
612                        .flat_map(|summary| summary.results_in(&time))
613                        .for_each(|out_time| output_changes.update(out_time, diff));
614                }
615                self.worklist.push(Reverse((time, Location::from(target), diff)));
616            }
617        }
618
619        for ((source, time), diff) in self.source_changes.drain() {
620
621            let operator = &mut self.per_operator[source.node].sources[source.port];
622            let changes = operator.pointstamps.update_iter(Some((time, diff)));
623
624            for (time, diff) in changes {
625                self.total_counts += diff;
626                for (output, summaries) in operator.output_summaries.iter().enumerate() {
627                    let output_changes = &mut self.output_changes[output];
628                    summaries
629                        .elements()
630                        .iter()
631                        .flat_map(|summary| summary.results_in(&time))
632                        .for_each(|out_time| output_changes.update(out_time, diff));
633                }
634                self.worklist.push(Reverse((time, Location::from(source), diff)));
635            }
636        }
637
638        // Step 2: Circulate implications of changes to `self.pointstamps`.
639        //
640        // TODO: The argument that this always terminates is subtle, and should be made.
641        //       The intent is that that by moving forward in layers through `time`, we
642        //       will discover zero-change times when we first visit them, as no further
643        //       changes can be made to them once we complete them.
644        while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() {
645
646            // Drain and accumulate all updates that have the same time and location.
647            while self.worklist.peek().map(|x| ((x.0).0 == time) && ((x.0).1 == location)).unwrap_or(false) {
648                diff += (self.worklist.pop().unwrap().0).2;
649            }
650
651            // Only act if there is a net change, positive or negative.
652            if diff != 0 {
653
654                match location.port {
655                    // Update to an operator input.
656                    // Propagate any changes forward across the operator.
657                    Port::Target(port_index) => {
658
659                        let changes =
660                        self.per_operator[location.node]
661                            .targets[port_index]
662                            .implications
663                            .update_iter(Some((time, diff)));
664
665                        for (time, diff) in changes {
666                            let nodes = &self.nodes[location.node][port_index];
667                            for (output_port, summaries) in nodes.iter().enumerate() {
668                                let source = Location { node: location.node, port: Port::Source(output_port) };
669                                for summary in summaries.elements().iter() {
670                                    if let Some(new_time) = summary.results_in(&time) {
671                                        self.worklist.push(Reverse((new_time, source, diff)));
672                                    }
673                                }
674                            }
675                            self.pushed_changes.update((location, time), diff);
676                        }
677                    }
678                    // Update to an operator output.
679                    // Propagate any changes forward along outgoing edges.
680                    Port::Source(port_index) => {
681
682                        let changes =
683                        self.per_operator[location.node]
684                            .sources[port_index]
685                            .implications
686                            .update_iter(Some((time, diff)));
687
688                        for (time, diff) in changes {
689                            for new_target in self.edges[location.node][port_index].iter() {
690                                self.worklist.push(Reverse((
691                                    time.clone(),
692                                    Location::from(*new_target),
693                                    diff,
694                                )));
695                            }
696                            self.pushed_changes.update((location, time), diff);
697                        }
698                    },
699                };
700            }
701        }
702    }
703
704    /// Implications of maintained capabilities projected to each output.
705    pub fn pushed_output(&mut self) -> &mut [ChangeBatch<T>] {
706        &mut self.output_changes[..]
707    }
708
709    /// A mutable reference to the pushed results of changes.
710    pub fn pushed(&mut self) -> &mut ChangeBatch<(Location, T)> {
711        &mut self.pushed_changes
712    }
713
714    /// Reveals per-operator frontier state.
715    pub fn node_state(&self, index: usize) -> &PerOperator<T> {
716        &self.per_operator[index]
717    }
718
719    /// Indicates if pointstamp is in the scope-wide frontier.
720    ///
721    /// Such a pointstamp would, if removed from `self.pointstamps`, cause a change
722    /// to `self.implications`, which is what we track for per operator input frontiers.
723    /// If the above do not hold, then its removal either 1. shouldn't be possible,
724    /// or 2. will not affect the output of `self.implications`.
725    pub fn is_global(&self, location: Location, time: &T) -> bool {
726        match location.port {
727            Port::Target(port) => self.per_operator[location.node].targets[port].is_global(time),
728            Port::Source(port) => self.per_operator[location.node].sources[port].is_global(time),
729        }
730    }
731}
732
733/// Determines summaries from locations to scope outputs.
734///
735/// Specifically, for each location whose node identifier is non-zero, we compile
736/// the summaries along which they can reach each output.
737///
738/// Graph locations may be missing from the output, in which case they have no
739/// paths to scope outputs.
740fn summarize_outputs<T: Timestamp>(
741    nodes: &Vec<Vec<Vec<Antichain<T::Summary>>>>,
742    edges: &Vec<Vec<Vec<Target>>>,
743    ) -> HashMap<Location, Vec<Antichain<T::Summary>>>
744{
745    // A reverse edge map, to allow us to walk back up the dataflow graph.
746    let mut reverse = HashMap::new();
747    for (node, outputs) in edges.iter().enumerate() {
748        for (output, targets) in outputs.iter().enumerate() {
749            for target in targets.iter() {
750                reverse.insert(
751                    Location::from(*target),
752                    Location { node, port: Port::Source(output) }
753                );
754            }
755        }
756    }
757
758    let mut results: HashMap<Location, Vec<Antichain<T::Summary>>> = HashMap::new();
759    let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
760
761    let outputs =
762    edges
763        .iter()
764        .flat_map(|x| x.iter())
765        .flat_map(|x| x.iter())
766        .filter(|target| target.node == 0);
767
768    // The scope may have no outputs, in which case we can do no work.
769    for output_target in outputs {
770        worklist.push_back((Location::from(*output_target), output_target.port, Default::default()));
771    }
772
773    // Loop until we stop discovering novel reachability paths.
774    while let Some((location, output, summary)) = worklist.pop_front() {
775
776        match location.port {
777
778            // This is an output port of an operator, or a scope input.
779            // We want to crawl up the operator, to its inputs.
780            Port::Source(output_port) => {
781
782                // Consider each input port of the associated operator.
783                for (input_port, summaries) in nodes[location.node].iter().enumerate() {
784
785                    // Determine the current path summaries from the input port.
786                    let location = Location { node: location.node, port: Port::Target(input_port) };
787                    let antichains = results
788                        .entry(location)
789                        .and_modify(|antichains| antichains.reserve(output))
790                        .or_insert_with(|| Vec::with_capacity(output));
791
792                    while antichains.len() <= output { antichains.push(Antichain::new()); }
793
794                    // Combine each operator-internal summary to the output with `summary`.
795                    for operator_summary in summaries[output_port].elements().iter() {
796                        if let Some(combined) = operator_summary.followed_by(&summary) {
797                            if antichains[output].insert(combined.clone()) {
798                                worklist.push_back((location, output, combined));
799                            }
800                        }
801                    }
802                }
803
804            },
805
806            // This is an input port of an operator, or a scope output.
807            // We want to walk back the edges leading to it.
808            Port::Target(_port) => {
809
810                // Each target should have (at most) one source.
811                if let Some(&source) = reverse.get(&location) {
812                    let antichains = results
813                        .entry(source)
814                        .and_modify(|antichains| antichains.reserve(output))
815                        .or_insert_with(|| Vec::with_capacity(output));
816
817                    while antichains.len() <= output { antichains.push(Antichain::new()); }
818
819                    if antichains[output].insert(summary.clone()) {
820                        worklist.push_back((source, output, summary.clone()));
821                    }
822                }
823
824            },
825        }
826
827    }
828
829    results
830}
831
832/// Logging types for reachability tracking events.
833pub mod logging {
834
835    use crate::logging::{Logger, ProgressEventTimestampVec};
836
837    /// A logger with additional identifying information about the tracker.
838    pub struct TrackerLogger {
839        path: Vec<usize>,
840        logger: Logger<TrackerEvent>,
841    }
842
843    impl TrackerLogger {
844        /// Create a new tracker logger from its fields.
845        pub fn new(path: Vec<usize>, logger: Logger<TrackerEvent>) -> Self {
846            Self { path, logger }
847        }
848
849        /// Log source update events with additional identifying information.
850        pub fn log_source_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
851            self.logger.log({
852                SourceUpdate {
853                    tracker_id: self.path.clone(),
854                    updates,
855                }
856            })
857        }
858        /// Log target update events with additional identifying information.
859        pub fn log_target_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
860            self.logger.log({
861                TargetUpdate {
862                    tracker_id: self.path.clone(),
863                    updates,
864                }
865            })
866        }
867    }
868
869    /// Events that the tracker may record.
870    pub enum TrackerEvent {
871        /// Updates made at a source of data.
872        SourceUpdate(SourceUpdate),
873        /// Updates made at a target of data.
874        TargetUpdate(TargetUpdate),
875    }
876
877    /// An update made at a source of data.
878    pub struct SourceUpdate {
879        /// An identifier for the tracker.
880        pub tracker_id: Vec<usize>,
881        /// Updates themselves, as `(node, port, time, diff)`.
882        pub updates: Box<dyn ProgressEventTimestampVec>,
883    }
884
885    /// An update made at a target of data.
886    pub struct TargetUpdate {
887        /// An identifier for the tracker.
888        pub tracker_id: Vec<usize>,
889        /// Updates themselves, as `(node, port, time, diff)`.
890        pub updates: Box<dyn ProgressEventTimestampVec>,
891    }
892
893    impl From<SourceUpdate> for TrackerEvent {
894        fn from(v: SourceUpdate) -> TrackerEvent { TrackerEvent::SourceUpdate(v) }
895    }
896
897    impl From<TargetUpdate> for TrackerEvent {
898        fn from(v: TargetUpdate) -> TrackerEvent { TrackerEvent::TargetUpdate(v) }
899    }
900}
901
902// The Drop implementation for `Tracker` makes sure that reachability logging is correct for
903// prematurely dropped dataflows. At the moment, this is only possible through `drop_dataflow`,
904// because in all other cases the tracker stays alive while it has outstanding work, leaving no
905// remaining work for this Drop implementation.
906impl<T: Timestamp> Drop for Tracker<T> {
907    fn drop(&mut self) {
908        let logger = if let Some(logger) = &mut self.logger {
909            logger
910        } else {
911            // No cleanup necessary when there is no logger.
912            return;
913        };
914
915        // Retract pending data that `propagate_all` would normally log.
916        for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
917            let target_changes = per_operator.targets
918                .iter_mut()
919                .enumerate()
920                .flat_map(|(port, target)| {
921                    target.pointstamps
922                        .updates()
923                        .map(move |(time, diff)| (index, port, time.clone(), -diff))
924                })
925                .collect::<Vec<_>>();
926            if !target_changes.is_empty() {
927                logger.log_target_updates(Box::new(target_changes));
928            }
929
930            let source_changes = per_operator.sources
931                .iter_mut()
932                .enumerate()
933                .flat_map(|(port, source)| {
934                    source.pointstamps
935                        .updates()
936                        .map(move |(time, diff)| (index, port, time.clone(), -diff))
937                })
938                .collect::<Vec<_>>();
939            if !source_changes.is_empty() {
940                logger.log_source_updates(Box::new(source_changes));
941            }
942        }
943    }
944}