1use serde::Deserialize;
2use serde_json::Value as JsonValue;
3
4use crate::errors::AppError;
5use crate::schema::DatasetSchema;
6
7#[derive(Clone, Deserialize)]
8pub struct Predicate {
9 pub col: String,
10 pub op: String,
12 pub val: Option<JsonValue>,
13}
14
15#[derive(Clone, Deserialize)]
20pub struct OrderBy {
21 pub col: String,
22 #[serde(default)]
23 pub dir: Option<String>,
24}
25
26#[derive(Clone, Deserialize)]
33pub struct Aggregation {
34 #[serde(default)]
35 pub col: Option<String>,
36 pub op: String,
37 #[serde(default)]
38 pub alias: Option<String>,
39}
40
41#[derive(Clone, Deserialize)]
42pub struct QueryRequest {
43 #[serde(default)]
46 pub columns: Vec<String>,
47 #[serde(default)]
48 pub predicates: Vec<Predicate>,
49 #[serde(default)]
52 pub group_by: Vec<String>,
53 #[serde(default)]
56 pub aggregations: Vec<Aggregation>,
57 #[serde(default)]
63 pub having: Vec<Predicate>,
64 #[serde(default)]
67 pub distinct: bool,
68 #[serde(default)]
70 pub order_by: Vec<OrderBy>,
71 #[serde(default)]
74 pub limit: Option<u64>,
75 #[serde(default = "default_page")]
76 pub page: u64,
77 #[serde(default = "default_page_size")]
78 pub page_size: u64,
79}
80
81#[derive(Clone, Deserialize)]
88pub struct SqlRequest {
89 pub sql: String,
92 #[serde(default)]
95 pub max_rows: Option<u64>,
96}
97
98#[derive(Clone)]
100pub struct AggSpec {
101 pub col: Option<String>,
103 pub op: AggOp,
104 pub alias: String,
106}
107
/// The aggregate functions supported by the query planner.
///
/// `Debug`, `PartialEq` and `Eq` are derived so values can be printed in
/// diagnostics and compared directly (e.g. with `assert_eq!`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AggOp {
    /// Row count (`COUNT`).
    Count,
    /// Sum of values (`SUM`).
    Sum,
    /// Arithmetic mean (`AVG`).
    Avg,
    /// Smallest value (`MIN`).
    Min,
    /// Largest value (`MAX`).
    Max,
}
116impl AggOp {
117 pub fn as_sql(self) -> &'static str {
118 match self {
119 AggOp::Count => "COUNT",
120 AggOp::Sum => "SUM",
121 AggOp::Avg => "AVG",
122 AggOp::Min => "MIN",
123 AggOp::Max => "MAX",
124 }
125 }
126 pub fn name(self) -> &'static str {
127 match self {
128 AggOp::Count => "count",
129 AggOp::Sum => "sum",
130 AggOp::Avg => "avg",
131 AggOp::Min => "min",
132 AggOp::Max => "max",
133 }
134 }
135}
136
137impl AggSpec {
138 pub fn sql_expr(&self) -> Result<String, AppError> {
147 match (self.op, self.col.as_deref()) {
148 (AggOp::Count, None) => Ok("COUNT(*)".to_string()),
149 (op, Some(c)) => Ok(format!(
150 "{}({})",
151 op.as_sql(),
152 DatasetSchema::quote_ident(c)
153 )),
154 (op, None) => Err(AppError::Internal(format!(
155 "aggregation '{}' resolved without a column (planner invariant violated)",
156 op.name()
157 ))),
158 }
159 }
160}
161
162#[derive(Clone)]
164pub struct AggPlan {
165 pub group_cols: Vec<String>,
166 pub aggs: Vec<AggSpec>,
167}
168
169impl AggPlan {
170 pub fn output_names(&self) -> Vec<String> {
174 let mut v = self.group_cols.clone();
175 v.extend(self.aggs.iter().map(|a| a.alias.clone()));
176 v
177 }
178
179 pub fn having_lhs(&self, name: &str) -> Result<String, AppError> {
186 let lc = name.to_lowercase();
187 if let Some(g) = self.group_cols.iter().find(|c| c.to_lowercase() == lc) {
188 return Ok(DatasetSchema::quote_ident(g));
189 }
190 if let Some(a) = self.aggs.iter().find(|a| a.alias.to_lowercase() == lc) {
191 return a.sql_expr();
192 }
193 Err(AppError::UnknownColumn(format!(
194 "{name} (must be a group_by column or aggregation alias)"
195 )))
196 }
197}
198
199impl QueryRequest {
200 pub fn agg_plan(&self, schema: &DatasetSchema) -> Result<Option<AggPlan>, AppError> {
207 if self.distinct && (!self.group_by.is_empty() || !self.aggregations.is_empty()) {
208 return Err(AppError::InvalidValue(
209 "distinct is mutually exclusive with group_by / aggregations".into(),
210 ));
211 }
212 if self.group_by.is_empty() {
213 if !self.aggregations.is_empty() {
214 return Err(AppError::InvalidValue(
215 "aggregations require a non-empty group_by".into(),
216 ));
217 }
218 return Ok(None);
219 }
220
221 let mut group_cols = Vec::with_capacity(self.group_by.len());
222 for name in &self.group_by {
223 group_cols.push(schema.find(name)?.name.clone());
224 }
225
226 let raw_aggs: Vec<Aggregation> = if self.aggregations.is_empty() {
227 vec![Aggregation {
228 col: None,
229 op: "count".into(),
230 alias: None,
231 }]
232 } else {
233 self.aggregations.clone()
234 };
235
236 let mut aggs = Vec::with_capacity(raw_aggs.len());
237 for a in &raw_aggs {
238 let op = match a.op.to_ascii_lowercase().as_str() {
239 "count" => AggOp::Count,
240 "sum" => AggOp::Sum,
241 "avg" => AggOp::Avg,
242 "min" => AggOp::Min,
243 "max" => AggOp::Max,
244 other => {
245 return Err(AppError::InvalidValue(format!(
246 "unknown aggregation op '{other}' (expected count|sum|avg|min|max)"
247 )));
248 }
249 };
250 let col = match (op, a.col.as_deref()) {
251 (AggOp::Count, None) => None,
252 (_, None) => {
253 return Err(AppError::InvalidValue(format!(
254 "aggregation '{}' requires a 'col'",
255 op.name()
256 )));
257 }
258 (_, Some(c)) => Some(schema.find(c)?.name.clone()),
259 };
260 let alias = a.alias.clone().unwrap_or_else(|| match col.as_deref() {
261 Some(c) => format!("{}_{}", op.name(), c.to_lowercase()),
262 None => "count".into(),
263 });
264 aggs.push(AggSpec { col, op, alias });
265 }
266
267 Ok(Some(AggPlan { group_cols, aggs }))
268 }
269
270 pub fn having_plan<'a>(
280 &'a self,
281 plan: Option<&AggPlan>,
282 ) -> Result<Vec<(String, &'a Predicate)>, AppError> {
283 if self.having.is_empty() {
284 return Ok(Vec::new());
285 }
286 let plan = plan.ok_or_else(|| {
287 AppError::InvalidValue("having requires a non-empty group_by".into())
288 })?;
289 self.having
290 .iter()
291 .map(|p| Ok((plan.having_lhs(&p.col)?, p)))
292 .collect()
293 }
294
295 pub fn order_by_sql(
304 &self,
305 schema: &DatasetSchema,
306 plan: Option<&AggPlan>,
307 ) -> Result<Option<String>, AppError> {
308 if self.order_by.is_empty() {
309 return Ok(None);
310 }
311 let parts: Vec<String> = self
312 .order_by
313 .iter()
314 .map(|o| {
315 let dir = match o
316 .dir
317 .as_deref()
318 .unwrap_or("asc")
319 .to_ascii_lowercase()
320 .as_str()
321 {
322 "asc" => "ASC",
323 "desc" => "DESC",
324 other => {
325 return Err(AppError::InvalidValue(format!(
326 "order_by direction must be 'asc' or 'desc' (got '{other}')"
327 )));
328 }
329 };
330 let ident = match plan {
331 Some(p) => {
332 let lc = o.col.to_lowercase();
333 let allowed = p.output_names();
334 allowed
335 .iter()
336 .find(|n| n.to_lowercase() == lc)
337 .map(|n| DatasetSchema::quote_ident(n))
338 .ok_or_else(|| {
339 AppError::UnknownColumn(format!(
340 "{} (must be a group_by column or aggregation alias)",
341 o.col
342 ))
343 })?
344 }
345 None => DatasetSchema::quote_ident(&schema.find(&o.col)?.name),
346 };
347 Ok(format!("{ident} {dir}"))
348 })
349 .collect::<Result<_, _>>()?;
350 Ok(Some(parts.join(", ")))
351 }
352
353 pub fn effective_limit_offset(&self, page_size_cap: u64) -> (u64, u64) {
361 let page = self.page.max(1);
362 let page_size = self.page_size.clamp(1, page_size_cap);
363 let offset = (page - 1) * page_size;
364 let limit = match self.limit {
365 Some(cap) => {
366 if offset >= cap {
367 0
368 } else {
369 page_size.min(cap - offset)
370 }
371 }
372 None => page_size,
373 };
374 (limit, offset)
375 }
376}
377
/// Serde default for `QueryRequest::page`: page numbers are 1-based, so a
/// request without `page` starts at the first page.
fn default_page() -> u64 {
    const FIRST_PAGE: u64 = 1;
    FIRST_PAGE
}
/// Serde default for `QueryRequest::page_size`, used when the client does not
/// specify one.
fn default_page_size() -> u64 {
    const DEFAULT_ROWS_PER_PAGE: u64 = 1000;
    DEFAULT_ROWS_PER_PAGE
}
384
385#[derive(Clone, Deserialize, Default)]
388pub struct CountRequest {
389 #[serde(default)]
390 pub predicates: Vec<Predicate>,
391}
392
#[cfg(test)]
mod tests {
    //! Unit tests for aggregation planning, `having` resolution, `ORDER BY`
    //! rendering and paging arithmetic on `QueryRequest`.

