differential_dataflow/operators/iterate.rs
//! Iterative application of a differential dataflow fragment.
//!
//! The `iterate` operator takes as an argument a closure from a differential dataflow collection
//! to a collection of the same type. The output collection is the result of applying this closure
//! an unbounded number of times.
//!
//! The implementation of `iterate` does not directly apply the closure, but rather establishes an
//! iterative timely dataflow subcomputation, in which differences circulate until they dissipate
//! (indicating that the computation has reached a fixed point), or until some number of iterations
//! have passed.
//!
//! **Note**: The dataflow assembled by `iterate` does not automatically insert `consolidate` for
//! you. This means that either (i) you should insert one yourself, (ii) you should be certain that
//! all paths from the input to the output of the loop involve consolidation, or (iii) you should
//! be worried that logically cancelable differences may circulate indefinitely.
//!
//! # Details
//!
//! The `iterate` method is written using a `Variable`, which lets you define your own iterative
//! computations when `iterate` itself is not sufficient. This can happen when you have two
//! collections that should evolve simultaneously, or when you would like to rotate your loop and
//! return an intermediate result.
//!
//! Using `Variable` requires more explicit arrangement of your computation, but isn't much more
//! complicated. You must define a new variable from an existing stream (its initial value), and
//! then set it to be a function of this variable (and perhaps other collections and variables).
//!
//! A `Variable` dereferences to a `Collection`, the one corresponding to its value in each iteration,
//! and it can be used in most situations where a collection can be used. The act of setting a
//! `Variable` consumes it and returns the corresponding `Collection`, preventing you from setting
//! it multiple times.
32
33use std::fmt::Debug;
34
35use timely::Container;
36use timely::progress::Timestamp;
37use timely::order::Product;
38
39use timely::dataflow::*;
40use timely::dataflow::scopes::child::Iterative;
41use timely::dataflow::operators::{Feedback, ConnectLoop};
42use timely::dataflow::operators::feedback::Handle;
43
44use crate::{Data, VecCollection, Collection};
45use crate::difference::{Semigroup, Abelian};
46use crate::lattice::Lattice;
47
48/// An extension trait for the `iterate` method.
49pub trait Iterate<G: Scope<Timestamp: Lattice>, D: Data, R: Semigroup> {
50 /// Iteratively apply `logic` to the source collection until convergence.
51 ///
52 /// Importantly, this method does not automatically consolidate results.
53 /// It may be important to conclude the closure you supply with `consolidate()` to ensure that
54 /// logically empty collections that contain cancelling records do not result in non-termination.
55 /// Operators like `reduce`, `distinct`, and `count` also perform consolidation, and are safe to conclude with.
56 ///
57 /// The closure is also passed a copy of the inner scope, to facilitate importing external collections.
58 /// It can also be acquired by calling `.scope()` on the closure's collection argument, but the code
59 /// can be awkward to write fluently.
60 ///
61 /// # Examples
62 ///
63 /// ```
64 /// use differential_dataflow::input::Input;
65 /// use differential_dataflow::operators::Iterate;
66 ///
67 /// ::timely::example(|scope| {
68 ///
69 /// scope.new_collection_from(1 .. 10u32).1
70 /// .iterate(|_scope, values| {
71 /// values.map(|x| if x % 2 == 0 { x/2 } else { x })
72 /// .consolidate()
73 /// });
74 /// });
75 /// ```
76 fn iterate<F>(self, logic: F) -> VecCollection<G, D, R>
77 where
78 for<'a> F: FnOnce(Iterative<'a, G, u64>, VecCollection<Iterative<'a, G, u64>, D, R>)->VecCollection<Iterative<'a, G, u64>, D, R>;
79}
80
81impl<G: Scope<Timestamp: Lattice>, D: Ord+Data+Debug, R: Abelian+'static> Iterate<G, D, R> for VecCollection<G, D, R> {
82 fn iterate<F>(self, logic: F) -> VecCollection<G, D, R>
83 where
84 for<'a> F: FnOnce(Iterative<'a, G, u64>, VecCollection<Iterative<'a, G, u64>, D, R>)->VecCollection<Iterative<'a, G, u64>, D, R>,
85 {
86 self.inner.scope().scoped("Iterate", |subgraph| {
87 // create a new variable, apply logic, bind variable, return.
88 //
89 // this could be much more succinct if we returned the collection
90 // wrapped by `variable`, but it also results in substantially more
91 // diffs produced; `result` is post-consolidation, and means fewer
92 // records are yielded out of the loop.
93 let (variable, collection) = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1));
94 let result = logic(subgraph.clone(), collection);
95 variable.set(result.clone());
96 result.leave()
97 })
98 }
99}
100
101impl<G: Scope<Timestamp: Lattice>, D: Ord+Data+Debug, R: Semigroup+'static> Iterate<G, D, R> for G {
102 fn iterate<F>(mut self, logic: F) -> VecCollection<G, D, R>
103 where
104 for<'a> F: FnOnce(Iterative<'a, G, u64>, VecCollection<Iterative<'a, G, u64>, D, R>)->VecCollection<Iterative<'a, G, u64>, D, R>,
105 {
106 self.scoped("Iterate", |subgraph| {
107 // create a new variable, apply logic, bind variable, return.
108 //
109 // this could be much more succinct if we returned the collection
110 // wrapped by `variable`, but it also results in substantially more
111 // diffs produced; `result` is post-consolidation, and means fewer
112 // records are yielded out of the loop.
113 let (variable, collection) = Variable::new(subgraph, Product::new(Default::default(), 1));
114 let result = logic(subgraph.clone(), collection);
115 variable.set(result.clone());
116 result.leave()
117 }
118 )
119 }
120}
121
122/// A recursively defined collection.
123///
124/// The `Variable` struct allows differential dataflow programs requiring more sophisticated
125/// iterative patterns than singly recursive iteration. For example: in mutual recursion two
126/// collections evolve simultaneously.
127///
128/// # Examples
129///
130/// The following example is equivalent to the example for the `Iterate` trait.
131///
132/// ```
133/// use timely::order::Product;
134/// use timely::dataflow::Scope;
135///
136/// use differential_dataflow::input::Input;
137/// use differential_dataflow::operators::iterate::Variable;
138///
139/// ::timely::example(|scope| {
140///
141/// let numbers = scope.new_collection_from(1 .. 10u32).1;
142///
143/// scope.iterative::<u64,_,_>(|nested| {
144/// let summary = Product::new(Default::default(), 1);
145/// let (variable, collection) = Variable::new_from(numbers.enter(nested), summary);
146/// let result = collection.map(|x| if x % 2 == 0 { x/2 } else { x })
147/// .consolidate();
148/// variable.set(result.clone());
149/// result.leave()
150/// });
151/// })
152/// ```
153///
154/// Variables support iterative patterns that can be both more flexible, and more efficient.
155///
156/// Mutual recursion is when one defines multiple variables in the same iterative context,
157/// and their definitions are not independent. For example, odd numbers and even numbers
158/// can be determined from each other, iteratively.
159/// ```
160/// use timely::order::Product;
161/// use timely::dataflow::Scope;
162///
163/// use differential_dataflow::input::Input;
164/// use differential_dataflow::operators::iterate::Variable;
165///
166/// ::timely::example(|scope| {
167///
168/// let numbers = scope.new_collection_from(10 .. 20u32).1;
169///
170/// scope.iterative::<u64,_,_>(|nested| {
171/// let summary = Product::new(Default::default(), 1);
172/// let (even_v, even) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 0), summary);
173/// let (odds_v, odds) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 1), summary);
174/// odds_v.set(even.clone().filter(|x| *x > 0).map(|x| x-1).concat(odds.clone()).distinct());
175/// even_v.set(odds.clone().filter(|x| *x > 0).map(|x| x-1).concat(even.clone()).distinct());
176/// });
177/// })
178/// ```
179///
180/// Direct construction can be more efficient than `iterate` when you know a way to directly
181/// determine the changes to make to the initial collection, rather than simply adding that
182/// collection, running your intended logic, and then subtracting the collection.
183///
184/// An an example, the logic in `identifiers.rs` looks for hash collisions, and tweaks the salt
185/// for all but one element in each group of collisions. Most elements do not collide, and we
186/// we don't need to circulate the non-colliding elements to confirm that they subtract away.
187/// By iteratively developing a variable of the *edits* to the input, we can produce and circulate
188/// a smaller volume of updates. This can be especially impactful when the initial collection is
189/// large, and the edits to perform are relatively smaller.
190pub struct Variable<G, C>
191where
192 G: Scope<Timestamp: Lattice>,
193 C: Container,
194{
195 feedback: Handle<G, C>,
196 source: Option<Collection<G, C>>,
197 step: <G::Timestamp as Timestamp>::Summary,
198}
199
200/// A `Variable` specialized to a vector container of update triples (data, time, diff).
201pub type VecVariable<G, D, R> = Variable<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
202
203impl<G, C: Container> Variable<G, C>
204where
205 G: Scope<Timestamp: Lattice>,
206 C: crate::collection::containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
207{
208 /// Creates a new initially empty `Variable` and its associated `Collection`.
209 ///
210 /// The collection should be used, along with other potentially recursive collections,
211 /// to define a output collection to which the variable is then `set`.
212 /// In an iterative context, each collection starts empty and are repeatedly updated by
213 /// the logic used to produce the collection their variable is bound to. This process
214 /// continues until no changes occur, at which point we have reached a fixed point (or
215 /// the range of timestamps have been exhausted). Calling `leave()` on any collection
216 /// will produce its fixed point in the outer scope.
217 ///
218 /// In a non-iterative scope the mechanics are the same, but the interpretation varies.
219 pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> (Self, Collection<G, C>) {
220 let (feedback, updates) = scope.feedback(step.clone());
221 let collection = Collection::<G, C>::new(updates);
222 (Self { feedback, source: None, step }, collection)
223 }
224
225 /// Creates a new `Variable` and its associated `Collection`, initially `source`.
226 ///
227 /// This method is a short-cut for a pattern that one can write manually with `new()`,
228 /// but which is easy enough to get wrong that the help is valuable.
229 ///
230 /// This pattern uses a variable `x` to develop `x = logic(x + source) - source`,
231 /// which finds a fixed point `x` that satisfies `x + source = logic(x + source)`.
232 /// The fixed point equals the repeated application of `logic` to `source` plus the
233 ///
234 /// To implement the pattern one would create a new initially empty variable with `new()`,
235 /// then concatenate `source` into that collection, and use it as `logic` dictates.
236 /// Just before the variable is set to the result collection, `source` is subtracted.
237 ///
238 /// If using this pattern manually, it is important to bear in mind that the collection
239 /// that result from `logic` converges to its fixed point, but that once `source` is
240 /// subtracted the collection converges to this limit minus `source`, a collection that
241 /// may have records that accumulate to negative multiplicities, and for which the model
242 /// of them as "data sets" may break down. Be careful when applying non-linear operations
243 /// like `reduce` that they make sense when updates may have non-positive differences.
244 ///
245 /// Finally, implementing this pattern manually has the ability to more directly implement
246 /// the logic `x = logic(x + source) - source`. If there is a different mechanism than just
247 /// adding the source, doing the logic, then subtracting the source, it is appropriate to do.
248 /// For example, if the logic modifies a few records it is possible to produce this update
249 /// directly without using the backstop implementation this method provides.
250 pub fn new_from(source: Collection<G, C>, step: <G::Timestamp as Timestamp>::Summary) -> (Self, Collection<G, C>) where C: Clone + crate::collection::containers::Negate {
251 let (feedback, updates) = source.inner.scope().feedback(step.clone());
252 let collection = Collection::<G, C>::new(updates).concat(source.clone());
253 (Variable { feedback, source: Some(source.negate()), step }, collection)
254 }
255
256 /// Set the definition of the `Variable` to a collection.
257 ///
258 /// This method binds the `Variable` to be equal to the supplied collection,
259 /// which may be recursively defined in terms of the variable itself.
260 pub fn set(mut self, mut result: Collection<G, C>) {
261 if let Some(source) = self.source.take() {
262 result = result.concat(source);
263 }
264 result
265 .results_in(self.step)
266 .inner
267 .connect_loop(self.feedback);
268 }
269}