Skip to main content

differential_dataflow/dynamic/
mod.rs

1//! Types and operators for dynamically scoped iterative dataflows.
2//!
3//! Scopes in timely dataflow are expressed statically, as part of the type system.
4//! This affords many efficiencies, as well as type-driven reassurance of correctness.
5//! However, there are times you need scopes whose organization is discovered only at runtime.
6//! Naiad and Materialize are examples: the latter taking arbitrary SQL into iterative dataflows.
7//!
8//! This module provides a timestamp type `Pointstamp` that can represent an update with an
9//! unboundedly long sequence of some `T: Timestamp`, ordered by the product order by which times
10//! in iterative dataflows are ordered. The module also provides methods for manipulating these
11//! timestamps to emulate the movement of update streams in to, within, and out of iterative scopes.
12//!
13
14pub mod pointstamp;
15
16use timely::order::Product;
17use timely::progress::Timestamp;
18use timely::dataflow::operators::generic::{OutputBuilder, builder_rc::OperatorBuilder};
19use timely::dataflow::channels::pact::Pipeline;
20use timely::progress::Antichain;
21
22use crate::difference::Semigroup;
23use crate::{VecCollection, Data};
24use crate::collection::AsCollection;
25use crate::dynamic::pointstamp::PointStamp;
26use crate::dynamic::pointstamp::PointStampSummary;
27
28impl<'scope, D, R, T, TOuter> VecCollection<'scope, Product<TOuter, PointStamp<T>>, D, R>
29where
30    D: Data,
31    R: Semigroup+'static,
32    T: Timestamp+Default,
33    TOuter: Timestamp,
34{
35    /// Enters a dynamically created scope which has `level` timestamp coordinates.
36    pub fn enter_dynamic(self, _level: usize) -> Self {
37        self
38    }
39    /// Leaves a dynamically created scope which has `level` timestamp coordinates.
40    pub fn leave_dynamic(self, level: usize) -> Self {
41        // Create a unary operator that will strip all but `level-1` timestamp coordinates.
42        let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope());
43        let (output, stream) = builder.new_output();
44        let mut output = OutputBuilder::from(output);
45        let mut input = builder.new_input_connection(self.inner, Pipeline, [(0, Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } }))]);
46
47        builder.build(move |_capability| move |_frontier| {
48            let mut output = output.activate();
49            input.for_each(|cap, data| {
50                let mut new_time = cap.time().clone();
51                let mut vec = std::mem::take(&mut new_time.inner).into_inner();
52                vec.truncate(level - 1);
53                new_time.inner = PointStamp::new(vec);
54                let new_cap = cap.delayed(&new_time, 0);
55                for (_data, time, _diff) in data.iter_mut() {
56                    let mut vec = std::mem::take(&mut time.inner).into_inner();
57                    vec.truncate(level - 1);
58                    time.inner = PointStamp::new(vec);
59                }
60                output.session(&new_cap).give_container(data);
61            });
62        });
63
64        stream.as_collection()
65    }
66}
67
68/// Produces the summary for a feedback operator at `level`, applying `summary` to that coordinate.
69pub fn feedback_summary<T>(level: usize, summary: T::Summary) -> PointStampSummary<T::Summary>
70where
71    T: Timestamp+Default,
72{
73    PointStampSummary {
74        retain: None,
75        actions: std::iter::repeat(Default::default()).take(level-1).chain(std::iter::once(summary)).collect(),
76    }
77}