    use super::*;
    use crate::schema::{ColumnInfo, DatasetSchema, LogicalType};

    // ---- fixtures ---------------------------------------------------------

    /// Builds one schema column.
    fn column(
        name: &'static str,
        logical: LogicalType,
        sql_type: &'static str,
        nullable: bool,
    ) -> ColumnInfo {
        ColumnInfo {
            name: name.into(),
            logical,
            sql_type: sql_type.into(),
            nullable,
        }
    }

    /// Four-column schema; `Mixed` exists to exercise case handling.
    fn schema() -> DatasetSchema {
        DatasetSchema::new(
            "t",
            vec![
                column("id", LogicalType::Int, "BIGINT", false),
                column("name", LogicalType::Utf8, "VARCHAR", true),
                column("score", LogicalType::Float, "DOUBLE", true),
                column("Mixed", LogicalType::Utf8, "VARCHAR", true),
            ],
        )
    }

    /// A request with every field at its deserialization default.
    fn empty_req() -> QueryRequest {
        QueryRequest {
            columns: vec![],
            predicates: vec![],
            group_by: vec![],
            aggregations: vec![],
            having: vec![],
            distinct: false,
            order_by: vec![],
            limit: None,
            page: 1,
            page_size: 1000,
        }
    }

    /// A default request grouped by the `name` column.
    fn grouped_by_name() -> QueryRequest {
        QueryRequest {
            group_by: vec!["name".into()],
            ..empty_req()
        }
    }

