differential_dataflow/operators/iterate.rs
1//! Iterative application of a differential dataflow fragment.
2//!
3//! The `iterate` operator takes as an argument a closure from a differential dataflow collection
4//! to a collection of the same type. The output collection is the result of applying this closure
5//! an unbounded number of times.
6//!
7//! The implementation of `iterate` does not directly apply the closure, but rather establishes an
//! iterative timely dataflow subcomputation, in which differences circulate until they dissipate
//! (indicating that the computation has reached a fixed point), or until some number of iterations
//! have passed.
11//!
12//! **Note**: The dataflow assembled by `iterate` does not automatically insert `consolidate` for
13//! you. This means that either (i) you should insert one yourself, (ii) you should be certain that
14//! all paths from the input to the output of the loop involve consolidation, or (iii) you should
15//! be worried that logically cancelable differences may circulate indefinitely.
16//!
17//! # Details
18//!
19//! The `iterate` method is written using a `Variable`, which lets you define your own iterative
20//! computations when `iterate` itself is not sufficient. This can happen when you have two
21//! collections that should evolve simultaneously, or when you would like to rotate your loop and
22//! return an intermediate result.
23//!
24//! Using `Variable` requires more explicit arrangement of your computation, but isn't much more
25//! complicated. You must define a new variable from an existing stream (its initial value), and
26//! then set it to be a function of this variable (and perhaps other collections and variables).
27//!
28//! A `Variable` dereferences to a `Collection`, the one corresponding to its value in each iteration,
29//! and it can be used in most situations where a collection can be used. The act of setting a
30//! `Variable` consumes it and returns the corresponding `Collection`, preventing you from setting
31//! it multiple times.
32
33use std::fmt::Debug;
34
35use timely::Container;
36use timely::progress::Timestamp;
37use timely::order::Product;
38
39use timely::dataflow::*;
40use timely::dataflow::scope::Iterative;
41use timely::dataflow::operators::{Feedback, ConnectLoop};
42use timely::dataflow::operators::feedback::Handle;
43
44use crate::{Data, VecCollection, Collection};
45use crate::difference::{Semigroup, Abelian};
46use crate::lattice::Lattice;
47
48/// An extension trait for the `iterate` method.
49pub trait Iterate<'scope, T: Timestamp + Lattice, D: Data, R: Semigroup> {
50 /// Iteratively apply `logic` to the source collection until convergence.
51 ///
52 /// Importantly, this method does not automatically consolidate results.
53 /// It may be important to conclude the closure you supply with `consolidate()` to ensure that
54 /// logically empty collections that contain cancelling records do not result in non-termination.
55 /// Operators like `reduce`, `distinct`, and `count` also perform consolidation, and are safe to conclude with.
56 ///
57 /// The closure is also passed a copy of the inner scope, to facilitate importing external collections.
58 /// It can also be acquired by calling `.scope()` on the closure's collection argument, but the code
59 /// can be awkward to write fluently.
60 ///
61 /// # Examples
62 ///
63 /// ```
64 /// use differential_dataflow::input::Input;
65 /// use differential_dataflow::operators::Iterate;
66 ///
67 /// ::timely::example(|scope| {
68 ///
69 /// scope.new_collection_from(1 .. 10u32).1
70 /// .iterate(|_scope, values| {
71 /// values.map(|x| if x % 2 == 0 { x/2 } else { x })
72 /// .consolidate()
73 /// });
74 /// });
75 /// ```
76 fn iterate<F>(self, logic: F) -> VecCollection<'scope, T, D, R>
77 where
78 for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product<T, u64>, D, R>)->VecCollection<'inner, Product<T, u64>, D, R>;
79}
80
81impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Abelian+'static> Iterate<'scope, T, D, R> for VecCollection<'scope, T, D, R> {
82 fn iterate<F>(self, logic: F) -> VecCollection<'scope, T, D, R>
83 where
84 for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product<T, u64>, D, R>)->VecCollection<'inner, Product<T, u64>, D, R>,
85 {
86 let outer = self.inner.scope();
87 outer.scoped("Iterate", |subgraph| {
88 // create a new variable, apply logic, bind variable, return.
89 //
90 // this could be much more succinct if we returned the collection
91 // wrapped by `variable`, but it also results in substantially more
92 // diffs produced; `result` is post-consolidation, and means fewer
93 // records are yielded out of the loop.
94 let (variable, collection) = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1));
95 let result = logic(subgraph, collection);
96 variable.set(result.clone());
97 result.leave(outer)
98 })
99 }
100}
101
102impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Semigroup+'static> Iterate<'scope, T, D, R> for Scope<'scope, T> {
103 fn iterate<F>(self, logic: F) -> VecCollection<'scope, T, D, R>
104 where
105 for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product<T, u64>, D, R>)->VecCollection<'inner, Product<T, u64>, D, R>,
106 {
107 let outer = self;
108 self.scoped("Iterate", |subgraph| {
109 // create a new variable, apply logic, bind variable, return.
110 //
111 // this could be much more succinct if we returned the collection
112 // wrapped by `variable`, but it also results in substantially more
113 // diffs produced; `result` is post-consolidation, and means fewer
114 // records are yielded out of the loop.
115 let (variable, collection) = Variable::new(subgraph, Product::new(Default::default(), 1));
116 let result = logic(subgraph, collection);
117 variable.set(result.clone());
118 result.leave(outer)
119 }
120 )
121 }
122}
123
124/// A recursively defined collection.
125///
126/// The `Variable` struct allows differential dataflow programs requiring more sophisticated
127/// iterative patterns than singly recursive iteration. For example: in mutual recursion two
128/// collections evolve simultaneously.
129///
130/// # Examples
131///
132/// The following example is equivalent to the example for the `Iterate` trait.
133///
134/// ```
135/// use timely::order::Product;
136/// use timely::dataflow::Scope;
137///
138/// use differential_dataflow::input::Input;
139/// use differential_dataflow::operators::iterate::Variable;
140///
141/// ::timely::example(|scope| {
142///
143/// let numbers = scope.new_collection_from(1 .. 10u32).1;
144///
145/// scope.iterative::<u64,_,_>(|nested| {
146/// let summary = Product::new(Default::default(), 1);
147/// let (variable, collection) = Variable::new_from(numbers.enter(nested), summary);
148/// let result = collection.map(|x| if x % 2 == 0 { x/2 } else { x })
149/// .consolidate();
150/// variable.set(result.clone());
151/// result.leave(scope)
152/// });
153/// })
154/// ```
155///
156/// Variables support iterative patterns that can be both more flexible, and more efficient.
157///
158/// Mutual recursion is when one defines multiple variables in the same iterative context,
159/// and their definitions are not independent. For example, odd numbers and even numbers
160/// can be determined from each other, iteratively.
161/// ```
162/// use timely::order::Product;
163/// use timely::dataflow::Scope;
164///
165/// use differential_dataflow::input::Input;
166/// use differential_dataflow::operators::iterate::Variable;
167///
168/// ::timely::example(|scope| {
169///
170/// let numbers = scope.new_collection_from(10 .. 20u32).1;
171///
172/// scope.iterative::<u64,_,_>(|nested| {
173/// let summary = Product::new(Default::default(), 1);
174/// let (even_v, even) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 0), summary);
175/// let (odds_v, odds) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 1), summary);
176/// odds_v.set(even.clone().filter(|x| *x > 0).map(|x| x-1).concat(odds.clone()).distinct());
177/// even_v.set(odds.clone().filter(|x| *x > 0).map(|x| x-1).concat(even.clone()).distinct());
178/// });
179/// })
180/// ```
181///
182/// Direct construction can be more efficient than `iterate` when you know a way to directly
183/// determine the changes to make to the initial collection, rather than simply adding that
184/// collection, running your intended logic, and then subtracting the collection.
185///
186/// An an example, the logic in `identifiers.rs` looks for hash collisions, and tweaks the salt
187/// for all but one element in each group of collisions. Most elements do not collide, and we
188/// we don't need to circulate the non-colliding elements to confirm that they subtract away.
189/// By iteratively developing a variable of the *edits* to the input, we can produce and circulate
190/// a smaller volume of updates. This can be especially impactful when the initial collection is
191/// large, and the edits to perform are relatively smaller.
192pub struct Variable<'scope, T, C>
193where
194 T: Timestamp + Lattice,
195 C: Container,
196{
197 feedback: Handle<'scope, T, C>,
198 source: Option<Collection<'scope, T, C>>,
199 step: T::Summary,
200}
201
202/// A `Variable` specialized to a vector container of update triples (data, time, diff).
203pub type VecVariable<'scope, T, D, R> = Variable<'scope, T, Vec<(D, T, R)>>;
204
205impl<'scope, T, C: Container> Variable<'scope, T, C>
206where
207 T: Timestamp + Lattice,
208 C: crate::collection::containers::ResultsIn<T::Summary>,
209{
210 /// Creates a new initially empty `Variable` and its associated `Collection`.
211 ///
212 /// The collection should be used, along with other potentially recursive collections,
213 /// to define a output collection to which the variable is then `set`.
214 /// In an iterative context, each collection starts empty and are repeatedly updated by
215 /// the logic used to produce the collection their variable is bound to. This process
216 /// continues until no changes occur, at which point we have reached a fixed point (or
217 /// the range of timestamps have been exhausted). Calling `leave()` on any collection
218 /// will produce its fixed point in the outer scope.
219 ///
220 /// In a non-iterative scope the mechanics are the same, but the interpretation varies.
221 pub fn new(scope: Scope<'scope, T>, step: T::Summary) -> (Self, Collection<'scope, T, C>) {
222 let (feedback, updates) = scope.feedback(step.clone());
223 let collection = Collection::<T, C>::new(updates);
224 (Self { feedback, source: None, step }, collection)
225 }
226
227 /// Creates a new `Variable` and its associated `Collection`, initially `source`.
228 ///
229 /// This method is a short-cut for a pattern that one can write manually with `new()`,
230 /// but which is easy enough to get wrong that the help is valuable.
231 ///
232 /// This pattern uses a variable `x` to develop `x = logic(x + source) - source`,
233 /// which finds a fixed point `x` that satisfies `x + source = logic(x + source)`.
234 /// The fixed point equals the repeated application of `logic` to `source` plus the
235 ///
236 /// To implement the pattern one would create a new initially empty variable with `new()`,
237 /// then concatenate `source` into that collection, and use it as `logic` dictates.
238 /// Just before the variable is set to the result collection, `source` is subtracted.
239 ///
240 /// If using this pattern manually, it is important to bear in mind that the collection
241 /// that result from `logic` converges to its fixed point, but that once `source` is
242 /// subtracted the collection converges to this limit minus `source`, a collection that
243 /// may have records that accumulate to negative multiplicities, and for which the model
244 /// of them as "data sets" may break down. Be careful when applying non-linear operations
245 /// like `reduce` that they make sense when updates may have non-positive differences.
246 ///
247 /// Finally, implementing this pattern manually has the ability to more directly implement
248 /// the logic `x = logic(x + source) - source`. If there is a different mechanism than just
249 /// adding the source, doing the logic, then subtracting the source, it is appropriate to do.
250 /// For example, if the logic modifies a few records it is possible to produce this update
251 /// directly without using the backstop implementation this method provides.
252 pub fn new_from(source: Collection<'scope, T, C>, step: T::Summary) -> (Self, Collection<'scope, T, C>) where C: Clone + crate::collection::containers::Negate {
253 let (feedback, updates) = source.inner.scope().feedback(step.clone());
254 let collection = Collection::<T, C>::new(updates).concat(source.clone());
255 (Variable { feedback, source: Some(source.negate()), step }, collection)
256 }
257
258 /// Set the definition of the `Variable` to a collection.
259 ///
260 /// This method binds the `Variable` to be equal to the supplied collection,
261 /// which may be recursively defined in terms of the variable itself.
262 pub fn set(mut self, mut result: Collection<'scope, T, C>) {
263 if let Some(source) = self.source.take() {
264 result = result.concat(source);
265 }
266 result
267 .results_in(self.step)
268 .inner
269 .connect_loop(self.feedback);
270 }
271}