timely/dataflow/operators/
feedback.rs

1//! Create cycles in a timely dataflow graph.
2
3use crate::{Container, Data};
4
5use crate::progress::{Timestamp, PathSummary};
6use crate::progress::frontier::Antichain;
7use crate::order::Product;
8
9use crate::dataflow::channels::pushers::TeeCore;
10use crate::dataflow::channels::pact::Pipeline;
11use crate::dataflow::{StreamCore, Scope, Stream};
12use crate::dataflow::scopes::child::Iterative;
13use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
14use crate::dataflow::operators::generic::OutputWrapper;
15
16/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
17pub trait Feedback<G: Scope> {
18    /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
19    ///
20    /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
21    /// its `Handle` passed as an argument. Data passed through the stream will have their
22    /// timestamps advanced by `summary`.
23    ///
24    /// # Examples
25    /// ```
26    /// use timely::dataflow::Scope;
27    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
28    ///
29    /// timely::example(|scope| {
30    ///     // circulate 0..10 for 100 iterations.
31    ///     let (handle, cycle) = scope.feedback(1);
32    ///     (0..10).to_stream(scope)
33    ///            .concat(&cycle)
34    ///            .inspect(|x| println!("seen: {:?}", x))
35    ///            .branch_when(|t| t < &100).1
36    ///            .connect_loop(handle);
37    /// });
38    /// ```
39    fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>);
40
41    /// Creates a [StreamCore] and a [HandleCore] to later bind the source of that `Stream`.
42    ///
43    /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
44    /// its `Handle` passed as an argument. Data passed through the stream will have their
45    /// timestamps advanced by `summary`, and will be dropped if the result exceeds `limit`.
46    ///
47    /// # Examples
48    /// ```
49    /// use timely::dataflow::Scope;
50    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
51    ///
52    /// timely::example(|scope| {
53    ///     // circulate 0..10 for 100 iterations.
54    ///     let (handle, cycle) = scope.feedback_core::<Vec<_>>(1);
55    ///     (0..10).to_stream(scope)
56    ///            .concat(&cycle)
57    ///            .inspect(|x| println!("seen: {:?}", x))
58    ///            .branch_when(|t| t < &100).1
59    ///            .connect_loop(handle);
60    /// });
61    /// ```
62    fn feedback_core<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (HandleCore<G, D>, StreamCore<G, D>);
63}
64
65/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
66pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
67    /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
68    ///
69    /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
70    /// its `Handle` passed as an argument. Data passed through the stream will have their
71    /// timestamps advanced by `summary`.
72    ///
73    /// # Examples
74    /// ```
75    /// use timely::dataflow::Scope;
76    /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
77    ///
78    /// timely::example(|scope| {
79    ///     // circulate 0..10 for 100 iterations.
80    ///     scope.iterative::<usize,_,_>(|inner| {
81    ///         let (handle, cycle) = inner.loop_variable(1);
82    ///         (0..10).to_stream(inner)
83    ///                .concat(&cycle)
84    ///                .inspect(|x| println!("seen: {:?}", x))
85    ///                .branch_when(|t| t.inner < 100).1
86    ///                .connect_loop(handle);
87    ///     });
88    /// });
89    /// ```
90    fn loop_variable<D: Container>(&mut self, summary: T::Summary) -> (HandleCore<Iterative<'a, G, T>, D>, StreamCore<Iterative<'a, G, T>, D>);
91}
92
93impl<G: Scope> Feedback<G> for G {
94    fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>) {
95        self.feedback_core(summary)
96    }
97
98    fn feedback_core<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (HandleCore<G, D>, StreamCore<G, D>) {
99
100        let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
101        let (output, stream) = builder.new_output();
102
103        (HandleCore { builder, summary, output }, stream)
104    }
105}
106
107impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
108    fn loop_variable<D: Container>(&mut self, summary: T::Summary) -> (HandleCore<Iterative<'a, G, T>, D>, StreamCore<Iterative<'a, G, T>, D>) {
109        self.feedback_core(Product::new(Default::default(), summary))
110    }
111}
112
113/// Connect a `Stream` to the input of a loop variable.
114pub trait ConnectLoop<G: Scope, D: Container> {
115    /// Connect a `Stream` to be the input of a loop variable.
116    ///
117    /// # Examples
118    /// ```
119    /// use timely::dataflow::Scope;
120    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
121    ///
122    /// timely::example(|scope| {
123    ///     // circulate 0..10 for 100 iterations.
124    ///     let (handle, cycle) = scope.feedback(1);
125    ///     (0..10).to_stream(scope)
126    ///            .concat(&cycle)
127    ///            .inspect(|x| println!("seen: {:?}", x))
128    ///            .branch_when(|t| t < &100).1
129    ///            .connect_loop(handle);
130    /// });
131    /// ```
132    fn connect_loop(&self, _: HandleCore<G, D>);
133}
134
135impl<G: Scope, D: Container> ConnectLoop<G, D> for StreamCore<G, D> {
136    fn connect_loop(&self, helper: HandleCore<G, D>) {
137
138        let mut builder = helper.builder;
139        let summary = helper.summary;
140        let mut output = helper.output;
141
142        let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]);
143
144        let mut vector = Default::default();
145        builder.build(move |_capability| move |_frontier| {
146            let mut output = output.activate();
147            input.for_each(|cap, data| {
148                data.swap(&mut vector);
149                if let Some(new_time) = summary.results_in(cap.time()) {
150                    let new_cap = cap.delayed(&new_time);
151                    output
152                        .session(&new_cap)
153                        .give_container(&mut vector);
154                }
155            });
156        });
157    }
158}
159
160/// A handle used to bind the source of a loop variable.
161#[derive(Debug)]
162pub struct HandleCore<G: Scope, D: Container> {
163    builder: OperatorBuilder<G>,
164    summary: <G::Timestamp as Timestamp>::Summary,
165    output: OutputWrapper<G::Timestamp, D, TeeCore<G::Timestamp, D>>,
166}
167
168/// A `HandleCore` specialized for using `Vec` as container
169pub type Handle<G, D> = HandleCore<G, Vec<D>>;