Skip to main content

reddb_server/storage/query/planner/
projections.rs

//! ClickHouse-style projections — query-specific pre-aggregated
//! indexes the planner can transparently pick over the base table.
//!
//! A [`ProjectionSpec`] declares:
//! * which columns are group keys,
//! * which aggregates are stored,
//! * which filter (optional) restricts the source rows.
//!
//! The planner picks a projection when the query's shape is a
//! **subset** of the projection's shape: every group key mentioned
//! in the query's `GROUP BY` must exist in the projection, every
//! requested aggregate must be present, and the query's filter must
//! be compatible. Today "compatible" means the projection is
//! unfiltered, or the projection and the query carry identical
//! filter signatures; accepting a query whose filter is strictly
//! tighter than a filtered projection's (via an additional
//! predicate) is not implemented yet.
//!
//! This module models only the metadata + matcher. The actual
//! storage maintenance (CDC hook, incremental refresh) lives in a
//! follow-on sprint once SQL DDL wires `ALTER TABLE ... ADD
//! PROJECTION`. Keeping the matcher standalone lets tests exercise
//! the routing rules on synthetic specs.
21
22use std::collections::HashSet;
23
/// One aggregate function a projection can store or a query can request.
///
/// Two values are interchangeable for matching purposes only when both the
/// variant and its payload are equal; the matcher relies on the derived
/// `Eq`/`Hash` implementations for set membership.
///
/// NOTE(review): the `usize` payload is presumably the index of the
/// aggregated column in the parent table schema, mirroring how group keys
/// are addressed — confirm against the DDL layer once it lands.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProjectionAggregate {
    /// Row count; carries no column payload.
    Count,
    /// Sum over the column identified by the payload.
    Sum(usize),
    /// Minimum over the column identified by the payload.
    Min(usize),
    /// Maximum over the column identified by the payload.
    Max(usize),
}
31
32#[derive(Debug, Clone)]
33pub struct ProjectionSpec {
34    pub name: String,
35    /// Table the projection is attached to.
36    pub table: String,
37    /// Columns (by index in the parent table schema) used as group
38    /// keys.
39    pub group_keys: Vec<usize>,
40    /// Aggregates materialised in this projection.
41    pub aggregates: Vec<ProjectionAggregate>,
42    /// Optional pre-applied filter — e.g.
43    /// `WHERE env = 'production'`. Represented as a canonical string
44    /// because the matcher only needs equality on the serialised
45    /// form; a richer AST comparison can plug in later.
46    pub filter_signature: Option<String>,
47}
48
49#[derive(Debug, Clone)]
50pub struct ProjectionQuery {
51    pub table: String,
52    pub group_keys: Vec<usize>,
53    pub aggregates: Vec<ProjectionAggregate>,
54    pub filter_signature: Option<String>,
55}
56
/// Outcome of matching a query against a set of projections.
///
/// Callers use the lowest-cost hit; ties are broken by declaration order
/// (the earliest qualifying projection wins).
#[derive(Debug, Clone, PartialEq)]
pub struct ProjectionMatch {
    /// Name of the projection that was selected.
    pub projection_name: String,
    /// Relative cost hint. Smaller = better.
    ///
    /// As computed by `pick_projection`, each group key the projection
    /// stores but the query does not need adds 10, and each stored
    /// aggregate the query does not request adds 1 — so the narrowest-fit
    /// projection wins, with surplus group keys weighing more than surplus
    /// aggregates.
    pub cost: u32,
}
67
68pub fn pick_projection(
69    query: &ProjectionQuery,
70    candidates: &[ProjectionSpec],
71) -> Option<ProjectionMatch> {
72    let mut best: Option<ProjectionMatch> = None;
73    for spec in candidates {
74        if spec.table != query.table {
75            continue;
76        }
77        if !is_filter_compatible(&spec.filter_signature, &query.filter_signature) {
78            continue;
79        }
80        if !group_keys_cover(&spec.group_keys, &query.group_keys) {
81            continue;
82        }
83        if !aggregates_cover(&spec.aggregates, &query.aggregates) {
84            continue;
85        }
86        let extra_keys = spec
87            .group_keys
88            .iter()
89            .filter(|k| !query.group_keys.contains(k))
90            .count() as u32;
91        let extra_aggs = spec
92            .aggregates
93            .iter()
94            .filter(|a| !query.aggregates.contains(a))
95            .count() as u32;
96        let cost = extra_keys * 10 + extra_aggs;
97        let candidate = ProjectionMatch {
98            projection_name: spec.name.clone(),
99            cost,
100        };
101        match &best {
102            Some(existing) if existing.cost <= cost => {}
103            _ => best = Some(candidate),
104        }
105    }
106    best
107}
108
/// Decides whether a projection built with filter `spec` may serve a query
/// whose filter is `query`.
///
/// * Projection unfiltered → always compatible.
/// * Projection filtered, query unfiltered → incompatible (the projection
///   is narrower than the query).
/// * Both filtered → compatible only when the two signatures are
///   byte-for-byte equal.
///
/// NOTE(review): an unfiltered projection is accepted for *any* query
/// filter. Nothing here checks that the query's predicate can still be
/// evaluated against the columns the projection stores — confirm the
/// caller guarantees that before routing.
fn is_filter_compatible(spec: &Option<String>, query: &Option<String>) -> bool {
    match spec.as_deref() {
        None => true,
        Some(projection_filter) => query.as_deref() == Some(projection_filter),
    }
}
116
/// Returns `true` when every key in `query_keys` also appears in
/// `spec_keys`.
///
/// Order and duplicates are irrelevant; an empty `query_keys` is covered
/// by any projection.
fn group_keys_cover(spec_keys: &[usize], query_keys: &[usize]) -> bool {
    let available: HashSet<usize> = spec_keys.iter().copied().collect();
    for wanted in query_keys {
        if !available.contains(wanted) {
            return false;
        }
    }
    true
}
121
122fn aggregates_cover(spec: &[ProjectionAggregate], query: &[ProjectionAggregate]) -> bool {
123    let set: HashSet<ProjectionAggregate> = spec.iter().copied().collect();
124    query.iter().all(|a| set.contains(a))
125}
126
#[cfg(test)]
mod tests {
    //! Routing-rule tests for `pick_projection`, run on synthetic specs.

