reddb_server/storage/query/planner/
projections.rs1use std::collections::HashSet;
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
25pub enum ProjectionAggregate {
26 Count,
27 Sum(usize),
28 Min(usize),
29 Max(usize),
30}
31
32#[derive(Debug, Clone)]
33pub struct ProjectionSpec {
34 pub name: String,
35 pub table: String,
37 pub group_keys: Vec<usize>,
40 pub aggregates: Vec<ProjectionAggregate>,
42 pub filter_signature: Option<String>,
47}
48
49#[derive(Debug, Clone)]
50pub struct ProjectionQuery {
51 pub table: String,
52 pub group_keys: Vec<usize>,
53 pub aggregates: Vec<ProjectionAggregate>,
54 pub filter_signature: Option<String>,
55}
56
57#[derive(Debug, Clone, PartialEq)]
60pub struct ProjectionMatch {
61 pub projection_name: String,
62 pub cost: u32,
66}
67
68pub fn pick_projection(
69 query: &ProjectionQuery,
70 candidates: &[ProjectionSpec],
71) -> Option<ProjectionMatch> {
72 let mut best: Option<ProjectionMatch> = None;
73 for spec in candidates {
74 if spec.table != query.table {
75 continue;
76 }
77 if !is_filter_compatible(&spec.filter_signature, &query.filter_signature) {
78 continue;
79 }
80 if !group_keys_cover(&spec.group_keys, &query.group_keys) {
81 continue;
82 }
83 if !aggregates_cover(&spec.aggregates, &query.aggregates) {
84 continue;
85 }
86 let extra_keys = spec
87 .group_keys
88 .iter()
89 .filter(|k| !query.group_keys.contains(k))
90 .count() as u32;
91 let extra_aggs = spec
92 .aggregates
93 .iter()
94 .filter(|a| !query.aggregates.contains(a))
95 .count() as u32;
96 let cost = extra_keys * 10 + extra_aggs;
97 let candidate = ProjectionMatch {
98 projection_name: spec.name.clone(),
99 cost,
100 };
101 match &best {
102 Some(existing) if existing.cost <= cost => {}
103 _ => best = Some(candidate),
104 }
105 }
106 best
107}
108
109fn is_filter_compatible(spec: &Option<String>, query: &Option<String>) -> bool {
110 match (spec, query) {
111 (None, _) => true, (Some(_), None) => false, (Some(s), Some(q)) => s == q, }
115}
116
117fn group_keys_cover(spec_keys: &[usize], query_keys: &[usize]) -> bool {
118 let set: HashSet<usize> = spec_keys.iter().copied().collect();
119 query_keys.iter().all(|k| set.contains(k))
120}
121
122fn aggregates_cover(spec: &[ProjectionAggregate], query: &[ProjectionAggregate]) -> bool {
123 let set: HashSet<ProjectionAggregate> = spec.iter().copied().collect();
124 query.iter().all(|a| set.contains(a))
125}
126
127#[cfg(test)]
128mod tests {
129 use super::*;
130
131 fn base_spec() -> ProjectionSpec {
132 ProjectionSpec {
133 name: "daily_by_user".into(),
134 table: "events".into(),
135 group_keys: vec![0, 1], aggregates: vec![ProjectionAggregate::Count, ProjectionAggregate::Sum(2)],
137 filter_signature: None,
138 }
139 }
140
141 fn narrower_spec() -> ProjectionSpec {
142 ProjectionSpec {
143 name: "daily_total".into(),
144 table: "events".into(),
145 group_keys: vec![1], aggregates: vec![ProjectionAggregate::Count],
147 filter_signature: None,
148 }
149 }
150
151 fn filtered_spec() -> ProjectionSpec {
152 ProjectionSpec {
153 name: "prod_daily".into(),
154 table: "events".into(),
155 group_keys: vec![1],
156 aggregates: vec![ProjectionAggregate::Count],
157 filter_signature: Some("env = 'production'".into()),
158 }
159 }
160
161 #[test]
162 fn picks_matching_projection_when_query_is_a_subset() {
163 let query = ProjectionQuery {
164 table: "events".into(),
165 group_keys: vec![0],
166 aggregates: vec![ProjectionAggregate::Count],
167 filter_signature: None,
168 };
169 let pick = pick_projection(&query, &[base_spec()]).unwrap();
170 assert_eq!(pick.projection_name, "daily_by_user");
171 }
172
173 #[test]
174 fn prefers_narrower_projection_when_both_match() {
175 let query = ProjectionQuery {
176 table: "events".into(),
177 group_keys: vec![1],
178 aggregates: vec![ProjectionAggregate::Count],
179 filter_signature: None,
180 };
181 let pick = pick_projection(&query, &[base_spec(), narrower_spec()]).unwrap();
182 assert_eq!(pick.projection_name, "daily_total");
184 }
185
186 #[test]
187 fn rejects_projection_when_query_requests_unknown_aggregate() {
188 let query = ProjectionQuery {
189 table: "events".into(),
190 group_keys: vec![1],
191 aggregates: vec![ProjectionAggregate::Max(3)],
192 filter_signature: None,
193 };
194 assert!(pick_projection(&query, &[base_spec()]).is_none());
195 }
196
197 #[test]
198 fn rejects_projection_when_query_key_not_in_projection() {
199 let query = ProjectionQuery {
200 table: "events".into(),
201 group_keys: vec![5], aggregates: vec![ProjectionAggregate::Count],
203 filter_signature: None,
204 };
205 assert!(pick_projection(&query, &[base_spec(), narrower_spec()]).is_none());
206 }
207
208 #[test]
209 fn filtered_projection_matches_only_when_filters_match() {
210 let query_without_filter = ProjectionQuery {
211 table: "events".into(),
212 group_keys: vec![1],
213 aggregates: vec![ProjectionAggregate::Count],
214 filter_signature: None,
215 };
216 assert!(pick_projection(&query_without_filter, &[filtered_spec()]).is_none());
217
218 let query_with_filter = ProjectionQuery {
219 table: "events".into(),
220 group_keys: vec![1],
221 aggregates: vec![ProjectionAggregate::Count],
222 filter_signature: Some("env = 'production'".into()),
223 };
224 let pick = pick_projection(&query_with_filter, &[filtered_spec()]).unwrap();
225 assert_eq!(pick.projection_name, "prod_daily");
226 }
227
228 #[test]
229 fn different_table_never_matches() {
230 let query = ProjectionQuery {
231 table: "other_table".into(),
232 group_keys: vec![0],
233 aggregates: vec![ProjectionAggregate::Count],
234 filter_signature: None,
235 };
236 assert!(pick_projection(&query, &[base_spec()]).is_none());
237 }
238
239 #[test]
240 fn empty_candidate_list_returns_none() {
241 let query = ProjectionQuery {
242 table: "events".into(),
243 group_keys: vec![],
244 aggregates: vec![ProjectionAggregate::Count],
245 filter_signature: None,
246 };
247 assert!(pick_projection(&query, &[]).is_none());
248 }
249}