//! Differential dataflow is a high-throughput, low-latency data-parallel programming framework.
//!
//! Differential dataflow programs are written in a collection-oriented style, where multisets of
//! records are transformed and combined using primitive operations like `map`, `filter`,
//! `join`, and `group_by`. Differential dataflow also includes a higher-order operation `iterate`.
//!
//! Having defined a differential dataflow computation, you may then add or remove records from its
//! inputs, and the system will automatically update the computation's outputs with the appropriate
//! corresponding additions and removals.
//!
//! Differential dataflow is built on the timely dataflow framework for data-parallel programming,
//! and so is automatically parallelizable across multiple threads, processes, and computers.
//! Moreover, because it uses timely dataflow's primitives, it seamlessly inter-operates with other
//! timely dataflow computations.
//!
//! Differential dataflow is still very much a work in progress, with features and ergonomics
//! in active development. It is generally improving, though.
//!
//! # Examples
//!
//! ```ignore
//! extern crate timely;
//! use timely::*;
//! use timely::dataflow::Scope;
//! use timely::dataflow::operators::{Input, Inspect};
//!
//! use differential_dataflow::operators::*;
//!
//! // construct and execute a timely dataflow
//! timely::execute(Configuration::Thread, |root| {
//!
//!     // construct an input and group its records,
//!     // keeping only the smallest values.
//!     let mut input = root.scoped(|scope| {
//!         let (handle, stream) = scope.new_input();
//!         stream.group(|key, vals, output| output.push(vals.next().unwrap()))
//!               .inspect(|val| println!("observed: {:?}", val));
//!
//!         handle
//!     });
//!
//!     // introduce many records
//!     for i in 0..1000 {
//!         input.send((i % 10, i % 3));
//!         input.advance_to(i + 1);
//!         root.step();
//!     }
//! });
//! ```
//!
//! For a more complicated example, the following fragment computes the breadth-first-search depth
//! in a graph.
//!
//! ```ignore
//! extern crate timely;
//! use timely::*;
//! use timely::dataflow::Scope;
//! use timely::dataflow::operators::{Input, Inspect};
//!
//! use differential_dataflow::operators::*;
//!
//! // construct and execute a timely dataflow
//! timely::execute(Configuration::Thread, |root| {
//!
//!     let (edges, roots) = root.scoped(|scope| {
//!
//!         let (e_in, edges) = scope.new_input::<((u32, u32), i32)>();
//!         let (r_in, roots) = scope.new_input::<(u32, i32)>();
//!
//!         // initialize roots at distance 0
//!         let start = roots.map(|(x, w)| ((x, 0), w));
//!
//!         // repeatedly update minimal distances to each node,
//!         // by describing how to do one round of updates, and then repeating.
//!         let limit = start.iterate(|dists| {
//!
//!             // bring the invariant edges into the loop
//!             let edges = edges.enter(&dists.scope());
//!
//!             // join current distances with edges to get +1 distances,
//!             // include the current distances in the set as well,
//!             // group by node id and keep minimum distance.
//!             dists.join_map(&edges, |_,&d,&n| (n,d+1))
//!                  .concat(&dists)
//!                  .group(|_, s, t| {
//!                      t.push((*s.peek().unwrap().0, 1))
//!                  })
//!         });
//!
//!         // inspect distances!
//!         limit.inspect(|x| println!("observed: {:?}", x));
//!
//!         (e_in, r_in)
//!     });
//!
//!     edges.send(((0,1), 1));
//!     edges.send(((1,2), 1));
//!     edges.send(((2,3), 1));
//!
//!     roots.send((0, 1));
//! });
//! ```

#![forbid(missing_docs)]

use std::hash::Hasher;
use std::fmt::Debug;

/// A change in count.
pub type Delta = i32;

pub use stream::{Collection, AsCollection};

// TODO : I would like this trait to have something like a `Map` associated type,
// indicating the way that it would prefer to be used as an index. I think this
// looks like Higher Kinded Types, as the associated type would need to be generic
// over values indexed against. Perhaps it can be faked in the same way that `Trace`
// fakes HKT with `TraceRef`.

/// A composite trait for data types usable in differential dataflow.
pub trait Data : timely::Data + ::std::hash::Hash + Ord + Debug {
    /// Extracts a `u64` suitable for distributing and sorting the data.
    ///
    /// The default implementation uses `FnvHasher`. It might be nice to couple this more carefully
    /// with the implementor, to allow it to drive the distribution and sorting techniques. For
    /// example, dense unsigned integers would like a different function, but also must communicate
    /// that a `HashMap` is not the best way to use their values.
    #[inline]
    fn hashed(&self) -> u64 {
        // `FnvHasher::default()` starts from the standard FNV offset basis,
        // so equal values always hash identically across workers.
        let mut h: fnv::FnvHasher = Default::default();
        self.hash(&mut h);
        h.finish()
    }
}

// Blanket implementation: anything hashable, orderable, and debuggable that timely
// can ship is usable as differential dataflow data.
impl<T: timely::Data + ::std::hash::Hash + Ord + Debug> Data for T { }

/// An extension of timely's `Scope` trait requiring timestamps implement `Lattice`.
///
/// The intent of this trait is that it could be used as the constraint for collections, removing the
/// need to put `G::Timestamp: lattice::Lattice` everywhere.
pub trait TestScope : timely::dataflow::Scope where Self::Timestamp: lattice::Lattice { }

// Blanket implementation: every scope whose timestamp is a lattice qualifies.
impl<S: timely::dataflow::Scope> TestScope for S where S::Timestamp: lattice::Lattice { }

extern crate fnv;
extern crate timely;
extern crate vec_map;
extern crate itertools;
extern crate linear_map;
extern crate timely_sort;
extern crate timely_communication;

pub mod collection;
pub mod operators;
pub mod lattice;

mod iterators;
mod stream;