Crate differential_dataflow [] [src]

Differential dataflow is a high-throughput, low-latency data-parallel programming framework.

Differential dataflow programs are written in a collection-oriented style, where multisets of records are transformed and combined using primitives operations like map, filter, join, and group_by. Differential dataflow also includes a higher-order operation iterate.

Having defined a differential dataflow computation, you may then add or remove records from its inputs, and the system will automatically update the computation's outputs with the appropriate corresponding additions and removals.

Differential dataflow is built on the timely dataflow framework for data-parallel programming and so is automatically parallelizable across multiple threads, processes, and computers. Moreover, because it uses timely dataflow's primitives, it seamlessly inter-operates with other timely dataflow computations.

Differential dataflow is still very much a work in progress, with features and ergonomics still wildly in development. It is generally improving, though.

Examples

extern crate timely;
use timely::*;
use timely::dataflow::Scope;
use timely::dataflow::operators::{Input, Inspect};

use differential_dataflow::operators::*;

// construct and execute a timely dataflow
timely::execute(Configuration::Thread, |root| {

    // construct an input and group its records
    // keeping only the smallest values.
    let mut input = root.scoped(|scope| {
        let (handle, stream) = scope.new_input();
        stream.group(|key, vals, output| output.push(vals.next().unwrap()))
              .inspect(|val| println!("observed: {:?}", val));

        handle
    });

    // introduce many records
    for i in 0..1000 {
        input.send((i % 10, i % 3));
        input.advance_to(i + 1);
        root.step();
    }
});

For a more complicated example, the following fragment computes the breadth-first-search depth in a graph.

extern crate timely;
use timely::*;
use timely::dataflow::Scope;
use timely::dataflow::operators::{Input, Inspect};

use differential_dataflow::operators::*;

// construct and execute a timely dataflow
timely::execute(Configuration::Thread, |root| {

    let (edges, roots) = root.scoped(|scope| {

        let (e_in, edges) = scope.new_input::<((u32, u32), i32)>();
        let (r_in, roots) = scope.new_input::<(u32, i32)>();

        // initialize roots at distance 0
        let start = roots.map(|(x, w)| ((x, 0), w));

        // repeatedly update minimal distances to each node,
        // by describing how to do one round of updates, and then repeating.
        let limit = start.iterate(|dists| {

            // bring the invariant edges into the loop
            let edges = edges.enter(&dists.scope());

            // join current distances with edges to get +1 distances,
            // include the current distances in the set as well,
            // group by node id and keep minimum distance.
            dists.join_map(&edges, |_,&d,&n| (n,d+1))
                 .concat(&dists)
                 .group(|_, s, t| {
                     t.push((*s.peek().unwrap().0, 1))
                 })
        });

        // inspect distances!
        limit.inspect(|x| println!("observed: {:?}", x));

        (e_in, r_in)
    });

    edges.send(((0,1), 1));
    edges.send(((1,2), 1));
    edges.send(((2,3), 1));

    roots.send((0, 1));
});

Modules

collection

A time-varying multiset of records.

lattice

Partially ordered elements with a least upper bound.

operators

Timely dataflow operators specific to differential dataflow.

Structs

Collection

A mutable collection of values of type D

Traits

AsCollection

Conversion to a differential dataflow collection.

Data

A composite trait for data types usable in differential dataflow.

TestScope

An extension of timely's Scope trait requiring timestamps implement LeastUpperBound.

Type Definitions

Delta

A change in count.