differential_dataflow/operators/
iterate.rs

1//! Iterative application of a differential dataflow fragment.
2//!
3//! The `iterate` operator takes as an argument a closure from a differential dataflow collection
4//! to a collection of the same type. The output collection is the result of applying this closure
5//! an unbounded number of times.
6//!
//! The implementation of `iterate` does not directly apply the closure, but rather establishes an
//! iterative timely dataflow subcomputation, in which differences circulate until they dissipate
//! (indicating that the computation has reached a fixed point), or until some number of iterations
//! have passed.
11//!
12//! **Note**: The dataflow assembled by `iterate` does not automatically insert `consolidate` for
13//! you. This means that either (i) you should insert one yourself, (ii) you should be certain that
14//! all paths from the input to the output of the loop involve consolidation, or (iii) you should
15//! be worried that logically cancelable differences may circulate indefinitely.
16//!
//! # Details
//!
//! The `iterate` method on collections is written using a `Variable` (the variant on scopes uses a
//! `SemigroupVariable`, which does not require negatable differences). A `Variable` lets you define
//! your own iterative computations when `iterate` itself is not sufficient. This can happen when you
//! have two collections that should evolve simultaneously, or when you would like to rotate your
//! loop and return an intermediate result.
23//!
//! Using `Variable` requires more explicit arrangement of your computation, but isn't much more
//! complicated. You define a new variable, either empty (`Variable::new`) or from an existing
//! collection that serves as its initial value (`Variable::new_from`), and then set it to be a
//! function of this variable (and perhaps other collections and variables).
27//!
28//! A `Variable` dereferences to a `Collection`, the one corresponding to its value in each iteration,
29//! and it can be used in most situations where a collection can be used. The act of setting a
30//! `Variable` consumes it and returns the corresponding `Collection`, preventing you from setting
31//! it multiple times.
32
33use std::fmt::Debug;
34use std::ops::Deref;
35
36use timely::progress::{Timestamp, PathSummary};
37use timely::order::Product;
38
39use timely::dataflow::*;
40use timely::dataflow::scopes::child::Iterative;
41use timely::dataflow::operators::{Feedback, ConnectLoop, Map};
42use timely::dataflow::operators::feedback::Handle;
43
44use crate::{Data, Collection};
45use crate::difference::{Semigroup, Abelian};
46use crate::lattice::Lattice;
47
48/// An extension trait for the `iterate` method.
49pub trait Iterate<G: Scope, D: Data, R: Semigroup> {
50    /// Iteratively apply `logic` to the source collection until convergence.
51    ///
52    /// Importantly, this method does not automatically consolidate results.
53    /// It may be important to conclude with `consolidate()` to ensure that
54    /// logically empty collections that contain cancelling records do not
55    /// result in non-termination. Operators like `reduce`, `distinct`, and
56    /// `count` also perform consolidation, and are safe to conclude with.
57    ///
58    /// # Examples
59    ///
60    /// ```
61    /// use differential_dataflow::input::Input;
62    /// use differential_dataflow::operators::Iterate;
63    ///
64    /// ::timely::example(|scope| {
65    ///
66    ///     scope.new_collection_from(1 .. 10u32).1
67    ///          .iterate(|values| {
68    ///              values.map(|x| if x % 2 == 0 { x/2 } else { x })
69    ///                    .consolidate()
70    ///          });
71    /// });
72    /// ```
73    fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
74        where
75            G::Timestamp: Lattice,
76            for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)->Collection<Iterative<'a, G, u64>, D, R>;
77}
78
79impl<G: Scope, D: Ord+Data+Debug, R: Abelian> Iterate<G, D, R> for Collection<G, D, R> {
80    fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
81        where G::Timestamp: Lattice,
82              for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)->Collection<Iterative<'a, G, u64>, D, R> {
83
84        self.inner.scope().scoped("Iterate", |subgraph| {
85            // create a new variable, apply logic, bind variable, return.
86            //
87            // this could be much more succinct if we returned the collection
88            // wrapped by `variable`, but it also results in substantially more
89            // diffs produced; `result` is post-consolidation, and means fewer
90            // records are yielded out of the loop.
91            let variable = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1));
92            let result = logic(&variable);
93            variable.set(&result);
94            result.leave()
95        })
96    }
97}
98
99impl<G: Scope, D: Ord+Data+Debug, R: Semigroup> Iterate<G, D, R> for G {
100    fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
101        where G::Timestamp: Lattice,
102              for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)->Collection<Iterative<'a, G, u64>, D, R> {
103
104        // TODO: This makes me think we have the wrong ownership pattern here.
105        let mut clone = self.clone();
106        clone
107            .scoped("Iterate", |subgraph| {
108                // create a new variable, apply logic, bind variable, return.
109                //
110                // this could be much more succinct if we returned the collection
111                // wrapped by `variable`, but it also results in substantially more
112                // diffs produced; `result` is post-consolidation, and means fewer
113                // records are yielded out of the loop.
114                let variable = SemigroupVariable::new(subgraph, Product::new(Default::default(), 1));
115                let result = logic(&variable);
116                variable.set(&result);
117                result.leave()
118            }
119        )
120    }
121}
122
123/// A recursively defined collection.
124///
125/// The `Variable` struct allows differential dataflow programs requiring more sophisticated
126/// iterative patterns than singly recursive iteration. For example: in mutual recursion two
127/// collections evolve simultaneously.
128///
129/// # Examples
130///
131/// The following example is equivalent to the example for the `Iterate` trait.
132///
133/// ```
134/// use timely::order::Product;
135/// use timely::dataflow::Scope;
136///
137/// use differential_dataflow::input::Input;
138/// use differential_dataflow::operators::iterate::Variable;
139///
140/// ::timely::example(|scope| {
141///
142///     let numbers = scope.new_collection_from(1 .. 10u32).1;
143///
144///     scope.iterative::<u64,_,_>(|nested| {
145///         let summary = Product::new(Default::default(), 1);
146///         let variable = Variable::new_from(numbers.enter(nested), summary);
147///         let result = variable.map(|x| if x % 2 == 0 { x/2 } else { x })
148///                              .consolidate();
149///         variable.set(&result)
150///                 .leave()
151///     });
152/// })
153/// ```
154pub struct Variable<G: Scope, D: Data, R: Abelian>
155where G::Timestamp: Lattice {
156    collection: Collection<G, D, R>,
157    feedback: Handle<G, (D, G::Timestamp, R)>,
158    source: Option<Collection<G, D, R>>,
159    step: <G::Timestamp as Timestamp>::Summary,
160}
161
162impl<G: Scope, D: Data, R: Abelian> Variable<G, D, R> where G::Timestamp: Lattice {
163    /// Creates a new initially empty `Variable`.
164    ///
165    /// This method produces a simpler dataflow graph than `new_from`, and should
166    /// be used whenever the variable has an empty input.
167    pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
168        let (feedback, updates) = scope.feedback(step.clone());
169        let collection = Collection::new(updates);
170        Variable { collection, feedback, source: None, step }
171    }
172
173    /// Creates a new `Variable` from a supplied `source` stream.
174    pub fn new_from(source: Collection<G, D, R>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
175        let (feedback, updates) = source.inner.scope().feedback(step.clone());
176        let collection = Collection::new(updates).concat(&source);
177        Variable { collection, feedback, source: Some(source), step }
178    }
179
180    /// Set the definition of the `Variable` to a collection.
181    ///
182    /// This method binds the `Variable` to be equal to the supplied collection,
183    /// which may be recursively defined in terms of the variable itself.
184    pub fn set(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
185        let mut in_result = result.clone();
186        if let Some(source) = &self.source {
187            in_result = in_result.concat(&source.negate());
188        }
189        self.set_concat(&in_result)
190    }
191
192    /// Set the definition of the `Variable` to a collection concatenated to `self`.
193    ///
194    /// This method is a specialization of `set` which has the effect of concatenating
195    /// `result` and `self` before calling `set`. This method avoids some dataflow
196    /// complexity related to retracting the initial input, and will do less work in
197    /// that case.
198    ///
199    /// This behavior can also be achieved by using `new` to create an empty initial
200    /// collection, and then using `self.set(self.concat(result))`.
201    pub fn set_concat(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
202        let step = self.step;
203        result
204            .inner
205            .flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d)))
206            .connect_loop(self.feedback);
207
208        self.collection
209    }
210}
211
212impl<G: Scope, D: Data, R: Abelian> Deref for Variable<G, D, R> where G::Timestamp: Lattice {
213    type Target = Collection<G, D, R>;
214    fn deref(&self) -> &Self::Target {
215        &self.collection
216    }
217}
218
219/// A recursively defined collection that only "grows".
220///
221/// `SemigroupVariable` is a weakening of `Variable` to allow difference types
222/// that do not implement `Abelian` and only implement `Semigroup`. This means
223/// that it can be used in settings where the difference type does not support
224/// negation.
225pub struct SemigroupVariable<G: Scope, D: Data, R: Semigroup>
226where G::Timestamp: Lattice {
227    collection: Collection<G, D, R>,
228    feedback: Handle<G, (D, G::Timestamp, R)>,
229    step: <G::Timestamp as Timestamp>::Summary,
230}
231
232impl<G: Scope, D: Data, R: Semigroup> SemigroupVariable<G, D, R> where G::Timestamp: Lattice {
233    /// Creates a new initially empty `SemigroupVariable`.
234    pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
235        let (feedback, updates) = scope.feedback(step.clone());
236        let collection = Collection::new(updates);
237        SemigroupVariable { collection, feedback, step }
238    }
239
240    /// Adds a new source of data to `self`.
241    pub fn set(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
242        let step = self.step;
243        result
244            .inner
245            .flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d)))
246            .connect_loop(self.feedback);
247
248        self.collection
249    }
250}
251
252impl<G: Scope, D: Data, R: Semigroup> Deref for SemigroupVariable<G, D, R> where G::Timestamp: Lattice {
253    type Target = Collection<G, D, R>;
254    fn deref(&self) -> &Self::Target {
255        &self.collection
256    }
257}