    use super::*;

    /// Builds a query against the `events` table so each test only spells
    /// out the parts of the shape it cares about.
    fn events_query(
        group_keys: Vec<usize>,
        aggregates: Vec<ProjectionAggregate>,
        filter: Option<&str>,
    ) -> ProjectionQuery {
        ProjectionQuery {
            table: "events".into(),
            group_keys,
            aggregates,
            filter_signature: filter.map(String::from),
        }
    }

    /// Unfiltered projection grouped by two keys with two aggregates.
    fn base_spec() -> ProjectionSpec {
        ProjectionSpec {
            name: "daily_by_user".into(),
            table: "events".into(),
            group_keys: vec![0, 1], // user_id, day
            aggregates: vec![ProjectionAggregate::Count, ProjectionAggregate::Sum(2)],
            filter_signature: None,
        }
    }

    /// Unfiltered projection grouped by a single key with one aggregate.
    fn narrower_spec() -> ProjectionSpec {
        ProjectionSpec {
            name: "daily_total".into(),
            table: "events".into(),
            group_keys: vec![1], // day only
            aggregates: vec![ProjectionAggregate::Count],
            filter_signature: None,
        }
    }

    /// Same shape as `narrower_spec`, but built with a filter.
    fn filtered_spec() -> ProjectionSpec {
        ProjectionSpec {
            name: "prod_daily".into(),
            table: "events".into(),
            group_keys: vec![1],
            aggregates: vec![ProjectionAggregate::Count],
            filter_signature: Some("env = 'production'".into()),
        }
    }

    #[test]
    fn picks_matching_projection_when_query_is_a_subset() {
        let query = events_query(vec![0], vec![ProjectionAggregate::Count], None);
        let pick = pick_projection(&query, &[base_spec()]).unwrap();
        assert_eq!(pick.projection_name, "daily_by_user");
    }