    fn agg(col: Option<&str>, op: &str, alias: Option<&str>) -> Aggregation {
        Aggregation {
            col: col.map(String::from),
            op: op.into(),
            alias: alias.map(String::from),
        }
    }

    fn pred(col: &str, op: &str, val: serde_json::Value) -> Predicate {
        Predicate {
            col: col.into(),
            op: op.into(),
            val: Some(val),
        }
    }

    fn order(col: &str, dir: Option<&str>) -> OrderBy {
        OrderBy {
            col: col.into(),
            dir: dir.map(String::from),
        }
    }

    // ---- agg_plan ---------------------------------------------------------

    #[test]
    fn agg_plan_none_when_no_group_by() {
        let plan = empty_req().agg_plan(&schema()).unwrap();
        assert!(plan.is_none());
    }

    #[test]
    fn agg_plan_rejects_aggs_without_group_by() {
        let req = QueryRequest {
            aggregations: vec![agg(Some("score"), "sum", None)],
            ..empty_req()
        };
        let err = req.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)), "got {err:?}");
    }

    #[test]
    fn agg_plan_implicit_count_star() {
        let req = grouped_by_name();
        let plan = req.agg_plan(&schema()).unwrap().unwrap();
        assert_eq!(plan.group_cols, vec!["name"]);
        assert_eq!(plan.aggs.len(), 1);

        let only = &plan.aggs[0];
        assert_eq!(only.alias, "count");
        assert!(only.col.is_none());
        assert!(matches!(only.op, AggOp::Count));
    }

    #[test]
    fn agg_plan_default_alias_format() {
        let req = QueryRequest {
            aggregations: vec![
                agg(Some("score"), "Sum", None),
                agg(Some("Mixed"), "MAX", Some("hi")),
            ],
            ..grouped_by_name()
        };
        let plan = req.agg_plan(&schema()).unwrap().unwrap();
        assert_eq!(plan.aggs[0].alias, "sum_score");
        assert_eq!(plan.aggs[1].alias, "hi");
        assert_eq!(plan.aggs[1].col.as_deref(), Some("Mixed"));
    }

    #[test]
    fn agg_plan_unknown_op() {
        let req = QueryRequest {
            aggregations: vec![agg(Some("score"), "median", None)],
            ..grouped_by_name()
        };
        let err = req.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("median")));
    }

    #[test]
    fn agg_plan_non_count_requires_col() {
        let req = QueryRequest {
            aggregations: vec![agg(None, "avg", None)],
            ..grouped_by_name()
        };
        let err = req.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("avg")));
    }

    #[test]
    fn agg_plan_unknown_group_col() {
        let req = QueryRequest {
            group_by: vec!["nope".into()],
            ..empty_req()
        };
        let err = req.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn agg_plan_distinct_conflicts_with_group_by() {
        let req = QueryRequest {
            distinct: true,
            ..grouped_by_name()
        };
        let err = req.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)));
    }

    // ---- having_plan ------------------------------------------------------

    #[test]
    fn having_empty_returns_empty() {
        let req = empty_req();
        let resolved = req.having_plan(None).unwrap();
        assert!(resolved.is_empty());
    }

    #[test]
    fn having_requires_group_by() {
        let req = QueryRequest {
            having: vec![pred("count", "gt", serde_json::json!(1))],
            ..empty_req()
        };
        let err = req.having_plan(None).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("group_by")));
    }

    #[test]
    fn having_resolves_implicit_count_alias_to_expr() {
        let req = QueryRequest {
            having: vec![pred("count", "gt", serde_json::json!(5))],
            ..grouped_by_name()
        };
        let plan = req.agg_plan(&schema()).unwrap().unwrap();
        let resolved = req.having_plan(Some(&plan)).unwrap();
        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0].0, "COUNT(*)");
    }

    #[test]
    fn having_resolves_named_alias_and_group_col() {
        let req = QueryRequest {
            aggregations: vec![agg(Some("score"), "sum", Some("total"))],
            having: vec![
                pred("total", "gte", serde_json::json!(100)),
                pred("name", "eq", serde_json::json!("x")),
            ],
            ..grouped_by_name()
        };
        let plan = req.agg_plan(&schema()).unwrap().unwrap();
        let resolved = req.having_plan(Some(&plan)).unwrap();
        assert_eq!(resolved.len(), 2);
        assert_eq!(resolved[0].0, "SUM(\"score\")");
        assert_eq!(resolved[1].0, "\"name\"");
    }

    #[test]
    fn having_unknown_reference_errors() {
        let req = QueryRequest {
            having: vec![pred("nope", "gt", serde_json::json!(1))],
            ..grouped_by_name()
        };
        let plan = req.agg_plan(&schema()).unwrap().unwrap();
        let err = req.having_plan(Some(&plan)).err().expect("expected error");
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    // ---- order_by_sql -----------------------------------------------------

    #[test]
    fn order_by_none_when_empty() {
        let rendered = empty_req().order_by_sql(&schema(), None).unwrap();
        assert!(rendered.is_none());
    }

    #[test]
    fn order_by_default_asc_and_quoting() {
        let req = QueryRequest {
            order_by: vec![order("ID", None)],
            ..empty_req()
        };
        let sql = req.order_by_sql(&schema(), None).unwrap().unwrap();
        assert_eq!(sql, "\"id\" ASC");
    }

    #[test]
    fn order_by_desc_case_insensitive() {
        let req = QueryRequest {
            order_by: vec![order("name", Some("DESC"))],
            ..empty_req()
        };
        let sql = req.order_by_sql(&schema(), None).unwrap().unwrap();
        assert_eq!(sql, "\"name\" DESC");
    }

    #[test]
    fn order_by_bad_direction() {
        let req = QueryRequest {
            order_by: vec![order("id", Some("backwards"))],
            ..empty_req()
        };
        let err = req.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("backwards")));
    }

    #[test]
    fn order_by_unknown_col_no_plan() {
        let req = QueryRequest {
            order_by: vec![order("missing", None)],
            ..empty_req()
        };
        let err = req.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn order_by_with_plan_restricts_to_outputs() {
        let mut req = QueryRequest {
            aggregations: vec![agg(Some("score"), "sum", Some("total"))],
            ..grouped_by_name()
        };
        let plan = req.agg_plan(&schema()).unwrap().unwrap();

        // Group column and alias are both accepted; the alias matches
        // case-insensitively and is emitted in its planned spelling.
        req.order_by = vec![order("name", Some("asc")), order("TOTAL", Some("desc"))];
        let sql = req.order_by_sql(&schema(), Some(&plan)).unwrap().unwrap();
        assert_eq!(sql, "\"name\" ASC, \"total\" DESC");

        // A real schema column that is not an output of the plan is rejected.
        req.order_by = vec![order("id", None)];
        let err = req.order_by_sql(&schema(), Some(&plan)).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    // ---- effective_limit_offset -------------------------------------------

    #[test]
    fn limit_offset_first_page_default() {
        assert_eq!(empty_req().effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_pagination() {
        let req = QueryRequest {
            page: 3,
            page_size: 50,
            ..empty_req()
        };
        assert_eq!(req.effective_limit_offset(1000), (50, 100));
    }

    #[test]
    fn limit_offset_caps_page_size_to_max() {
        let req = QueryRequest {
            page_size: 10_000,
            ..empty_req()
        };
        assert_eq!(req.effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_page_zero_treated_as_one() {
        let req = QueryRequest {
            page: 0,
            page_size: 10,
            ..empty_req()
        };
        assert_eq!(req.effective_limit_offset(1000), (10, 0));
    }

    #[test]
    fn limit_offset_top_level_cap_truncates_last_page() {
        let req = QueryRequest {
            page: 2,
            page_size: 50,
            limit: Some(75),
            ..empty_req()
        };
        assert_eq!(req.effective_limit_offset(1000), (25, 50));
    }

    #[test]
    fn limit_offset_top_level_cap_exhausted_returns_zero() {
        let req = QueryRequest {
            page: 3,
            page_size: 50,
            limit: Some(75),
            ..empty_req()
        };
        assert_eq!(req.effective_limit_offset(1000), (0, 100));
    }
}