Skip to main content

palimpsest_dataflow/dynamic/
mod.rs

//! Types and operators for dynamically scoped iterative dataflows.
//!
//! Scopes in timely dataflow are expressed statically, as part of the type system.
//! This affords many efficiencies, as well as type-driven reassurance of correctness.
//! However, there are times you need scopes whose organization is discovered only at runtime.
//! Naiad and Materialize are examples: the latter compiles arbitrary SQL into iterative dataflows.
//!
//! This module provides a timestamp type `PointStamp` that can represent an update with an
//! unboundedly long sequence of some `T: Timestamp`, ordered by the product order by which times
//! in iterative dataflows are ordered. The module also provides methods for manipulating these
//! timestamps to emulate the movement of update streams into, within, and out of iterative scopes.
//!
13
14pub mod pointstamp;
15
16use timely::dataflow::channels::pact::Pipeline;
17use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder};
18use timely::dataflow::Scope;
19use timely::order::Product;
20use timely::progress::Antichain;
21use timely::progress::Timestamp;
22
23use crate::collection::AsCollection;
24use crate::difference::Semigroup;
25use crate::dynamic::pointstamp::PointStamp;
26use crate::dynamic::pointstamp::PointStampSummary;
27use crate::{Data, VecCollection};
28
29impl<G, D, R, T, TOuter> VecCollection<G, D, R>
30where
31    G: Scope<Timestamp = Product<TOuter, PointStamp<T>>>,
32    D: Data,
33    R: Semigroup + 'static,
34    T: Timestamp + Default,
35    TOuter: Timestamp,
36{
37    /// Enters a dynamically created scope which has `level` timestamp coordinates.
38    pub fn enter_dynamic(&self, _level: usize) -> Self {
39        (*self).clone()
40    }
41    /// Leaves a dynamically created scope which has `level` timestamp coordinates.
42    pub fn leave_dynamic(&self, level: usize) -> Self {
43        // Create a unary operator that will strip all but `level-1` timestamp coordinates.
44        let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope());
45        let (output, stream) = builder.new_output();
46        let mut output = OutputBuilder::from(output);
47        let mut input = builder.new_input_connection(
48            &self.inner,
49            Pipeline,
50            [(
51                0,
52                Antichain::from_elem(Product {
53                    outer: Default::default(),
54                    inner: PointStampSummary {
55                        retain: Some(level - 1),
56                        actions: Vec::new(),
57                    },
58                }),
59            )],
60        );
61
62        builder.build(move |_capability| {
63            move |_frontier| {
64                let mut output = output.activate();
65                input.for_each(|cap, data| {
66                    let mut new_time = cap.time().clone();
67                    let mut vec = std::mem::take(&mut new_time.inner).into_vec();
68                    vec.truncate(level - 1);
69                    new_time.inner = PointStamp::new(vec);
70                    let new_cap = cap.delayed(&new_time);
71                    for (_data, time, _diff) in data.iter_mut() {
72                        let mut vec = std::mem::take(&mut time.inner).into_vec();
73                        vec.truncate(level - 1);
74                        time.inner = PointStamp::new(vec);
75                    }
76                    output.session(&new_cap).give_container(data);
77                });
78            }
79        });
80
81        stream.as_collection()
82    }
83}
84
85/// Produces the summary for a feedback operator at `level`, applying `summary` to that coordinate.
86pub fn feedback_summary<T>(level: usize, summary: T::Summary) -> PointStampSummary<T::Summary>
87where
88    T: Timestamp + Default,
89{
90    PointStampSummary {
91        retain: None,
92        actions: std::iter::repeat(Default::default())
93            .take(level - 1)
94            .chain(std::iter::once(summary))
95            .collect(),
96    }
97}