use std::collections::HashSet;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProjectionAggregate {
Count,
Sum(usize),
Min(usize),
Max(usize),
}
#[derive(Debug, Clone)]
pub struct ProjectionSpec {
pub name: String,
pub table: String,
pub group_keys: Vec<usize>,
pub aggregates: Vec<ProjectionAggregate>,
pub filter_signature: Option<String>,
}
#[derive(Debug, Clone)]
pub struct ProjectionQuery {
pub table: String,
pub group_keys: Vec<usize>,
pub aggregates: Vec<ProjectionAggregate>,
pub filter_signature: Option<String>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct ProjectionMatch {
pub projection_name: String,
pub cost: u32,
}
pub fn pick_projection(
query: &ProjectionQuery,
candidates: &[ProjectionSpec],
) -> Option<ProjectionMatch> {
let mut best: Option<ProjectionMatch> = None;
for spec in candidates {
if spec.table != query.table {
continue;
}
if !is_filter_compatible(&spec.filter_signature, &query.filter_signature) {
continue;
}
if !group_keys_cover(&spec.group_keys, &query.group_keys) {
continue;
}
if !aggregates_cover(&spec.aggregates, &query.aggregates) {
continue;
}
let extra_keys = spec
.group_keys
.iter()
.filter(|k| !query.group_keys.contains(k))
.count() as u32;
let extra_aggs = spec
.aggregates
.iter()
.filter(|a| !query.aggregates.contains(a))
.count() as u32;
let cost = extra_keys * 10 + extra_aggs;
let candidate = ProjectionMatch {
projection_name: spec.name.clone(),
cost,
};
match &best {
Some(existing) if existing.cost <= cost => {}
_ => best = Some(candidate),
}
}
best
}
fn is_filter_compatible(spec: &Option<String>, query: &Option<String>) -> bool {
match (spec, query) {
(None, _) => true, (Some(_), None) => false, (Some(s), Some(q)) => s == q, }
}
fn group_keys_cover(spec_keys: &[usize], query_keys: &[usize]) -> bool {
let set: HashSet<usize> = spec_keys.iter().copied().collect();
query_keys.iter().all(|k| set.contains(k))
}
fn aggregates_cover(spec: &[ProjectionAggregate], query: &[ProjectionAggregate]) -> bool {
let set: HashSet<ProjectionAggregate> = spec.iter().copied().collect();
query.iter().all(|a| set.contains(a))
}
#[cfg(test)]
mod tests {
use super::*;
fn base_spec() -> ProjectionSpec {
ProjectionSpec {
name: "daily_by_user".into(),
table: "events".into(),
group_keys: vec![0, 1], aggregates: vec![ProjectionAggregate::Count, ProjectionAggregate::Sum(2)],
filter_signature: None,
}
}
fn narrower_spec() -> ProjectionSpec {
ProjectionSpec {
name: "daily_total".into(),
table: "events".into(),
group_keys: vec![1], aggregates: vec![ProjectionAggregate::Count],
filter_signature: None,
}
}
fn filtered_spec() -> ProjectionSpec {
ProjectionSpec {
name: "prod_daily".into(),
table: "events".into(),
group_keys: vec![1],
aggregates: vec![ProjectionAggregate::Count],
filter_signature: Some("env = 'production'".into()),
}
}
#[test]
fn picks_matching_projection_when_query_is_a_subset() {
let query = ProjectionQuery {
table: "events".into(),
group_keys: vec![0],
aggregates: vec![ProjectionAggregate::Count],
filter_signature: None,
};
let pick = pick_projection(&query, &[base_spec()]).unwrap();
assert_eq!(pick.projection_name, "daily_by_user");
}
#[test]
fn prefers_narrower_projection_when_both_match() {
let query = ProjectionQuery {
table: "events".into(),
group_keys: vec![1],
aggregates: vec![ProjectionAggregate::Count],
filter_signature: None,
};
let pick = pick_projection(&query, &[base_spec(), narrower_spec()]).unwrap();
assert_eq!(pick.projection_name, "daily_total");
}
#[test]
fn rejects_projection_when_query_requests_unknown_aggregate() {
let query = ProjectionQuery {
table: "events".into(),
group_keys: vec![1],
aggregates: vec![ProjectionAggregate::Max(3)],
filter_signature: None,
};
assert!(pick_projection(&query, &[base_spec()]).is_none());
}
#[test]
fn rejects_projection_when_query_key_not_in_projection() {
let query = ProjectionQuery {
table: "events".into(),
group_keys: vec![5], aggregates: vec![ProjectionAggregate::Count],
filter_signature: None,
};
assert!(pick_projection(&query, &[base_spec(), narrower_spec()]).is_none());
}
#[test]
fn filtered_projection_matches_only_when_filters_match() {
let query_without_filter = ProjectionQuery {
table: "events".into(),
group_keys: vec![1],
aggregates: vec![ProjectionAggregate::Count],
filter_signature: None,
};
assert!(pick_projection(&query_without_filter, &[filtered_spec()]).is_none());
let query_with_filter = ProjectionQuery {
table: "events".into(),
group_keys: vec![1],
aggregates: vec![ProjectionAggregate::Count],
filter_signature: Some("env = 'production'".into()),
};
let pick = pick_projection(&query_with_filter, &[filtered_spec()]).unwrap();
assert_eq!(pick.projection_name, "prod_daily");
}
#[test]
fn different_table_never_matches() {
let query = ProjectionQuery {
table: "other_table".into(),
group_keys: vec![0],
aggregates: vec![ProjectionAggregate::Count],
filter_signature: None,
};
assert!(pick_projection(&query, &[base_spec()]).is_none());
}
#[test]
fn empty_candidate_list_returns_none() {
let query = ProjectionQuery {
table: "events".into(),
group_keys: vec![],
aggregates: vec![ProjectionAggregate::Count],
filter_signature: None,
};
assert!(pick_projection(&query, &[]).is_none());
}
}