Crate differential_dataflow
Differential dataflow is a high-throughput, low-latency data-parallel programming framework.
Differential dataflow programs are written in a collection-oriented style, where multisets of records are transformed and combined using primitive operations like map, filter, join, and group_by. Differential dataflow also includes a higher-order operation, iterate.
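As a rough illustration of what "multisets of records" means for an operator like join, here is a minimal sketch in plain Rust using only the standard library. It does not use the differential_dataflow API: the `Multiset` alias and the `multiset` and `join` helpers are hypothetical names introduced for this example, and the model (a map from record to count) is an assumption made for clarity.

```rust
use std::collections::BTreeMap;

/// Hypothetical model (not the crate's type): a multiset maps each record to its count.
type Multiset<T> = BTreeMap<T, i64>;

/// Build a multiset by counting how often each record occurs.
fn multiset<T: Ord + Clone>(records: &[T]) -> Multiset<T> {
    let mut out = BTreeMap::new();
    for r in records {
        *out.entry(r.clone()).or_insert(0) += 1;
    }
    out
}

/// Join two keyed multisets: records with equal keys pair up, and their counts multiply.
fn join(
    left: &Multiset<(u32, char)>,
    right: &Multiset<(u32, char)>,
) -> Multiset<(u32, char, char)> {
    let mut out = BTreeMap::new();
    for (&(k1, a), &c1) in left {
        for (&(k2, b), &c2) in right {
            if k1 == k2 {
                *out.entry((k1, a, b)).or_insert(0) += c1 * c2;
            }
        }
    }
    out
}

fn main() {
    // The record (1, 'a') occurs twice on the left, once-matching key 1 on the right.
    let left = multiset(&[(1, 'a'), (1, 'a'), (2, 'b')]);
    let right = multiset(&[(1, 'x'), (3, 'y')]);
    let joined = join(&left, &right);
    println!("{:?}", joined); // {(1, 'a', 'x'): 2}
}
```

The nested loop is quadratic and only meant to show the semantics; a real implementation would index both sides by key.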
Having defined a differential dataflow computation, you may then add or remove records from its inputs, and the system will automatically update the computation's outputs with the corresponding additions and removals.
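The update model can be sketched in plain Rust as follows. This is an illustration under stated assumptions, not the crate's implementation: a collection is modeled as a map from record to signed count, a change is a list of `(record, delta)` pairs, and the names `Counts`, `apply`, and `even_halved` are hypothetical. The sketch pushes input changes through a filter followed by a map and applies only the resulting output changes.

```rust
use std::collections::HashMap;

/// Hypothetical model (not the crate's type): each record maps to a signed count.
type Counts = HashMap<u32, i64>;

/// Apply a batch of `(record, delta)` changes, dropping records whose count reaches zero.
fn apply(collection: &mut Counts, changes: &[(u32, i64)]) {
    for &(record, delta) in changes {
        let count = collection.entry(record).or_insert(0);
        *count += delta;
        let now_zero = *count == 0;
        if now_zero {
            collection.remove(&record);
        }
    }
}

/// Push input changes through "keep even records" and then "halve each record".
/// Both steps act on each record independently, so the output changes
/// can be computed from the input changes alone.
fn even_halved(changes: &[(u32, i64)]) -> Vec<(u32, i64)> {
    changes
        .iter()
        .filter(|&&(x, _)| x % 2 == 0)
        .map(|&(x, d)| (x / 2, d))
        .collect()
}

fn main() {
    let mut output = Counts::new();

    // Initial input: add records 2, 3, and 4.
    apply(&mut output, &even_halved(&[(2, 1), (3, 1), (4, 1)]));
    // Later: remove record 4 and add record 6.
    apply(&mut output, &even_halved(&[(4, -1), (6, 1)]));

    let mut records: Vec<_> = output.into_iter().collect();
    records.sort();
    println!("{:?}", records); // [(1, 1), (3, 1)]
}
```

Operators such as join and group_by need to consult accumulated state to turn input changes into output changes, which this sketch does not attempt.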
Differential dataflow is built on the timely dataflow framework for data-parallel programming and so is automatically parallelizable across multiple threads, processes, and computers. Moreover, because it uses timely dataflow's primitives, it seamlessly inter-operates with other timely dataflow computations.
Differential dataflow is still very much a work in progress, and its features and ergonomics are changing rapidly. It is generally improving, though.
Examples
    extern crate timely;
    use timely::*;
    use timely::dataflow::Scope;
    use timely::dataflow::operators::{Input, Inspect};

    use differential_dataflow::operators::*;

    // construct and execute a timely dataflow
    timely::execute(Configuration::Thread, |root| {

        // construct an input and group its records
        // keeping only the smallest values.
        let mut input = root.scoped(|scope| {
            let (handle, stream) = scope.new_input();
            stream.group(|key, vals, output| output.push(vals.next().unwrap()))
                  .inspect(|val| println!("observed: {:?}", val));

            handle
        });

        // introduce many records
        for i in 0..1000 {
            input.send((i % 10, i % 3));
            input.advance_to(i + 1);
            root.step();
        }
    });
For a more complicated example, the following fragment computes the breadth-first-search depth in a graph.
    extern crate timely;
    use timely::*;
    use timely::dataflow::Scope;
    use timely::dataflow::operators::{Input, Inspect};

    use differential_dataflow::operators::*;

    // construct and execute a timely dataflow
    timely::execute(Configuration::Thread, |root| {

        // the input handles must be mutable, because `send` mutates them
        let (mut edges, mut roots) = root.scoped(|scope| {

            let (e_in, edges) = scope.new_input::<((u32, u32), i32)>();
            let (r_in, roots) = scope.new_input::<(u32, i32)>();

            // initialize roots at distance 0
            let start = roots.map(|(x, w)| ((x, 0), w));

            // repeatedly update minimal distances to each node,
            // by describing how to do one round of updates, and then repeating.
            let limit = start.iterate(|dists| {

                // bring the invariant edges into the loop
                let edges = edges.enter(&dists.scope());

                // join current distances with edges to get +1 distances,
                // include the current distances in the set as well,
                // group by node id and keep minimum distance.
                dists.join_map(&edges, |_,&d,&n| (n,d+1))
                     .concat(&dists)
                     .group(|_, s, t| {
                         t.push((*s.peek().unwrap().0, 1))
                     })
            });

            // inspect distances!
            limit.inspect(|x| println!("observed: {:?}", x));

            (e_in, r_in)
        });

        edges.send(((0,1), 1));
        edges.send(((1,2), 1));
        edges.send(((2,3), 1));

        roots.send((0, 1));
    });
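The logic of the fragment above (extend each known distance along the edges, keep the existing distances, retain the minimum per node, and repeat until nothing changes) can be sketched without either framework. The following plain-Rust version is an assumption-laden illustration rather than a translation of the API: `round` and `bfs_depths` are hypothetical names, and the loop recomputes each round from scratch instead of updating incrementally.

```rust
use std::collections::BTreeMap;

/// One round: propose distance + 1 along every edge, keep the current
/// distances as well, and retain the minimum distance per node.
fn round(edges: &[(u32, u32)], dists: &BTreeMap<u32, u32>) -> BTreeMap<u32, u32> {
    // Start from the current distances (the role `concat` plays above).
    let mut next = dists.clone();
    for &(src, dst) in edges {
        if let Some(&d) = dists.get(&src) {
            // Propose d + 1 for `dst` (the role `join_map` plays above).
            let best = next.entry(dst).or_insert(d + 1);
            // Keep the smaller value (the role `group` plays above).
            if d + 1 < *best {
                *best = d + 1;
            }
        }
    }
    next
}

/// Repeat `round` until the distances stop changing.
fn bfs_depths(edges: &[(u32, u32)], roots: &[u32]) -> BTreeMap<u32, u32> {
    // Roots start at distance 0.
    let mut dists: BTreeMap<u32, u32> = roots.iter().map(|&r| (r, 0)).collect();
    loop {
        let next = round(edges, &dists);
        if next == dists {
            return dists;
        }
        dists = next;
    }
}

fn main() {
    // Same edges and root as the fragment above.
    let edges = [(0, 1), (1, 2), (2, 3)];
    let depths = bfs_depths(&edges, &[0]);
    println!("{:?}", depths); // {0: 0, 1: 1, 2: 2, 3: 3}
}
```

The loop terminates because distances only decrease or new nodes are added, and both can happen only finitely often on a finite graph.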
Modules
collection | A time-varying multiset of records.
lattice | Partially ordered elements with a least upper bound.
operators | Timely dataflow operators specific to differential dataflow.
Structs
Collection | A mutable collection of values of type
Traits
AsCollection | Conversion to a differential dataflow collection.
Data | A composite trait for data types usable in differential dataflow.
TestScope | An extension of timely's
Type Definitions
Delta | A change in count.
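The lattice module listed above is summarized as "partially ordered elements with a least upper bound". As a hedged illustration of that idea only, the sketch below defines a hypothetical `Lattice` trait and a hypothetical `Product` type with two coordinates compared component-wise. Neither is taken from the crate, whose actual trait and method names may differ.

```rust
/// Hypothetical trait for illustration; not the crate's actual definition.
trait Lattice: Sized {
    /// The partial order: true if `self` is at or before `other`.
    fn less_equal(&self, other: &Self) -> bool;
    /// The smallest element that is at or after both `self` and `other`.
    fn least_upper_bound(&self, other: &Self) -> Self;
}

/// Hypothetical two-coordinate element, ordered component-wise.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Product {
    outer: u64,
    inner: u64,
}

impl Lattice for Product {
    fn less_equal(&self, other: &Self) -> bool {
        self.outer <= other.outer && self.inner <= other.inner
    }

    fn least_upper_bound(&self, other: &Self) -> Self {
        Product {
            outer: self.outer.max(other.outer),
            inner: self.inner.max(other.inner),
        }
    }
}

fn main() {
    let a = Product { outer: 1, inner: 4 };
    let b = Product { outer: 2, inner: 3 };

    // The order is partial: neither element bounds the other.
    assert!(!a.less_equal(&b) && !b.less_equal(&a));

    // Their least upper bound is at or after both.
    let lub = a.least_upper_bound(&b);
    assert!(a.less_equal(&lub) && b.less_equal(&lub));
    println!("{:?}", lub); // Product { outer: 2, inner: 4 }
}
```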