1use serde::Deserialize;
2use serde_json::Value as JsonValue;
3
4use crate::errors::AppError;
5use crate::schema::DatasetSchema;
6
7#[derive(Clone, Deserialize)]
8pub struct Predicate {
9 pub col: String,
10 pub op: String,
12 pub val: Option<JsonValue>,
13}
14
15#[derive(Clone, Deserialize)]
20pub struct OrderBy {
21 pub col: String,
22 #[serde(default)]
23 pub dir: Option<String>,
24}
25
26#[derive(Clone, Deserialize)]
33pub struct Aggregation {
34 #[serde(default)]
35 pub col: Option<String>,
36 pub op: String,
37 #[serde(default)]
38 pub alias: Option<String>,
39}
40
/// Body of a query request: projection, filters, optional grouping and
/// aggregation, ordering, and pagination.
///
/// NOTE: keep the field order stable. Serde's derived `Deserialize` also
/// accepts the positional sequence (array) form.
#[derive(Clone, Deserialize)]
pub struct QueryRequest {
    /// Columns to return; empty when absent. How an empty list is interpreted
    /// is up to the caller (presumably "all columns" — TODO confirm).
    #[serde(default)]
    pub columns: Vec<String>,
    /// Filter conditions; empty when absent.
    #[serde(default)]
    pub predicates: Vec<Predicate>,
    /// Grouping columns. A non-empty list makes this a grouped query
    /// (see `agg_plan`).
    #[serde(default)]
    pub group_by: Vec<String>,
    /// Aggregations for a grouped query. `agg_plan` rejects them when
    /// `group_by` is empty.
    #[serde(default)]
    pub aggregations: Vec<Aggregation>,
    /// Request distinct rows. `agg_plan` rejects this when combined with
    /// `group_by` or `aggregations`.
    #[serde(default)]
    pub distinct: bool,
    /// Sort terms, highest priority first.
    #[serde(default)]
    pub order_by: Vec<OrderBy>,
    /// Overall row cap across all pages (applied by `effective_limit_offset`).
    #[serde(default)]
    pub limit: Option<u64>,
    /// 1-based page number; defaults to 1. `effective_limit_offset` treats 0
    /// as 1.
    #[serde(default = "default_page")]
    pub page: u64,
    /// Rows per page; defaults to 1000. `effective_limit_offset` clamps it.
    #[serde(default = "default_page_size")]
    pub page_size: u64,
}
73
74#[derive(Clone)]
76pub struct AggSpec {
77 pub col: Option<String>,
79 pub op: AggOp,
80 pub alias: String,
82}
83
/// Aggregate functions supported by grouped queries.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AggOp {
    Count,
    Sum,
    Avg,
    Min,
    Max,
}

