timely/dataflow/operators/feedback.rs
1//! Create cycles in a timely dataflow graph.
2
3use crate::{Container, Data};
4
5use crate::progress::{Timestamp, PathSummary};
6use crate::progress::frontier::Antichain;
7use crate::order::Product;
8
9use crate::dataflow::channels::pushers::TeeCore;
10use crate::dataflow::channels::pact::Pipeline;
11use crate::dataflow::{StreamCore, Scope, Stream};
12use crate::dataflow::scopes::child::Iterative;
13use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
14use crate::dataflow::operators::generic::OutputWrapper;
15
16/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
17pub trait Feedback<G: Scope> {
18 /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
19 ///
20 /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
21 /// its `Handle` passed as an argument. Data passed through the stream will have their
22 /// timestamps advanced by `summary`.
23 ///
24 /// # Examples
25 /// ```
26 /// use timely::dataflow::Scope;
27 /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
28 ///
29 /// timely::example(|scope| {
30 /// // circulate 0..10 for 100 iterations.
31 /// let (handle, cycle) = scope.feedback(1);
32 /// (0..10).to_stream(scope)
33 /// .concat(&cycle)
34 /// .inspect(|x| println!("seen: {:?}", x))
35 /// .branch_when(|t| t < &100).1
36 /// .connect_loop(handle);
37 /// });
38 /// ```
39 fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>);
40
41 /// Creates a [StreamCore] and a [HandleCore] to later bind the source of that `Stream`.
42 ///
43 /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
44 /// its `Handle` passed as an argument. Data passed through the stream will have their
45 /// timestamps advanced by `summary`, and will be dropped if the result exceeds `limit`.
46 ///
47 /// # Examples
48 /// ```
49 /// use timely::dataflow::Scope;
50 /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
51 ///
52 /// timely::example(|scope| {
53 /// // circulate 0..10 for 100 iterations.
54 /// let (handle, cycle) = scope.feedback_core::<Vec<_>>(1);
55 /// (0..10).to_stream(scope)
56 /// .concat(&cycle)
57 /// .inspect(|x| println!("seen: {:?}", x))
58 /// .branch_when(|t| t < &100).1
59 /// .connect_loop(handle);
60 /// });
61 /// ```
62 fn feedback_core<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (HandleCore<G, D>, StreamCore<G, D>);
63}
64
65/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
66pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
67 /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
68 ///
69 /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
70 /// its `Handle` passed as an argument. Data passed through the stream will have their
71 /// timestamps advanced by `summary`.
72 ///
73 /// # Examples
74 /// ```
75 /// use timely::dataflow::Scope;
76 /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
77 ///
78 /// timely::example(|scope| {
79 /// // circulate 0..10 for 100 iterations.
80 /// scope.iterative::<usize,_,_>(|inner| {
81 /// let (handle, cycle) = inner.loop_variable(1);
82 /// (0..10).to_stream(inner)
83 /// .concat(&cycle)
84 /// .inspect(|x| println!("seen: {:?}", x))
85 /// .branch_when(|t| t.inner < 100).1
86 /// .connect_loop(handle);
87 /// });
88 /// });
89 /// ```
90 fn loop_variable<D: Container>(&mut self, summary: T::Summary) -> (HandleCore<Iterative<'a, G, T>, D>, StreamCore<Iterative<'a, G, T>, D>);
91}
92
93impl<G: Scope> Feedback<G> for G {
94 fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>) {
95 self.feedback_core(summary)
96 }
97
98 fn feedback_core<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (HandleCore<G, D>, StreamCore<G, D>) {
99
100 let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
101 let (output, stream) = builder.new_output();
102
103 (HandleCore { builder, summary, output }, stream)
104 }
105}
106
107impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
108 fn loop_variable<D: Container>(&mut self, summary: T::Summary) -> (HandleCore<Iterative<'a, G, T>, D>, StreamCore<Iterative<'a, G, T>, D>) {
109 self.feedback_core(Product::new(Default::default(), summary))
110 }
111}
112
113/// Connect a `Stream` to the input of a loop variable.
114pub trait ConnectLoop<G: Scope, D: Container> {
115 /// Connect a `Stream` to be the input of a loop variable.
116 ///
117 /// # Examples
118 /// ```
119 /// use timely::dataflow::Scope;
120 /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
121 ///
122 /// timely::example(|scope| {
123 /// // circulate 0..10 for 100 iterations.
124 /// let (handle, cycle) = scope.feedback(1);
125 /// (0..10).to_stream(scope)
126 /// .concat(&cycle)
127 /// .inspect(|x| println!("seen: {:?}", x))
128 /// .branch_when(|t| t < &100).1
129 /// .connect_loop(handle);
130 /// });
131 /// ```
132 fn connect_loop(&self, _: HandleCore<G, D>);
133}
134
135impl<G: Scope, D: Container> ConnectLoop<G, D> for StreamCore<G, D> {
136 fn connect_loop(&self, helper: HandleCore<G, D>) {
137
138 let mut builder = helper.builder;
139 let summary = helper.summary;
140 let mut output = helper.output;
141
142 let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]);
143
144 let mut vector = Default::default();
145 builder.build(move |_capability| move |_frontier| {
146 let mut output = output.activate();
147 input.for_each(|cap, data| {
148 data.swap(&mut vector);
149 if let Some(new_time) = summary.results_in(cap.time()) {
150 let new_cap = cap.delayed(&new_time);
151 output
152 .session(&new_cap)
153 .give_container(&mut vector);
154 }
155 });
156 });
157 }
158}
159
160/// A handle used to bind the source of a loop variable.
161#[derive(Debug)]
162pub struct HandleCore<G: Scope, D: Container> {
163 builder: OperatorBuilder<G>,
164 summary: <G::Timestamp as Timestamp>::Summary,
165 output: OutputWrapper<G::Timestamp, D, TeeCore<G::Timestamp, D>>,
166}
167
168/// A `HandleCore` specialized for using `Vec` as container
169pub type Handle<G, D> = HandleCore<G, Vec<D>>;