    #[test]
    fn prefers_narrower_projection_when_both_match() {
        let query = events_query(vec![1], vec![ProjectionAggregate::Count], None);
        let pick = pick_projection(&query, &[base_spec(), narrower_spec()]).unwrap();
        // narrower has fewer extra group keys ⇒ lower cost ⇒ wins.
        assert_eq!(pick.projection_name, "daily_total");
    }

    #[test]
    fn rejects_projection_when_query_requests_unknown_aggregate() {
        let query = events_query(vec![1], vec![ProjectionAggregate::Max(3)], None);
        assert!(pick_projection(&query, &[base_spec()]).is_none());
    }

    #[test]
    fn rejects_projection_when_query_key_not_in_projection() {
        // Key 5 is never grouped by in any spec.
        let query = events_query(vec![5], vec![ProjectionAggregate::Count], None);
        assert!(pick_projection(&query, &[base_spec(), narrower_spec()]).is_none());
    }

    #[test]
    fn filtered_projection_matches_only_when_filters_match() {
        let query_without_filter = events_query(vec![1], vec![ProjectionAggregate::Count], None);
        assert!(pick_projection(&query_without_filter, &[filtered_spec()]).is_none());

        let query_with_filter = events_query(
            vec![1],
            vec![ProjectionAggregate::Count],
            Some("env = 'production'"),
        );
        let pick = pick_projection(&query_with_filter, &[filtered_spec()]).unwrap();
        assert_eq!(pick.projection_name, "prod_daily");
    }

    #[test]
    fn rejects_filtered_projection_when_signatures_differ() {
        // Both sides carry a filter, but the serialised forms are not equal.
        let query = events_query(
            vec![1],
            vec![ProjectionAggregate::Count],
            Some("env = 'staging'"),
        );
        assert!(pick_projection(&query, &[filtered_spec()]).is_none());
    }

    #[test]
    fn different_table_never_matches() {
        let query = ProjectionQuery {
            table: "other_table".into(),
            ..events_query(vec![0], vec![ProjectionAggregate::Count], None)
        };
        assert!(pick_projection(&query, &[base_spec()]).is_none());
    }

    #[test]
    fn empty_candidate_list_returns_none() {
        let query = events_query(vec![], vec![ProjectionAggregate::Count], None);
        assert!(pick_projection(&query, &[]).is_none());
    }

    #[test]
    fn equal_cost_ties_resolve_to_first_declared() {
        let twin = ProjectionSpec {
            name: "daily_total_twin".into(),
            ..narrower_spec()
        };
        let query = events_query(vec![1], vec![ProjectionAggregate::Count], None);

        let pick = pick_projection(&query, &[narrower_spec(), twin.clone()]).unwrap();
        assert_eq!(pick.projection_name, "daily_total");

        let pick = pick_projection(&query, &[twin, narrower_spec()]).unwrap();
        assert_eq!(pick.projection_name, "daily_total_twin");
    }

    #[test]
    fn cost_weights_extra_group_keys_above_extra_aggregates() {
        let query = events_query(vec![1], vec![ProjectionAggregate::Count], None);

        // Exact fit: nothing surplus.
        let exact = pick_projection(&query, &[narrower_spec()]).unwrap();
        assert_eq!(exact.cost, 0);

        // One surplus key (10) plus one surplus aggregate (1).
        let wide = pick_projection(&query, &[base_spec()]).unwrap();
        assert_eq!(wide.cost, 11);

        // One surplus key (cost 10) loses to three surplus aggregates
        // (cost 3), even though the latter is declared second.
        let extra_key = ProjectionSpec {
            name: "extra_key".into(),
            table: "events".into(),
            group_keys: vec![0, 1],
            aggregates: vec![ProjectionAggregate::Count],
            filter_signature: None,
        };
        let extra_aggs = ProjectionSpec {
            name: "extra_aggs".into(),
            table: "events".into(),
            group_keys: vec![1],
            aggregates: vec![
                ProjectionAggregate::Count,
                ProjectionAggregate::Sum(2),
                ProjectionAggregate::Min(2),
                ProjectionAggregate::Max(2),
            ],
            filter_signature: None,
        };
        let pick = pick_projection(&query, &[extra_key, extra_aggs]).unwrap();
        assert_eq!(pick.projection_name, "extra_aggs");
        assert_eq!(pick.cost, 3);
    }
}