impl AggOp {
    /// SQL function keyword for this aggregate, e.g. `"SUM"`.
    pub fn as_sql(self) -> &'static str {
        match self {
            AggOp::Count => "COUNT",
            AggOp::Sum => "SUM",
            AggOp::Avg => "AVG",
            AggOp::Min => "MIN",
            AggOp::Max => "MAX",
        }
    }

    /// Lowercase name of this aggregate, e.g. `"sum"`.
    ///
    /// Used in default aliases and in error messages.
    pub fn name(self) -> &'static str {
        match self {
            AggOp::Count => "count",
            AggOp::Sum => "sum",
            AggOp::Avg => "avg",
            AggOp::Min => "min",
            AggOp::Max => "max",
        }
    }
}
112
113impl AggSpec {
114 pub fn sql_expr(&self) -> Result<String, AppError> {
123 match (self.op, self.col.as_deref()) {
124 (AggOp::Count, None) => Ok("COUNT(*)".to_string()),
125 (op, Some(c)) => Ok(format!(
126 "{}({})",
127 op.as_sql(),
128 DatasetSchema::quote_ident(c)
129 )),
130 (op, None) => Err(AppError::Internal(format!(
131 "aggregation '{}' resolved without a column (planner invariant violated)",
132 op.name()
133 ))),
134 }
135 }
136}
137
138#[derive(Clone)]
140pub struct AggPlan {
141 pub group_cols: Vec<String>,
142 pub aggs: Vec<AggSpec>,
143}
144
145impl AggPlan {
146 pub fn output_names(&self) -> Vec<String> {
150 let mut v = self.group_cols.clone();
151 v.extend(self.aggs.iter().map(|a| a.alias.clone()));
152 v
153 }
154}
155
156impl QueryRequest {
157 pub fn agg_plan(&self, schema: &DatasetSchema) -> Result<Option<AggPlan>, AppError> {
164 if self.distinct && (!self.group_by.is_empty() || !self.aggregations.is_empty()) {
165 return Err(AppError::InvalidValue(
166 "distinct is mutually exclusive with group_by / aggregations".into(),
167 ));
168 }
169 if self.group_by.is_empty() {
170 if !self.aggregations.is_empty() {
171 return Err(AppError::InvalidValue(
172 "aggregations require a non-empty group_by".into(),
173 ));
174 }
175 return Ok(None);
176 }
177
178 let mut group_cols = Vec::with_capacity(self.group_by.len());
179 for name in &self.group_by {
180 group_cols.push(schema.find(name)?.name.clone());
181 }
182
183 let raw_aggs: Vec<Aggregation> = if self.aggregations.is_empty() {
184 vec![Aggregation {
185 col: None,
186 op: "count".into(),
187 alias: None,
188 }]
189 } else {
190 self.aggregations.clone()
191 };
192
193 let mut aggs = Vec::with_capacity(raw_aggs.len());
194 for a in &raw_aggs {
195 let op = match a.op.to_ascii_lowercase().as_str() {
196 "count" => AggOp::Count,
197 "sum" => AggOp::Sum,
198 "avg" => AggOp::Avg,
199 "min" => AggOp::Min,
200 "max" => AggOp::Max,
201 other => {
202 return Err(AppError::InvalidValue(format!(
203 "unknown aggregation op '{other}' (expected count|sum|avg|min|max)"
204 )));
205 }
206 };
207 let col = match (op, a.col.as_deref()) {
208 (AggOp::Count, None) => None,
209 (_, None) => {
210 return Err(AppError::InvalidValue(format!(
211 "aggregation '{}' requires a 'col'",
212 op.name()
213 )));
214 }
215 (_, Some(c)) => Some(schema.find(c)?.name.clone()),
216 };
217 let alias = a.alias.clone().unwrap_or_else(|| match col.as_deref() {
218 Some(c) => format!("{}_{}", op.name(), c.to_lowercase()),
219 None => "count".into(),
220 });
221 aggs.push(AggSpec { col, op, alias });
222 }
223
224 Ok(Some(AggPlan { group_cols, aggs }))
225 }
226
227 pub fn order_by_sql(
236 &self,
237 schema: &DatasetSchema,
238 plan: Option<&AggPlan>,
239 ) -> Result<Option<String>, AppError> {
240 if self.order_by.is_empty() {
241 return Ok(None);
242 }
243 let parts: Vec<String> = self
244 .order_by
245 .iter()
246 .map(|o| {
247 let dir = match o
248 .dir
249 .as_deref()
250 .unwrap_or("asc")
251 .to_ascii_lowercase()
252 .as_str()
253 {
254 "asc" => "ASC",
255 "desc" => "DESC",
256 other => {
257 return Err(AppError::InvalidValue(format!(
258 "order_by direction must be 'asc' or 'desc' (got '{other}')"
259 )));
260 }
261 };
262 let ident = match plan {
263 Some(p) => {
264 let lc = o.col.to_lowercase();
265 let allowed = p.output_names();
266 allowed
267 .iter()
268 .find(|n| n.to_lowercase() == lc)
269 .map(|n| DatasetSchema::quote_ident(n))
270 .ok_or_else(|| {
271 AppError::UnknownColumn(format!(
272 "{} (must be a group_by column or aggregation alias)",
273 o.col
274 ))
275 })?
276 }
277 None => DatasetSchema::quote_ident(&schema.find(&o.col)?.name),
278 };
279 Ok(format!("{ident} {dir}"))
280 })
281 .collect::<Result<_, _>>()?;
282 Ok(Some(parts.join(", ")))
283 }
284
285 pub fn effective_limit_offset(&self, page_size_cap: u64) -> (u64, u64) {
293 let page = self.page.max(1);
294 let page_size = self.page_size.clamp(1, page_size_cap);
295 let offset = (page - 1) * page_size;
296 let limit = match self.limit {
297 Some(cap) => {
298 if offset >= cap {
299 0
300 } else {
301 page_size.min(cap - offset)
302 }
303 }
304 None => page_size,
305 };
306 (limit, offset)
307 }
308}
309
/// Serde default for `QueryRequest::page`: the first page.
fn default_page() -> u64 {
    1u64
}
/// Serde default for `QueryRequest::page_size`.
fn default_page_size() -> u64 {
    1_000
}
316
317#[derive(Clone, Deserialize, Default)]
320pub struct CountRequest {
321 #[serde(default)]
322 pub predicates: Vec<Predicate>,
323}
324
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{ColumnInfo, DatasetSchema, LogicalType};

    /// Four-column fixture schema.
    ///
    /// `Mixed` exercises case-insensitive lookup and lowercased default aliases.
    fn schema() -> DatasetSchema {
        DatasetSchema::new(
            "t",
            vec![
                ColumnInfo {
                    name: "id".into(),
                    logical: LogicalType::Int,
                    sql_type: "BIGINT".into(),
                    nullable: false,
                },
                ColumnInfo {
                    name: "name".into(),
                    logical: LogicalType::Utf8,
                    sql_type: "VARCHAR".into(),
                    nullable: true,
                },
                ColumnInfo {
                    name: "score".into(),
                    logical: LogicalType::Float,
                    sql_type: "DOUBLE".into(),
                    nullable: true,
                },
                ColumnInfo {
                    name: "Mixed".into(),
                    logical: LogicalType::Utf8,
                    sql_type: "VARCHAR".into(),
                    nullable: true,
                },
            ],
        )
    }

    /// A request whose fields all match their serde defaults.
    fn empty_req() -> QueryRequest {
        QueryRequest {
            columns: vec![],
            predicates: vec![],
            group_by: vec![],
            aggregations: vec![],
            distinct: false,
            order_by: vec![],
            limit: None,
            page: 1,
            page_size: 1000,
        }
    }

    // ---- agg_plan ----

    #[test]
    fn agg_plan_none_when_no_group_by() {
        let r = empty_req();
        assert!(r.agg_plan(&schema()).unwrap().is_none());
    }

    #[test]
    fn agg_plan_rejects_aggs_without_group_by() {
        let mut r = empty_req();
        r.aggregations = vec![Aggregation {
            col: Some("score".into()),
            op: "sum".into(),
            alias: None,
        }];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)), "got {err:?}");
    }

    #[test]
    fn agg_plan_implicit_count_star() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();
        assert_eq!(plan.group_cols, vec!["name"]);
        assert_eq!(plan.aggs.len(), 1);
        assert_eq!(plan.aggs[0].alias, "count");
        assert!(plan.aggs[0].col.is_none());
        assert!(matches!(plan.aggs[0].op, AggOp::Count));
    }

    #[test]
    fn agg_plan_default_alias_format() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![
            Aggregation {
                col: Some("score".into()),
                op: "Sum".into(),
                alias: None,
            },
            Aggregation {
                col: Some("Mixed".into()),
                op: "MAX".into(),
                alias: Some("hi".into()),
            },
        ];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();
        assert_eq!(plan.aggs[0].alias, "sum_score");
        assert_eq!(plan.aggs[1].alias, "hi");
        assert_eq!(plan.aggs[1].col.as_deref(), Some("Mixed"));
    }

    #[test]
    fn agg_plan_output_names_group_cols_then_aliases() {
        // Group columns come first, then aliases. Lookups canonicalise names,
        // and default aliases use the lowercased column name.
        let mut r = empty_req();
        r.group_by = vec!["NAME".into()];
        r.aggregations = vec![
            Aggregation {
                col: None,
                op: "count".into(),
                alias: None,
            },
            Aggregation {
                col: Some("mixed".into()),
                op: "min".into(),
                alias: None,
            },
        ];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();
        assert_eq!(plan.aggs[1].col.as_deref(), Some("Mixed"));
        assert_eq!(plan.output_names(), vec!["name", "count", "min_mixed"]);
    }

    #[test]
    fn agg_plan_unknown_op() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![Aggregation {
            col: Some("score".into()),
            op: "median".into(),
            alias: None,
        }];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("median")));
    }

    #[test]
    fn agg_plan_non_count_requires_col() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![Aggregation {
            col: None,
            op: "avg".into(),
            alias: None,
        }];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("avg")));
    }

    #[test]
    fn agg_plan_unknown_group_col() {
        let mut r = empty_req();
        r.group_by = vec!["nope".into()];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn agg_plan_distinct_conflicts_with_group_by() {
        let mut r = empty_req();
        r.distinct = true;
        r.group_by = vec!["name".into()];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)));
    }

    // ---- AggSpec::sql_expr ----

    #[test]
    fn agg_spec_sql_expr_variants() {
        let count_star = AggSpec {
            col: None,
            op: AggOp::Count,
            alias: "count".into(),
        };
        assert_eq!(count_star.sql_expr().unwrap(), "COUNT(*)");

        let sum = AggSpec {
            col: Some("score".into()),
            op: AggOp::Sum,
            alias: "sum_score".into(),
        };
        assert_eq!(sum.sql_expr().unwrap(), "SUM(\"score\")");

        // A non-COUNT aggregation without a column breaks the planner invariant.
        let broken = AggSpec {
            col: None,
            op: AggOp::Avg,
            alias: "avg".into(),
        };
        let err = broken.sql_expr().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("avg")));
    }

    // ---- order_by_sql ----

    #[test]
    fn order_by_none_when_empty() {
        let r = empty_req();
        assert!(r.order_by_sql(&schema(), None).unwrap().is_none());
    }

    #[test]
    fn order_by_default_asc_and_quoting() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "ID".into(),
            dir: None,
        }];
        let sql = r.order_by_sql(&schema(), None).unwrap().unwrap();
        assert_eq!(sql, "\"id\" ASC");
    }

    #[test]
    fn order_by_desc_case_insensitive() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "name".into(),
            dir: Some("DESC".into()),
        }];
        let sql = r.order_by_sql(&schema(), None).unwrap().unwrap();
        assert_eq!(sql, "\"name\" DESC");
    }

    #[test]
    fn order_by_bad_direction() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "id".into(),
            dir: Some("backwards".into()),
        }];
        let err = r.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("backwards")));
    }

    #[test]
    fn order_by_unknown_col_no_plan() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "missing".into(),
            dir: None,
        }];
        let err = r.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn order_by_with_plan_restricts_to_outputs() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![Aggregation {
            col: Some("score".into()),
            op: "sum".into(),
            alias: Some("total".into()),
        }];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();

        // Group columns and aliases are accepted, matched case-insensitively.
        r.order_by = vec![
            OrderBy {
                col: "name".into(),
                dir: Some("asc".into()),
            },
            OrderBy {
                col: "TOTAL".into(),
                dir: Some("desc".into()),
            },
        ];
        let sql = r.order_by_sql(&schema(), Some(&plan)).unwrap().unwrap();
        assert_eq!(sql, "\"name\" ASC, \"total\" DESC");

        // A real schema column that is not a grouped output is rejected.
        r.order_by = vec![OrderBy {
            col: "id".into(),
            dir: None,
        }];
        let err = r.order_by_sql(&schema(), Some(&plan)).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    // ---- effective_limit_offset ----

    #[test]
    fn limit_offset_first_page_default() {
        let r = empty_req();
        assert_eq!(r.effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_pagination() {
        let mut r = empty_req();
        r.page = 3;
        r.page_size = 50;
        assert_eq!(r.effective_limit_offset(1000), (50, 100));
    }

    #[test]
    fn limit_offset_caps_page_size_to_max() {
        let mut r = empty_req();
        r.page_size = 10_000;
        assert_eq!(r.effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_page_zero_treated_as_one() {
        let mut r = empty_req();
        r.page = 0;
        r.page_size = 10;
        assert_eq!(r.effective_limit_offset(1000), (10, 0));
    }

    #[test]
    fn limit_offset_top_level_cap_truncates_last_page() {
        let mut r = empty_req();
        r.page = 2;
        r.page_size = 50;
        r.limit = Some(75);
        // Rows 50..75 remain under the cap.
        assert_eq!(r.effective_limit_offset(1000), (25, 50));
    }

    #[test]
    fn limit_offset_top_level_cap_smaller_than_page() {
        let mut r = empty_req();
        r.page_size = 50;
        r.limit = Some(10);
        assert_eq!(r.effective_limit_offset(1000), (10, 0));
    }

    #[test]
    fn limit_offset_top_level_cap_exhausted_returns_zero() {
        let mut r = empty_req();
        r.page = 3;
        r.page_size = 50;
        r.limit = Some(75);
        // Offset 100 is already past the cap of 75.
        assert_eq!(r.effective_limit_offset(1000), (0, 100));
    }
}