1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
//! Iterative application of a differential dataflow fragment.
//!
//! The `iterate` operator takes as an argument a closure from a differential dataflow collection 
//! to a collection of the same type. The output collection is the result of applying this closure 
//! an unbounded number of times.
//!
//! The implementation of `iterate` does not directly apply the closure, but rather establishes an
//! iterative timely dataflow subcomputation, in which differences circulate until they dissipate
//! (indicating that the computation has reached fixed point), or until some number of iterations
//! have passed. 
//!
//! **Note**: The dataflow assembled by `iterate` does not automatically insert `consolidate` for 
//! you. This means that either (i) you should insert one yourself, (ii) you should be certain that
//! all paths from the input to the output of the loop involve consolidation, or (iii) you should 
//! be worried that logically cancelable differences may circulate indefinitely.
//!
//! #Details 
//!
//! The `iterate` method is written using a `Variable`, which lets you define your own iterative 
//! computations when `iterate` itself is not sufficient. This can happen when you have two 
//! collections that should evolve simultaneously, or when you would like to rotate your loop and 
//! return an intermediate result.
//! 
//! Using `Variable` requires more explicit arrangement of your computation, but isn't much more
//! complicated. You must define a new variable from an existing stream (its initial value), and 
//! then set it to be a function of this variable (and perhaps other collections and variables).
//!
//! A `Variable` derefences to a `Collection`, the one corresponding to its value in each iteration,
//! and it can be used in most situations where a collection can be used. The act of setting a 
//! `Variable` consumes it and returns the corresponding `Collection`, preventing you from setting
//! it multiple times.

use std::fmt::Debug;
use std::ops::Deref;

use timely::progress::nested::product::Product;

use timely::dataflow::*;
use timely::dataflow::scopes::Child;
use timely::dataflow::operators::*;
use timely::dataflow::operators::feedback::Handle;

use ::{Data, Collection, Diff};
use lattice::Lattice;

/// An extension trait for the `iterate` method.
pub trait Iterate<G: Scope, D: Data, R: Diff> {
    /// Iteratively apply `logic` to the source collection until convergence.
    ///
    /// # Examples
    ///
    /// ```
    /// extern crate timely;
    /// extern crate differential_dataflow;
    ///
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Iterate;
    /// use differential_dataflow::operators::Consolidate;
    ///
    /// fn main() {
    ///     ::timely::example(|scope| {
    ///
    ///         scope.new_collection_from(1 .. 10u32).1
    ///              .iterate(|values| {
    ///                  values.map(|x| if x % 2 == 0 { x/2 } else { x })
    ///                        .consolidate()
    ///              });
    ///     });
    /// }
    /// ```    
    fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
        where G::Timestamp: Lattice,
              for<'a> F: FnOnce(&Collection<Child<'a, G, u64>, D, R>)->Collection<Child<'a, G, u64>, D, R>;
}

impl<G: Scope, D: Ord+Data+Debug, R: Diff> Iterate<G, D, R> for Collection<G, D, R> {
    fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
        where G::Timestamp: Lattice,
              for<'a> F: FnOnce(&Collection<Child<'a, G, u64>, D, R>)->Collection<Child<'a, G, u64>, D, R> {

        self.inner.scope().scoped(|subgraph| {
            // create a new variable, apply logic, bind variable, return.
            //
            // this could be much more succinct if we returned the collection
            // wrapped by `variable`, but it also results in substantially more
            // diffs produced; `result` is post-consolidation, and means fewer
            // records are yielded out of the loop.
            let variable = Variable::from(self.enter(subgraph));
            let result = logic(&variable);
            variable.set(&result);
            result.leave()
        })
    }
}

/// A differential dataflow collection variable
///
/// The `Variable` struct allows differential dataflow programs requiring more sophisticated
/// iterative patterns than singly recursive iteration. For example: in mutual recursion two 
/// collections evolve simultaneously.
///
/// # Examples
///
/// The following example is equivalent to the example for the `Iterate` trait.
///
/// ```
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use timely::dataflow::Scope;
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::iterate::Variable;
/// use differential_dataflow::operators::Consolidate;
///
/// fn main() {
///     ::timely::example(|scope| {
///
///         let numbers = scope.new_collection_from(1 .. 10u32).1;
///
///         scope.scoped(|nested| {
///             let variable = Variable::from(numbers.enter(nested));
///             let result = variable.map(|x| if x % 2 == 0 { x/2 } else { x })
///                                  .consolidate();
///             variable.set(&result)
///                     .leave()
///         });
///     })
/// }
/// ```    
pub struct Variable<'a, G: Scope, D: Data, R: Diff>
where G::Timestamp: Lattice {
    collection: Collection<Child<'a, G, u64>, D, R>,
    feedback: Handle<G::Timestamp, u64,(D, Product<G::Timestamp, u64>, R)>,
    source: Collection<Child<'a, G, u64>, D, R>,
}

impl<'a, G: Scope, D: Data, R: Diff> Variable<'a, G, D, R> where G::Timestamp: Lattice {
    /// Creates a new `Variable` and a `Stream` representing its output, from a supplied `source` stream.
    pub fn from(source: Collection<Child<'a, G, u64>, D, R>) -> Variable<'a, G, D, R> {
        let (feedback, updates) = source.inner.scope().loop_variable(u64::max_value(), 1);
        let collection = Collection::new(updates).concat(&source);
        Variable { collection: collection, feedback: feedback, source: source }
    }
    /// Adds a new source of data to the `Variable`.
    pub fn set(self, result: &Collection<Child<'a, G, u64>, D, R>) -> Collection<Child<'a, G, u64>, D, R> {
        self.source.negate()
                   .concat(result)
                   .inner
                   .map(|(x,t,d)| (x, Product::new(t.outer, t.inner+1), d))
                   .connect_loop(self.feedback);

        self.collection
    }
}

impl<'a, G: Scope, D: Data, R: Diff> Deref for Variable<'a, G, D, R> where G::Timestamp: Lattice {
    type Target = Collection<Child<'a, G, u64>, D, R>;
    fn deref(&self) -> &Self::Target {
        &self.collection
    }
}