Coppice is a dynamic programming library for acyclic analytical
queries expressed as nested map/reduce computations over the union
of smaller data sets (tablets). These map/reduce computations are
automatically cached and parallelised when executed with the
map_reduce() higher-order function.
Of course, since we know we’re only interested in final
map_reduce() results, we actually memoise fully aggregated
results for each “tablet” (small set of rows) and opaque params.
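To make that caching granularity concrete, here is a minimal sketch in plain std Rust (not Coppice's actual cache; the `tablet_sum` helper and its key types are hypothetical): we memoise one fully aggregated result per (tablet, params) pair, never per-row values.

```rust
use std::collections::HashMap;

// Hypothetical toy: one cache entry per (tablet id, params) pair, holding
// the fully aggregated result for that tablet.
fn tablet_sum(
    cache: &mut HashMap<(String, u64), u64>,
    tablet: &str,
    rows: &[u64],
    params: u64,
) -> u64 {
    *cache
        .entry((tablet.to_string(), params))
        // Scan the tablet's rows only on a cache miss.
        .or_insert_with(|| rows.iter().map(|row| row * params).sum())
}

fn main() {
    let mut cache = HashMap::new();
    let rows = [1u64, 2, 3];
    assert_eq!(tablet_sum(&mut cache, "tablet-0", &rows, 10), 60);
    // A second call with the same keys hits the cache: still one entry.
    assert_eq!(tablet_sum(&mut cache, "tablet-0", &rows, 10), 60);
    assert_eq!(cache.len(), 1);
}
```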
In addition to these standard inputs (and cache keys), Coppice
also passes “join keys” to the mapped functions. This third type
of input (in addition to rows and opaque params) lets
Coppice offer asymptotically superior performance compared to
pure memoisation: map_reduce() essentially executes mapped
functions “in reverse” for join keys.
The map_reduce() function ends up evaluating the mapped
function for each row and params, but extracts all join keys
that, combined with the row and params, yield a non-trivial
Aggregate. We thus extract, for each row and params, a
branching function from join keys to Aggregates, and reduce
(merge::Merge) these branching functions together for all rows
in a tablet. The maximum number of join keys that yield
non-trivial results for a given row (should) depend on the
mapped function, but not on the rows or params… i.e., it’s
a constant.
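As an illustration of that “branching function” view, here is a hand-rolled sketch in plain Rust (not Coppice's API; the `branch` and `merge` helpers and the composer-id join key are made up): each row yields a small map from join keys to non-trivial aggregates, and those maps merge pointwise across rows.

```rust
use std::collections::HashMap;

// A row "matches" exactly one join key here (its composer id), so the
// branching function for a row is a one-entry map from join key to count.
fn branch(row_composer: u32) -> HashMap<u32, u64> {
    HashMap::from([(row_composer, 1u64)])
}

// Merging the branching functions over all rows in a tablet: pointwise sum.
fn merge(rows: &[u32]) -> HashMap<u32, u64> {
    let mut merged: HashMap<u32, u64> = HashMap::new();
    for &row in rows {
        for (key, count) in branch(row) {
            *merged.entry(key).or_default() += count;
        }
    }
    merged
}

fn main() {
    let merged = merge(&[7, 7, 3]);
    // Evaluating the cached branching function for any join key is now a
    // lookup, with no rescan of the rows.
    assert_eq!(merged.get(&7), Some(&2));
    assert_eq!(merged.get(&3), Some(&1));
    assert_eq!(merged.get(&42), None); // trivial (absent) aggregate
}
```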
For analytical queries, the number of scans over data files is
often what we really care about. Pure memoisation gives us scans
proportional to |tablets| * |params| * |join_keys|; that’s often
unrealistically large, which forces people to come up with ad hoc
indexing schemes. Because Coppice caches branching functions
instead of raw values, the number of scans instead scales
with |tablets| * |params|. When the join keys have a large
cardinality, shaving that |join_keys| multiplicative factor in
I/O can be a real practical win… hopefully enough to justify
the CPU overhead of Coppice’s function inversion approach.
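To put hypothetical numbers on that factor (all three cardinalities below are invented for illustration):

```rust
fn main() {
    let (tablets, params, join_keys) = (100u64, 10, 50_000);

    // Pure memoisation: one scan per (tablet, params, join key) combination.
    let memoised_scans = tablets * params * join_keys;

    // Coppice: one scan per (tablet, params) pair; join keys are answered
    // by the cached branching functions.
    let coppice_scans = tablets * params;

    assert_eq!(memoised_scans, 50_000_000);
    assert_eq!(coppice_scans, 1_000);
    // The saving is exactly the join-key cardinality.
    assert_eq!(memoised_scans / coppice_scans, join_keys);
}
```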
Two key ideas underlie Coppice.
The first is that backtracking search over join keys represented as bounded arrays of bits gives us enough finite domain powers to weakly emulate logic programming. That’s not a lot, but enough to automatically build a bit-trie index from an opaque function.
The second is that many analytical queries use only hierarchical joins (a well known fact), and that writing these queries as regular code implicitly gives us the tree necessary for Yannakakis’s algorithm (maybe less well known).
In short, Coppice is just an API trick to coerce Rust coders into writing plain code that can be executed with a version of Yannakakis’s algorithm, simplified for the hierarchical subset of acyclic queries.
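A minimal sketch of that first idea, with an invented toy interface rather than Coppice's real token API: the opaque worker reads candidate-key bits one at a time, reporting either which bit it needs next (`Err(i)`), an early rejection, or acceptance; a backtracking search over the finite domain of 8-bit keys then recovers every accepted key while pruning mismatched prefixes early, instead of enumerating all 256 candidates.

```rust
// Toy "opaque" worker: accepts a candidate key iff it equals `target`,
// reading the candidate bit by bit (least significant bit first).
// Err(i) means "I need bit i to keep going"; Ok(false) rejects early.
fn worker(bits: &[bool], target: u8) -> Result<bool, usize> {
    for i in 0..8 {
        let want = (target >> i) & 1 == 1;
        match bits.get(i) {
            None => return Err(i),                         // bit not chosen yet
            Some(&bit) if bit != want => return Ok(false), // prune this prefix
            Some(_) => {}
        }
    }
    Ok(true)
}

// Backtracking search: extend the prefix one bit at a time, abandoning any
// prefix the worker rejects. The visited prefixes form a (tiny) bit trie.
fn invert(worker: impl Fn(&[bool]) -> Result<bool, usize>) -> Vec<u8> {
    let mut found = Vec::new();
    let mut stack = vec![Vec::new()];
    while let Some(bits) = stack.pop() {
        match worker(&bits) {
            Ok(false) => {} // dead branch: backtrack
            Ok(true) => {
                let key = bits
                    .iter()
                    .enumerate()
                    .fold(0u8, |k, (i, &b)| k | ((b as u8) << i));
                found.push(key);
            }
            Err(i) => {
                debug_assert_eq!(i, bits.len());
                for bit in [false, true] {
                    let mut next = bits.clone();
                    next.push(bit);
                    stack.push(next);
                }
            }
        }
    }
    found
}

fn main() {
    // The search visits about two prefixes per bit instead of all 256 keys.
    assert_eq!(invert(|bits| worker(bits, 0xA5)), vec![0xA5]);
}
```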
§Examples
See examples/ny_philharmonic.rs for an executable example that
processes sets of JSON files, each of which contains metadata for
the New York Philharmonic Orchestra’s performances over different
periods.
Counting the total number of programs is a simple map/reduce function:
fn load_json_dump(path: impl AsRef<Path>) -> Result<Vec<Program>, &'static str>;
fn count_programs(files: &[PathBuf]) -> Result<u64, &'static str> {
    // Create a query cache where each dataset is identified by a PathBuf,
    // and the summary is just a counter
    coppice::query!(
        CACHE(path: PathBuf) -> Counter,
        load_json_dump(path), // Load the data in each path with `load_json_dump`
        _program => Counter::new(1) // And count 1 for each Program in the json dump
    );

    Ok(CACHE.nullary_query(files)?.count)
}

Counting the number of performances for each composer isn’t hard either:
fn count_composer_occurrences(files: &[PathBuf]) -> Result<Vec<(String, u64)>, &'static str> {
    coppice::query!(
        // This time, we go from data in PathBuf to a Histogram keyed on String names
        CACHE(path: PathBuf) -> Histogram<String>,
        load_json_dump(path), // Again, load the Programs in each json file
        row => {
            let mut ret: Histogram<String> = Default::default();

            // For each work in the row (in the program), add 1
            // for each occurrence of a composer.
            for work in row.works.iter() {
                if let Some(composer) = &work.composer_name {
                    ret.observe(composer.to_owned(), Counter::new(1));
                }
            }

            ret
        }
    );

    Ok(CACHE.nullary_query(files)?.into_popularity_sorted_vec())
}

It’s nice that the above is automatically cached and parallelised, but that’s nothing super interesting. The next one should better motivate the approach: we filter the programs down to only those that occurred at a given venue, and accept an optional “root” composer. The histogram counts composer occurrences for programs that occurred at the given venue and in which the root composer was also featured.
fn count_composer_cooccurrences(
    files: &[PathBuf],
    venue: String,
    root_composer: Option<String>,
) -> Result<Vec<(String, u64)>, &'static str> {
    coppice::query!(
        // We take a PathBuf, a venue, and maybe a root composer, and return a histogram keyed on composer names.
        COOCCURRENCES(path: PathBuf, venue: +String, root_composer: -Option<String>) -> Histogram<String>,
        load_json_dump(path), // Again, load each `PathBuf` with `load_json_dump`.
        rows => {
            use rayon::iter::IntoParallelIterator;
            use rayon::iter::ParallelIterator;

            let venue = venue.clone(); // Post-process the `Vec<Program>` returned by `load_json_dump`
            Ok(rows
                .into_par_iter()
                // Make sure the target venue appears in at least one of the concerts
                .filter(move |row| row.concerts.iter().any(|concert| concert.venue == venue))
                // Extract the composer names.
                .map(|row| {
                    row.works
                        .iter()
                        .map(|work| work.composer_name.clone())
                        .collect::<Vec<Option<String>>>()
                }))
        },
        token, composers => {
            let _ = venue; // We don't use the venue here, it was already handled above
            let mut ret: Histogram<String> = Default::default();

            let mut maybe_composers: Vec<&Option<String>> = vec![&None];
            maybe_composers.extend(composers.iter());

            let (mut token, root_composer) = token.focus(root_composer);
            // If either `root_composer` is None, or it matches one of the
            // composers in the program...
            let any_match = token.eql(root_composer, &None)
                || composers.iter().any(|composer| token.eql(root_composer, composer));
            if any_match {
                // Count occurrences in the histogram
                for composer in composers.iter().flatten().cloned() {
                    ret.observe(composer, Counter::new(1));
                }
            }

            ret
        }
    );

    Ok(COOCCURRENCES
        .query(files, &venue, &root_composer)?
        .into_popularity_sorted_vec())
}

This more complex example shows what’s interesting about Coppice:
the query call scans the files once, regardless of how many
different root_composer values we pass.
On my laptop, the first call to count_composer_cooccurrences takes 2 seconds.
Subsequent calls with various root composers (e.g., count how many times works
by each composer were played in the same program as Wagner) take ~100 microseconds,
without any file I/O. This is possible because COOCCURRENCES.query enumerates
all possible values of root_composer that would result in a non-trivial result
for each row, and caches the results in a (bad) trie.
Modules§
- aggregates: Common aggregation results.

Macros§
- query: Builds a static Box<dyn Query> for a fixed set of loading / mapping functions.

Traits§
- Aggregate: Coppice caches results from aggregate queries where the query results implement the Aggregate trait.
- BaseJoinKey: Individual join keys (that are run “in reverse”) must be convertible to byte arrays (bit vectors really, but byte arrays are convenient).
- HashIsInjective: In most cases (e.g., standard and automatically derived Hash traits), the hash trait feeds different bytes for different inputs. In that case, we can compute a fingerprint with the regular std::hash::Hash implementation.
- JoinKeys: In practice, join keys are usually passed around as tuples or arrays. The JoinKeys trait captures the containers of BaseJoinKeys we know how to convert to function inversion input.
- JoinQuery: A JoinQuery is a query that accepts only a list of tablets and join keys; query parameters are always ().
- NullaryQuery: A NullaryQuery is a Query that accepts only a list of tablets: the params and join keys are always ().
- ParamQuery: A ParamQuery is a Query that accepts only a list of tablets and query parameters; join keys are always ().
- Query: A Query object accepts Tablets, Params, and JoinKeysT, and returns a Summary for the union of all the Tablets, given the Params and JoinKeysT.

Functions§
- clear_all_caches: Clears all query caches for map_reduce or map_map_reduce calls in the current process.
- make_map_map_reduce: Returns a Query object for the map/reduce computation that loads data from Tablets with row_fn, transforms them with transform_fn, and feeds each transformed row, along with the parameters and join keys, to worker.
- make_map_reduce: Returns a Query object for the map/reduce computation that loads data from Tablets with row_fn, and feeds each resulting row, along with the parameters and join keys, to worker.
- map_map_reduce: The map_map_reduce generic function is like map_reduce, except that an additional transform_fn is applied to the output of the row_fn before inverting the worker_fn.
- map_reduce: The map_reduce generic function takes a list of tablets (fractional data sets), converts each to a parallel iterator of rows with the row_fn, and merges the result of executing worker with the params and join_keys on each row in all the tablets.