Crate coppice


Coppice is a dynamic programming library for acyclic analytical queries expressed as nested map/reduce computations over the union of smaller data sets (tablets). These map/reduce computations are automatically cached and parallelised when executed with the map_reduce() higher-order function.

Of course, since we know we’re only interested in final map_reduce() results, we actually memoise fully aggregated results for each “tablet” (small set of rows) and opaque params.

In addition to these standard inputs (and cache keys), Coppice also passes “join keys” to the mapped functions. This third type of input (in addition to rows and opaque params) enables Coppice to offer asymptotically superior performance compared to pure memoisation: map_reduce() essentially executes mapped functions “in reverse” for join keys.

The map_reduce() function ends up evaluating the mapped function for each row and params, but extracts all join keys that, combined with the row and params, yield a non-trivial Aggregate. We thus extract, for each row and params, a branching function from join keys to Aggregate, and reduce (merge::Merge) these branching functions together for all rows in a tablet. The maximum number of join keys that yield non-trivial results for a given row (should) depend on the mapped function, but not on the rows or params… i.e., it’s a constant.
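To make the “branching function” idea concrete, here is a minimal, self-contained sketch; it is not Coppice’s actual API (`Sum`, `worker`, and `invert_tablet` are hypothetical toy names). For each row we find the join keys that yield a non-trivial aggregate, then merge the resulting key-to-aggregate maps across the tablet; answering a query for any join key afterwards is just a lookup:

```rust
use std::collections::HashMap;

// Hypothetical toy aggregate: a sum (the trivial aggregate is zero).
#[derive(Default, Clone, Copy, PartialEq, Debug)]
struct Sum(u64);

impl Sum {
    fn merge(&mut self, other: Sum) {
        self.0 += other.0;
    }
}

// Toy "mapped function": given a row (group id, value) and a join key,
// return a non-trivial aggregate only when the key matches the row's group.
fn worker(row: &(u32, u64), key: u32) -> Sum {
    if row.0 == key { Sum(row.1) } else { Sum(0) }
}

// Run the mapped function "in reverse": for each row, enumerate the join
// keys that yield a non-trivial aggregate (in this toy, exactly one per
// row), and merge the per-row branching functions over the whole tablet.
fn invert_tablet(rows: &[(u32, u64)]) -> HashMap<u32, Sum> {
    let mut branching: HashMap<u32, Sum> = HashMap::new();
    for row in rows {
        // By construction, the only key with a non-trivial result is row.0.
        let key = row.0;
        branching.entry(key).or_default().merge(worker(row, key));
    }
    branching
}

fn main() {
    let tablet = [(1, 10), (2, 20), (1, 5)];
    // One scan of the tablet now answers queries for *any* join key.
    let branching = invert_tablet(&tablet);
    assert_eq!(branching.get(&1), Some(&Sum(15)));
    assert_eq!(branching.get(&2), Some(&Sum(20)));
    assert_eq!(branching.get(&3), None);
}
```

The cached `HashMap` plays the role of the branching function: it is computed with a single pass over the tablet, independently of how many join keys are later queried.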

For analytical queries, the number of scans over data files is often what we really care about. Pure memoisation gives us a scan count proportional to |tablets| * |params| * |join_keys|; that’s often unrealistically large, which forces people to come up with ad hoc indexing schemes. Because Coppice caches branching functions instead of raw values, the number of scans instead scales with |tablets| * |params|: for example, with 100 tablets, 10 params, and 10,000 distinct join keys, that’s 1,000 scans instead of 10 million. When the join keys have a large cardinality, shaving that |join_keys| multiplicative factor in I/O can be a real practical win… hopefully enough to justify the CPU overhead of Coppice’s function inversion approach.

Two key ideas underlie Coppice.

The first is that backtracking search over join keys represented as bounded arrays of bits gives us enough finite domain powers to weakly emulate logic programming. That’s not a lot, but enough to automatically build a bit-trie index from an opaque function.
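As a rough illustration of that idea (a toy sketch, not Coppice’s implementation: the `Trie` and `worker` below are hypothetical names), we can treat join keys as bounded byte arrays, probe an opaque function for the keys that yield non-trivial results, and record those keys in a byte-at-a-time trie:

```rust
use std::collections::BTreeMap;

// Toy trie over bounded byte-array keys: each internal node branches on
// one byte of the key, and a value lives where a full key ends.
#[derive(Default, Debug)]
struct Trie {
    children: BTreeMap<u8, Trie>,
    value: Option<u64>,
}

impl Trie {
    fn insert(&mut self, key: &[u8], value: u64) {
        match key.split_first() {
            None => *self.value.get_or_insert(0) += value,
            Some((byte, rest)) => self.children.entry(*byte).or_default().insert(rest, value),
        }
    }

    fn get(&self, key: &[u8]) -> Option<u64> {
        match key.split_first() {
            None => self.value,
            Some((byte, rest)) => self.children.get(byte)?.get(rest),
        }
    }
}

// Opaque function: non-zero only when the key equals the row's group id.
fn worker(row: (u16, u64), key: [u8; 2]) -> u64 {
    if row.0.to_be_bytes() == key { row.1 } else { 0 }
}

fn main() {
    let rows = [(258u16, 7u64), (5, 3), (258, 1)];
    let mut index = Trie::default();
    // Searching the bounded key domain: for each row, find the keys with
    // non-trivial results (here, by construction, exactly one per row)
    // and record them in the trie.
    for row in rows {
        let key = row.0.to_be_bytes();
        let v = worker(row, key);
        if v != 0 {
            index.insert(&key, v);
        }
    }
    assert_eq!(index.get(&258u16.to_be_bytes()), Some(8));
    assert_eq!(index.get(&5u16.to_be_bytes()), Some(3));
    assert_eq!(index.get(&1u16.to_be_bytes()), None);
}
```

The trie is the “index automatically built from an opaque function”: we never ask the function to describe itself, we only observe which bounded keys make it return something non-trivial.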

The second is that many analytical queries use only hierarchical joins (a well known fact), and that writing these queries as regular code implicitly gives us the tree necessary for Yannakakis’s algorithm (maybe less well known).

In short, Coppice is just an API trick to coerce Rust coders into writing plain code that can be executed with a version of Yannakakis’s algorithm simplified for the hierarchical subset of acyclic queries.

§Examples

See examples/ny_philharmonic.rs for an executable example that processes sets of JSON files, each of which contains metadata for the New York Philharmonic Orchestra’s performances over a different period.

Counting the total number of programs is a simple map/reduce function:

fn load_json_dump(path: impl AsRef<Path>) -> Result<Vec<Program>, &'static str>;

fn count_programs(files: &[PathBuf]) -> Result<u64, &'static str> {
    // Create a query cache where each dataset is identified by a PathBuf,
    // and the summary is just a counter
    coppice::query!(
        CACHE(path: PathBuf) -> Counter,
        load_json_dump(path),  // Load the data in each path with `load_json_dump`
        _program => Counter::new(1) // And count 1 for each Program in the json dump
    );

    Ok(CACHE.nullary_query(files)?.count)
}

Counting the number of performances for each composer isn’t hard either:

fn count_composer_occurrences(files: &[PathBuf]) -> Result<Vec<(String, u64)>, &'static str> {
    coppice::query!(
        // This time, we go from data in PathBuf to a Histogram keyed on String names
        CACHE(path: PathBuf) -> Histogram<String>,
        load_json_dump(path),  // Again, load the Programs in each json file
        row => {
            let mut ret: Histogram<String> = Default::default();

            // For each work in the row (in the program), add 1
            // for each occurrence of a composer.
            for work in row.works.iter() {
                if let Some(composer) = &work.composer_name {
                    ret.observe(composer.to_owned(), Counter::new(1));
                }
            }

            ret
        }
    );

    Ok(CACHE.nullary_query(files)?.into_popularity_sorted_vec())
}

It’s nice that the above is automatically cached and parallelised, but that’s nothing super interesting. The next example should better motivate the approach: we filter the programs down to those that occurred in a given venue, and accept an optional “root” composer. The histogram counts composer occurrences only for programs that took place at the given venue and in which the root composer was also featured.

fn count_composer_cooccurrences(
    files: &[PathBuf],
    venue: String,
    root_composer: Option<String>,
) -> Result<Vec<(String, u64)>, &'static str> {
    coppice::query!(
        // We take a PathBuf, a venue, and maybe a root composer, and return a histogram keyed on composer names.
        COOCCURRENCES(path: PathBuf, venue: +String, root_composer: -Option<String>) -> Histogram<String>,
        load_json_dump(path),  // Again, load each `PathBuf` with `load_json_dump`.
        rows => {
            use rayon::iter::IntoParallelIterator;
            use rayon::iter::ParallelIterator;

            let venue = venue.clone();  // Post-process the `Vec<Program>` returned by load_json_dump
            Ok(rows
                .into_par_iter()
                // Make sure the target venue appears in at least one of the concerts
                .filter(move |row| row.concerts.iter().any(|concert| concert.venue == venue))
                // extract the composer names.
                .map(|row| {
                    row.works
                        .iter()
                        .map(|work| work.composer_name.clone())
                        .collect::<Vec<Option<String>>>()
                }))
        },
        token, composers => {
            let _ = venue;  // We don't use the venue here, it was already handled above
            let mut ret: Histogram<String> = Default::default();

            let mut maybe_composers: Vec<&Option<String>> = vec![&None];
            maybe_composers.extend(composers.iter());

            let (mut token, root_composer) = token.focus(root_composer);

            // If `root_composer` is None, or matches one of the composers
            // in the program...
            let any_match = token.eql(root_composer, &None) || composers.iter().any(|composer| token.eql(root_composer, composer));

            if any_match {
                // Count occurrences in the histogram
                for composer in composers.iter().flatten().cloned() {
                    ret.observe(composer, Counter::new(1));
                }
            }

            ret
        }
    );

    Ok(COOCCURRENCES
        .query(files, &venue, &root_composer)?
        .into_popularity_sorted_vec())
}

This more complex example shows what’s interesting about Coppice: the query call scans the files once, regardless of how many different root_composer values we pass.

On my laptop, the first call to count_composer_cooccurrences takes 2 seconds. Subsequent calls with various root composers (e.g., counting how many times works by each composer were played in the same program as Wagner) take ~100 microseconds, without any file I/O. This is possible because COOCCURRENCES.query enumerates all possible values of root_composer that would yield a non-trivial result for each row, and caches the results in a (bad) trie.

Modules§

aggregates
Common aggregation results.

Macros§

query
Builds a static Box<dyn Query> for a fixed set of loading / mapping functions.

Traits§

Aggregate
Coppice caches results from aggregate queries where the query results implement the Aggregate trait.
BaseJoinKey
Individual join keys (that are run “in reverse”) must be convertible to byte arrays (bit vectors really, but byte arrays are convenient).
HashIsInjective
In most cases (e.g., standard and automatically derived Hash implementations), the Hash trait feeds different bytes to the hasher for different inputs. In that case, we can compute a fingerprint with the regular std::hash::Hash implementation.
JoinKeys
In practice, join keys are usually passed around as tuples or arrays. The JoinKeys trait captures the containers of BaseJoinKeys we know how to convert to function inversion input.
JoinQuery
A JoinQuery is a query that accepts only a list of tablets and join keys; query parameters are always ().
NullaryQuery
A NullaryQuery is a Query that accepts only a list of tablets: the params and join keys are always ().
ParamQuery
A ParamQuery is a Query that accepts only a list of tablets and query parameters; join keys are always ().
Query
A Query object accepts Tablets, Params, and JoinKeysT, and returns a Summary for the union of all the Tablets, given the Params, and JoinKeysT.

Functions§

clear_all_caches
Clears all query caches for map_reduce or map_map_reduce calls in the current process.
make_map_map_reduce
Returns a Query object for the map reduce computation that loads data from Tablets with row_fn, transforms them with transform_fn, and feeds each transformed row, along with the parameters and join keys, to worker.
make_map_reduce
Returns a Query object for the map reduce computation that loads data from Tablets with row_fn, and feeds each resulting row, along with the parameters and join keys, to worker.
map_map_reduce
The map_map_reduce generic function is like map_reduce, except that an additional transform_fn is applied to the output of the row_fn before inverting the worker_fn.
map_reduce
The map_reduce generic function takes a list of tablets (fractional data sets), converts each to a parallel iterator of rows with the row_fn, and merges the result of executing worker with the params and join_keys on each row in all the tablets.