//! Differential dataflow is a high-throughput, low-latency data-parallel programming framework.
//!
//! Differential dataflow programs are written in a collection-oriented style, where multisets of
//! records are transformed and combined using primitive operations like `map`, `filter`,
//! `join`, and `group_by`. Differential dataflow also includes a higher-order operation `iterate`.
//!
//! Having defined a differential dataflow computation, you may then add or remove records from its
//! inputs, and the system will automatically update the computation's outputs with the appropriate
//! corresponding additions and removals.
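//!
//! As a sketch of what this looks like (each update pairs a record with a signed integer weight,
//! as in the breadth-first-search example below): positive weights add copies of a record, and
//! negative weights remove them.
//!
//! ```ignore
//! edges.send(((0, 1),  1));   // add one copy of the edge (0, 1)
//! edges.send(((0, 1), -1));   // remove that copy again at some later time
//! ```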
//!
//! Differential dataflow is built on the timely dataflow framework for data-parallel programming
//! and so is automatically parallelizable across multiple threads, processes, and computers.
//! Moreover, because it uses timely dataflow's primitives, it seamlessly inter-operates with other
//! timely dataflow computations.
//!
//! Differential dataflow is still very much a work in progress; its features and ergonomics remain
//! in flux, though they are generally improving.
//!
//! # Examples
//!
//! ```ignore
//! extern crate timely;
//! use timely::*;
//! use timely::dataflow::Scope;
//! use timely::dataflow::operators::{Input, Inspect};
//!
//! use differential_dataflow::operators::*;
//!
//! // construct and execute a timely dataflow
//! timely::execute(Configuration::Thread, |root| {
//!
//!     // construct an input and group its records
//!     // keeping only the smallest values.
//!     let mut input = root.scoped(|scope| {
//!         let (handle, stream) = scope.new_input();
//!         stream.group(|_key, vals, output| output.push((*vals.peek().unwrap().0, 1)))
//!               .inspect(|val| println!("observed: {:?}", val));
//!
//!         handle
//!     });
//!
//!     // introduce many records
//!     for i in 0..1000 {
//!         input.send((i % 10, i % 3));
//!         input.advance_to(i + 1);
//!         root.step();
//!     }
//! });
//! ```
//!
//! For a more complicated example, the following fragment computes the breadth-first-search depth
//! in a graph.
//!
//! ```ignore
//! extern crate timely;
//! use timely::*;
//! use timely::dataflow::Scope;
//! use timely::dataflow::operators::{Input, Inspect};
//!
//! use differential_dataflow::operators::*;
//!
//! // construct and execute a timely dataflow
//! timely::execute(Configuration::Thread, |root| {
//!
//!     let (edges, roots) = root.scoped(|scope| {
//!
//!         let (e_in, edges) = scope.new_input::<((u32, u32), i32)>();
//!         let (r_in, roots) = scope.new_input::<(u32, i32)>();
//!
//!         // initialize roots at distance 0
//!         let start = roots.map(|(x, w)| ((x, 0), w));
//!
//!         // repeatedly update minimal distances to each node,
//!         // by describing how to do one round of updates, and then repeating.
//!         let limit = start.iterate(|dists| {
//!
//!             // bring the invariant edges into the loop
//!             let edges = edges.enter(&dists.scope());
//!
//!             // join current distances with edges to get +1 distances,
//!             // include the current distances in the set as well,
//!             // group by node id and keep minimum distance.
//!             dists.join_map(&edges, |_,&d,&n| (n,d+1))
//!                  .concat(&dists)
//!                  .group(|_, s, t| {
//!                      t.push((*s.peek().unwrap().0, 1))
//!                  })
//!         });
//!
//!         // inspect distances!
//!         limit.inspect(|x| println!("observed: {:?}", x));
//!
//!         (e_in, r_in)
//!     });
//!
//!     edges.send(((0,1), 1));
//!     edges.send(((1,2), 1));
//!     edges.send(((2,3), 1));
//!
//!     roots.send((0, 1));
//! });
//! ```
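//!
//! If the inputs were then advanced and the worker stepped (as in the first example), the graph
//! could be changed incrementally; as a sketch, retracting the edge `(2,3)` would cause the
//! previously reported distance to node `3` to be retracted in turn:
//!
//! ```ignore
//! edges.send(((2,3), -1));
//! ```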

#![forbid(missing_docs)]

use std::hash::Hasher;
use std::fmt::Debug;

/// A change in the count of a record: positive values indicate additions and negative values deletions.
pub type Delta = i32;

pub use stream::{Collection, AsCollection};

// TODO : I would like this trait to have something like a `Map` associated type,
// indicating the way that it would prefer to be used as an index. I think this 
// looks like Higher Kinded Types, as the associated type would need to be generic
// over values indexed against. Perhaps it can be faked in the same way that `Trace`
// fakes HKT with `TraceRef`.

/// A composite trait for data types usable in differential dataflow.
pub trait Data : timely::Data + ::std::hash::Hash + Ord + Debug {
    /// Extracts a `u64` suitable for distributing and sorting the data.
    ///
    /// The default implementation uses `FnvHasher`. It might be nice to couple this more carefully
    /// with the implementor, to allow it to drive the distribution and sorting techniques. For
    /// example, dense unsigned integers might prefer a different function, but would also need to
    /// communicate that a `HashMap` is not the best way to index their values.
    #[inline]
    fn hashed(&self) -> u64 {
        let mut h: fnv::FnvHasher = Default::default();
        self.hash(&mut h);
        h.finish()
    }
}
impl<T: timely::Data + ::std::hash::Hash + Ord + Debug> Data for T { }
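
// A minimal sketch of the behavior `hashed` is relied upon for (illustrative only; assumes `u32`
// satisfies `timely::Data`): the hash must be deterministic, since it is used to route and sort
// records consistently across workers.
#[cfg(test)]
mod data_tests {
    use super::Data;

    #[test]
    fn hashed_is_deterministic() {
        let x: u32 = 42;
        let y: u32 = 42;
        assert_eq!(x.hashed(), y.hashed());
    }
}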

/// An extension of timely's `Scope` trait requiring timestamps to implement `Lattice`.
///
/// The intent of this trait is that it could be used as the constraint for collections, removing the
/// need to put `G::Timestamp: Lattice` everywhere.
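///
/// A minimal sketch of the intended use (hypothetical function names; bodies elided):
///
/// ```ignore
/// use timely::dataflow::Scope;
/// use differential_dataflow::{Collection, TestScope};
/// use differential_dataflow::lattice::Lattice;
///
/// // with the timestamp bound spelled out at each use site:
/// fn min_label<G: Scope>(nodes: &Collection<G, (u32, u32)>)
/// where G::Timestamp: Lattice { /* ... */ }
///
/// // with this trait standing in for both constraints:
/// fn min_label_terse<G: TestScope>(nodes: &Collection<G, (u32, u32)>) { /* ... */ }
/// ```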
pub trait TestScope : timely::dataflow::Scope where Self::Timestamp: lattice::Lattice { }
impl<S: timely::dataflow::Scope> TestScope for S where S::Timestamp: lattice::Lattice { }

extern crate fnv;
extern crate timely;
extern crate vec_map;
extern crate itertools;
extern crate linear_map;
extern crate timely_sort;
extern crate timely_communication;

pub mod collection;
pub mod operators;
pub mod lattice;
mod iterators;
mod stream;