//! Iterative application of a differential dataflow fragment.
//!
//! The `iterate` operator takes as an argument a closure from a differential dataflow collection to
//! a collection of the same type. The output consists of differences which accumulate to the result
//! of applying this closure a specified number of times.
//!
//! The implementation of `iterate` does not directly apply the closure, but rather establishes an
//! iterative timely dataflow subcomputation, in which differences circulate until they dissipate
//! (indicating that the computation has reached a fixed point), or until some number of iterations
//! have passed.
//!
//! The `iterate` method is written using a `Variable`, which lets you define your own iterative
//! computations when `iterate` itself is not sufficient. This can happen when you have two
//! collections that should evolve simultaneously, or when you would like to return an intermediate
//! result from your iterative computation.
//!
//! Using `Variable` requires more explicit arrangement of your computation, but isn't much more
//! complicated. You must define a new variable from an existing collection (its initial value), and
//! then set it to be a function of this variable (and perhaps other collections and variables).
//!
//! A `Variable` dereferences to a `Collection`, the one corresponding to its value in each
//! iteration, and it can be used in most situations where a collection can be used. Setting a
//! `Variable` consumes it and returns the corresponding `Collection`, preventing you from setting
//! it multiple times.
//!
//! # Examples
//!
//! This example repeatedly divides even numbers by two, and leaves odd numbers as they are. Although
//! some numbers may take multiple iterations to converge, converged numbers have no overhead in
//! subsequent iterations.
//!
//! ```ignore
//! // repeatedly divide out factors of two.
//! let limits = numbers.iterate(|values| {
//!     values.map(|x| if x % 2 == 0 { x / 2 } else { x })
//! });
//! ```
//!
//! The same example written manually with a `Variable`:
//!
//! ```ignore
//! // repeatedly divide out factors of two.
//! let limits = computation.scoped(|scope| {
//!     let variable = Variable::from(numbers.enter(scope));
//!     let result = variable.map(|x| if x % 2 == 0 { x / 2 } else { x });
//!     variable.set(&result)
//!             .leave()
//! });
//! ```

use std::fmt::Debug;
use std::ops::Deref;

use timely::dataflow::*;
use timely::dataflow::scopes::Child;
use timely::dataflow::operators::*;
use timely::dataflow::operators::feedback::Handle;

use ::{Data, Collection};
use collection::LeastUpperBound;

/// An extension trait for the `iterate` method.
pub trait IterateExt<G: Scope, D: Data> {
    /// Iteratively apply `logic` to the source collection until convergence.
    fn iterate<F>(&self, logic: F) -> Collection<G, D>
        where G::Timestamp: LeastUpperBound,
              for<'a> F: FnOnce(&Collection<Child<'a, G, u64>, D>)->Collection<Child<'a, G, u64>, D>;
}

impl<G: Scope, D: Ord+Data+Debug> IterateExt<G, D> for Collection<G, D> {
    fn iterate<F>(&self, logic: F) -> Collection<G, D>
        where G::Timestamp: LeastUpperBound,
              for<'a> F: FnOnce(&Collection<Child<'a, G, u64>, D>)->Collection<Child<'a, G, u64>, D> {

        self.inner.scope().scoped(|subgraph| {
            // create a new variable, apply logic, bind variable, return.
            //
            // this could be much more succinct if we returned the collection
            // wrapped by `variable`, but it also results in substantially more
            // diffs produced; `result` is post-consolidation, and means fewer
            // records are yielded out of the loop.
            let variable = Variable::from(self.enter(subgraph));
            let result = logic(&variable);
            variable.set(&result);
            result.leave()
        })
    }
}

/// A differential dataflow collection variable.
///
/// The `Variable` struct supports differential dataflow programs that require more sophisticated
/// iterative patterns than singly recursive iteration. For example, in mutual recursion two
/// collections evolve simultaneously.
pub struct Variable<'a, G: Scope, D: Data> where G::Timestamp: LeastUpperBound {
    collection: Collection<Child<'a, G, u64>, D>,
    feedback: Handle<G::Timestamp, u64, (D, i32)>,
    source: Collection<Child<'a, G, u64>, D>,
}

impl<'a, G: Scope, D: Data> Variable<'a, G, D> where G::Timestamp: LeastUpperBound {
    /// Creates a new `Variable` whose initial value is the supplied `source` collection.
    pub fn from(source: Collection<Child<'a, G, u64>, D>) -> Variable<'a, G, D> {
        // The feedback edge advances the iteration counter by 1 per round;
        // `u64::max_value()` acts as an effectively unbounded iteration limit.
        let (feedback, updates) = source.inner.scope().loop_variable(u64::max_value(), 1);
        // The variable's value is the fed-back updates plus the initial `source`.
        let collection = Collection::new(updates).concat(&source);
        Variable { collection: collection, feedback: feedback, source: source }
    }

    /// Sets the `Variable` to be defined by `result`, consuming it and returning the collection
    /// the variable represents.
    pub fn set(self, result: &Collection<Child<'a, G, u64>, D>) -> Collection<Child<'a, G, u64>, D> {
        // Feed back `result - source`; combined with `source` (see `from`),
        // the variable's value in the next iteration is exactly `result`.
        self.source.negate()
            .concat(result)
            .inner
            .connect_loop(self.feedback);

        self.collection
    }
}

impl<'a, G: Scope, D: Data> Deref for Variable<'a, G, D> where G::Timestamp: LeastUpperBound {
    type Target = Collection<Child<'a, G, u64>, D>;
    fn deref(&self) -> &Self::Target {
        &self.collection
    }
}
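
// A minimal, std-only sketch of the bookkeeping described in the module
// documentation. It is an illustration only, not part of this crate's API and
// not how timely executes the loop: rounds run eagerly, and collections are
// modeled by a hypothetical `Multiset` (record -> count, with `i32` counts to
// match the `(D, i32)` updates on the feedback edge). All names in
// `iterate_sketch` (`Multiset`, `add`, `negate`, `iterate`, `accumulate`,
// `halve_evens`) are hypothetical. As in `Variable::set`, the feedback carries
// `result - source`, so `source + feedback` makes the variable's next value
// equal to `result`. The per-round differences of `result` accumulate to the
// final output, and a record that has converged (such as 7 in the halving
// example) stops appearing in those differences after the first round.
#[allow(dead_code)]
mod iterate_sketch {
    use std::collections::BTreeMap;

    /// Hypothetical collection model: record -> accumulated count (zeros dropped).
    pub type Multiset = BTreeMap<u64, i32>;

    /// Sums two collections, dropping records whose counts cancel to zero.
    pub fn add(a: &Multiset, b: &Multiset) -> Multiset {
        let mut out = a.clone();
        for (record, count) in b {
            *out.entry(*record).or_insert(0) += *count;
        }
        out.retain(|_, count| *count != 0);
        out
    }

    /// Flips the sign of every count (the role `negate()` plays in `set`).
    pub fn negate(m: &Multiset) -> Multiset {
        m.iter().map(|(record, count)| (*record, -*count)).collect()
    }

    /// Applies `logic` round by round starting from `source`, returning the
    /// non-empty differences of `result` in round order. Stops when the
    /// variable's value no longer changes (a fixed point) or after `max_rounds`.
    pub fn iterate<F>(source: &Multiset, logic: F, max_rounds: usize) -> Vec<Multiset>
    where
        F: Fn(&Multiset) -> Multiset,
    {
        let mut deltas = Vec::new();
        let mut previous = Multiset::new(); // `result` as accumulated so far
        let mut value = source.clone(); // the variable's value this round
        for _ in 0..max_rounds {
            let result = logic(&value);
            let delta = add(&result, &negate(&previous));
            if !delta.is_empty() {
                deltas.push(delta);
            }
            // Mirrors `source.negate().concat(result)` on the feedback edge...
            let feedback = add(&result, &negate(source));
            // ...and `Collection::new(updates).concat(&source)` in `Variable::from`.
            let next = add(source, &feedback);
            if next == value {
                break; // no differences circulate: fixed point reached
            }
            previous = result;
            value = next;
        }
        deltas
    }

    /// Sums the per-round differences into a single collection.
    pub fn accumulate(deltas: &[Multiset]) -> Multiset {
        deltas.iter().fold(Multiset::new(), |acc, d| add(&acc, d))
    }

    /// The module's running example: halve even numbers, keep odd ones.
    pub fn halve_evens(m: &Multiset) -> Multiset {
        let mut out = Multiset::new();
        for (x, count) in m {
            let y = if x % 2 == 0 { x / 2 } else { *x };
            *out.entry(y).or_insert(0) += *count;
        }
        out.retain(|_, count| *count != 0);
        out
    }
}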