1use serde::Deserialize;
2use serde_json::Value as JsonValue;
3
4use crate::errors::AppError;
5use crate::schema::DatasetSchema;
6
7#[derive(Clone, Deserialize)]
8pub struct Predicate {
9 pub col: String,
10 pub op: String,
12 pub val: Option<JsonValue>,
13}
14
15#[derive(Clone, Deserialize)]
20pub struct OrderBy {
21 pub col: String,
22 #[serde(default)]
23 pub dir: Option<String>,
24}
25
26#[derive(Clone, Deserialize)]
33pub struct Aggregation {
34 #[serde(default)]
35 pub col: Option<String>,
36 pub op: String,
37 #[serde(default)]
38 pub alias: Option<String>,
39}
40
41#[derive(Clone, Deserialize)]
42pub struct QueryRequest {
43 #[serde(default)]
46 pub columns: Vec<String>,
47 #[serde(default)]
48 pub predicates: Vec<Predicate>,
49 #[serde(default)]
52 pub group_by: Vec<String>,
53 #[serde(default)]
56 pub aggregations: Vec<Aggregation>,
57 #[serde(default)]
60 pub distinct: bool,
61 #[serde(default)]
63 pub order_by: Vec<OrderBy>,
64 #[serde(default)]
67 pub limit: Option<u64>,
68 #[serde(default = "default_page")]
69 pub page: u64,
70 #[serde(default = "default_page_size")]
71 pub page_size: u64,
72}
73
74#[derive(Clone, Deserialize)]
81pub struct SqlRequest {
82 pub sql: String,
85 #[serde(default)]
88 pub max_rows: Option<u64>,
89}
90
91#[derive(Clone)]
93pub struct AggSpec {
94 pub col: Option<String>,
96 pub op: AggOp,
97 pub alias: String,
99}
100
/// Aggregate functions supported by the grouped-query planner.
///
/// Besides `Clone`/`Copy`, the enum derives `Debug`, `PartialEq`, `Eq` and
/// `Hash`. Plans can then be logged and compared directly
/// (`assert_eq!(spec.op, AggOp::Sum)`) instead of only through `matches!`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum AggOp {
    /// `COUNT(col)`, or `COUNT(*)` when no column is given.
    Count,
    /// `SUM(col)`.
    Sum,
    /// `AVG(col)`.
    Avg,
    /// `MIN(col)`.
    Min,
    /// `MAX(col)`.
    Max,
}

impl AggOp {
    /// Upper-case SQL function name, e.g. `"SUM"`.
    pub fn as_sql(self) -> &'static str {
        match self {
            AggOp::Count => "COUNT",
            AggOp::Sum => "SUM",
            AggOp::Avg => "AVG",
            AggOp::Min => "MIN",
            AggOp::Max => "MAX",
        }
    }

    /// Lower-case name, used for default aliases (`<name>_<col>`) and in
    /// error messages, e.g. `"sum"`.
    pub fn name(self) -> &'static str {
        match self {
            AggOp::Count => "count",
            AggOp::Sum => "sum",
            AggOp::Avg => "avg",
            AggOp::Min => "min",
            AggOp::Max => "max",
        }
    }
}
129
130impl AggSpec {
131 pub fn sql_expr(&self) -> Result<String, AppError> {
140 match (self.op, self.col.as_deref()) {
141 (AggOp::Count, None) => Ok("COUNT(*)".to_string()),
142 (op, Some(c)) => Ok(format!(
143 "{}({})",
144 op.as_sql(),
145 DatasetSchema::quote_ident(c)
146 )),
147 (op, None) => Err(AppError::Internal(format!(
148 "aggregation '{}' resolved without a column (planner invariant violated)",
149 op.name()
150 ))),
151 }
152 }
153}
154
155#[derive(Clone)]
157pub struct AggPlan {
158 pub group_cols: Vec<String>,
159 pub aggs: Vec<AggSpec>,
160}
161
162impl AggPlan {
163 pub fn output_names(&self) -> Vec<String> {
167 let mut v = self.group_cols.clone();
168 v.extend(self.aggs.iter().map(|a| a.alias.clone()));
169 v
170 }
171}
172
173impl QueryRequest {
174 pub fn agg_plan(&self, schema: &DatasetSchema) -> Result<Option<AggPlan>, AppError> {
181 if self.distinct && (!self.group_by.is_empty() || !self.aggregations.is_empty()) {
182 return Err(AppError::InvalidValue(
183 "distinct is mutually exclusive with group_by / aggregations".into(),
184 ));
185 }
186 if self.group_by.is_empty() {
187 if !self.aggregations.is_empty() {
188 return Err(AppError::InvalidValue(
189 "aggregations require a non-empty group_by".into(),
190 ));
191 }
192 return Ok(None);
193 }
194
195 let mut group_cols = Vec::with_capacity(self.group_by.len());
196 for name in &self.group_by {
197 group_cols.push(schema.find(name)?.name.clone());
198 }
199
200 let raw_aggs: Vec<Aggregation> = if self.aggregations.is_empty() {
201 vec![Aggregation {
202 col: None,
203 op: "count".into(),
204 alias: None,
205 }]
206 } else {
207 self.aggregations.clone()
208 };
209
210 let mut aggs = Vec::with_capacity(raw_aggs.len());
211 for a in &raw_aggs {
212 let op = match a.op.to_ascii_lowercase().as_str() {
213 "count" => AggOp::Count,
214 "sum" => AggOp::Sum,
215 "avg" => AggOp::Avg,
216 "min" => AggOp::Min,
217 "max" => AggOp::Max,
218 other => {
219 return Err(AppError::InvalidValue(format!(
220 "unknown aggregation op '{other}' (expected count|sum|avg|min|max)"
221 )));
222 }
223 };
224 let col = match (op, a.col.as_deref()) {
225 (AggOp::Count, None) => None,
226 (_, None) => {
227 return Err(AppError::InvalidValue(format!(
228 "aggregation '{}' requires a 'col'",
229 op.name()
230 )));
231 }
232 (_, Some(c)) => Some(schema.find(c)?.name.clone()),
233 };
234 let alias = a.alias.clone().unwrap_or_else(|| match col.as_deref() {
235 Some(c) => format!("{}_{}", op.name(), c.to_lowercase()),
236 None => "count".into(),
237 });
238 aggs.push(AggSpec { col, op, alias });
239 }
240
241 Ok(Some(AggPlan { group_cols, aggs }))
242 }
243
244 pub fn order_by_sql(
253 &self,
254 schema: &DatasetSchema,
255 plan: Option<&AggPlan>,
256 ) -> Result<Option<String>, AppError> {
257 if self.order_by.is_empty() {
258 return Ok(None);
259 }
260 let parts: Vec<String> = self
261 .order_by
262 .iter()
263 .map(|o| {
264 let dir = match o
265 .dir
266 .as_deref()
267 .unwrap_or("asc")
268 .to_ascii_lowercase()
269 .as_str()
270 {
271 "asc" => "ASC",
272 "desc" => "DESC",
273 other => {
274 return Err(AppError::InvalidValue(format!(
275 "order_by direction must be 'asc' or 'desc' (got '{other}')"
276 )));
277 }
278 };
279 let ident = match plan {
280 Some(p) => {
281 let lc = o.col.to_lowercase();
282 let allowed = p.output_names();
283 allowed
284 .iter()
285 .find(|n| n.to_lowercase() == lc)
286 .map(|n| DatasetSchema::quote_ident(n))
287 .ok_or_else(|| {
288 AppError::UnknownColumn(format!(
289 "{} (must be a group_by column or aggregation alias)",
290 o.col
291 ))
292 })?
293 }
294 None => DatasetSchema::quote_ident(&schema.find(&o.col)?.name),
295 };
296 Ok(format!("{ident} {dir}"))
297 })
298 .collect::<Result<_, _>>()?;
299 Ok(Some(parts.join(", ")))
300 }
301
302 pub fn effective_limit_offset(&self, page_size_cap: u64) -> (u64, u64) {
310 let page = self.page.max(1);
311 let page_size = self.page_size.clamp(1, page_size_cap);
312 let offset = (page - 1) * page_size;
313 let limit = match self.limit {
314 Some(cap) => {
315 if offset >= cap {
316 0
317 } else {
318 page_size.min(cap - offset)
319 }
320 }
321 None => page_size,
322 };
323 (limit, offset)
324 }
325}
326
/// Serde default for `QueryRequest::page`. Pages are 1-based, so an omitted
/// page means the first one.
fn default_page() -> u64 {
    const FIRST_PAGE: u64 = 1;
    FIRST_PAGE
}
/// Serde default for `QueryRequest::page_size`, applied before the
/// server-side cap in `effective_limit_offset`.
fn default_page_size() -> u64 {
    const DEFAULT_PAGE_SIZE: u64 = 1_000;
    DEFAULT_PAGE_SIZE
}
333
334#[derive(Clone, Deserialize, Default)]
337pub struct CountRequest {
338 #[serde(default)]
339 pub predicates: Vec<Predicate>,
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{ColumnInfo, DatasetSchema, LogicalType};

    /// Fixture schema. `Mixed` checks that the schema's spelling is preserved
    /// through resolution while lookups stay case-insensitive.
    fn schema() -> DatasetSchema {
        DatasetSchema::new(
            "t",
            vec![
                ColumnInfo {
                    name: "id".into(),
                    logical: LogicalType::Int,
                    sql_type: "BIGINT".into(),
                    nullable: false,
                },
                ColumnInfo {
                    name: "name".into(),
                    logical: LogicalType::Utf8,
                    sql_type: "VARCHAR".into(),
                    nullable: true,
                },
                ColumnInfo {
                    name: "score".into(),
                    logical: LogicalType::Float,
                    sql_type: "DOUBLE".into(),
                    nullable: true,
                },
                ColumnInfo {
                    name: "Mixed".into(),
                    logical: LogicalType::Utf8,
                    sql_type: "VARCHAR".into(),
                    nullable: true,
                },
            ],
        )
    }

    /// A request with every field at its serde default.
    fn empty_req() -> QueryRequest {
        QueryRequest {
            columns: vec![],
            predicates: vec![],
            group_by: vec![],
            aggregations: vec![],
            distinct: false,
            order_by: vec![],
            limit: None,
            page: 1,
            page_size: 1000,
        }
    }

    /// Shorthand for a client-side [`Aggregation`].
    fn agg(col: Option<&str>, op: &str, alias: Option<&str>) -> Aggregation {
        Aggregation {
            col: col.map(String::from),
            op: op.to_string(),
            alias: alias.map(String::from),
        }
    }

    /// Shorthand for an [`OrderBy`] key.
    fn ob(col: &str, dir: Option<&str>) -> OrderBy {
        OrderBy {
            col: col.to_string(),
            dir: dir.map(String::from),
        }
    }

    /// A request grouped by `name` carrying the given aggregations.
    fn grouped_by_name(aggregations: Vec<Aggregation>) -> QueryRequest {
        QueryRequest {
            group_by: vec!["name".into()],
            aggregations,
            ..empty_req()
        }
    }

    // ---- agg_plan --------------------------------------------------------

    #[test]
    fn agg_plan_none_when_no_group_by() {
        assert!(empty_req().agg_plan(&schema()).unwrap().is_none());
    }

    #[test]
    fn agg_plan_rejects_aggs_without_group_by() {
        let mut r = empty_req();
        r.aggregations = vec![agg(Some("score"), "sum", None)];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)), "got {err:?}");
    }

    #[test]
    fn agg_plan_implicit_count_star() {
        let plan = grouped_by_name(vec![])
            .agg_plan(&schema())
            .unwrap()
            .unwrap();
        assert_eq!(plan.group_cols, vec!["name"]);
        assert_eq!(plan.aggs.len(), 1);
        assert_eq!(plan.aggs[0].alias, "count");
        assert!(plan.aggs[0].col.is_none());
        assert!(matches!(plan.aggs[0].op, AggOp::Count));
    }

    #[test]
    fn agg_plan_default_alias_format() {
        let plan = grouped_by_name(vec![
            agg(Some("score"), "Sum", None),
            agg(Some("Mixed"), "MAX", Some("hi")),
        ])
        .agg_plan(&schema())
        .unwrap()
        .unwrap();
        assert_eq!(plan.aggs[0].alias, "sum_score");
        assert_eq!(plan.aggs[1].alias, "hi");
        assert_eq!(plan.aggs[1].col.as_deref(), Some("Mixed"));
    }

    #[test]
    fn agg_plan_count_with_column_gets_lowercased_alias() {
        let plan = grouped_by_name(vec![agg(Some("Mixed"), "count", None)])
            .agg_plan(&schema())
            .unwrap()
            .unwrap();
        assert_eq!(plan.aggs[0].col.as_deref(), Some("Mixed"));
        assert_eq!(plan.aggs[0].alias, "count_mixed");
        assert!(matches!(plan.aggs[0].op, AggOp::Count));
    }

    #[test]
    fn agg_plan_unknown_op() {
        let err = grouped_by_name(vec![agg(Some("score"), "median", None)])
            .agg_plan(&schema())
            .err()
            .expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("median")));
    }

    #[test]
    fn agg_plan_non_count_requires_col() {
        let err = grouped_by_name(vec![agg(None, "avg", None)])
            .agg_plan(&schema())
            .err()
            .expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("avg")));
    }

    #[test]
    fn agg_plan_unknown_group_col() {
        let mut r = empty_req();
        r.group_by = vec!["nope".into()];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn agg_plan_unknown_agg_col() {
        let err = grouped_by_name(vec![agg(Some("nope"), "sum", None)])
            .agg_plan(&schema())
            .err()
            .expect("expected error");
        assert!(matches!(err, AppError::UnknownColumn(_)), "got {err:?}");
    }

    #[test]
    fn agg_plan_distinct_conflicts_with_group_by() {
        let mut r = grouped_by_name(vec![]);
        r.distinct = true;
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)));
    }

    #[test]
    fn agg_plan_distinct_conflicts_with_aggregations() {
        let mut r = empty_req();
        r.distinct = true;
        r.aggregations = vec![agg(Some("score"), "sum", None)];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)), "got {err:?}");
    }

    #[test]
    fn agg_plan_output_names_groups_then_aliases() {
        let plan = grouped_by_name(vec![
            agg(Some("score"), "sum", Some("total")),
            agg(None, "count", None),
        ])
        .agg_plan(&schema())
        .unwrap()
        .unwrap();
        assert_eq!(plan.output_names(), vec!["name", "total", "count"]);
    }

    // ---- AggSpec::sql_expr -------------------------------------------------

    #[test]
    fn agg_spec_sql_expr() {
        let count_star = AggSpec {
            col: None,
            op: AggOp::Count,
            alias: "count".into(),
        };
        assert_eq!(count_star.sql_expr().unwrap(), "COUNT(*)");

        let avg = AggSpec {
            col: Some("score".into()),
            op: AggOp::Avg,
            alias: "avg_score".into(),
        };
        assert_eq!(avg.sql_expr().unwrap(), "AVG(\"score\")");

        let broken = AggSpec {
            col: None,
            op: AggOp::Sum,
            alias: "sum".into(),
        };
        let err = broken.sql_expr().unwrap_err();
        assert!(matches!(err, AppError::Internal(_)), "got {err:?}");
    }

    // ---- order_by_sql ------------------------------------------------------

    #[test]
    fn order_by_none_when_empty() {
        assert!(empty_req().order_by_sql(&schema(), None).unwrap().is_none());
    }

    #[test]
    fn order_by_default_asc_and_quoting() {
        let mut r = empty_req();
        r.order_by = vec![ob("ID", None)];
        let sql = r.order_by_sql(&schema(), None).unwrap().unwrap();
        assert_eq!(sql, "\"id\" ASC");
    }

    #[test]
    fn order_by_desc_case_insensitive() {
        let mut r = empty_req();
        r.order_by = vec![ob("name", Some("DESC"))];
        let sql = r.order_by_sql(&schema(), None).unwrap().unwrap();
        assert_eq!(sql, "\"name\" DESC");
    }

    #[test]
    fn order_by_multiple_keys_keep_request_order() {
        let mut r = empty_req();
        r.order_by = vec![ob("score", Some("desc")), ob("ID", None)];
        let sql = r.order_by_sql(&schema(), None).unwrap().unwrap();
        assert_eq!(sql, "\"score\" DESC, \"id\" ASC");
    }

    #[test]
    fn order_by_bad_direction() {
        let mut r = empty_req();
        r.order_by = vec![ob("id", Some("backwards"))];
        let err = r.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("backwards")));
    }

    #[test]
    fn order_by_unknown_col_no_plan() {
        let mut r = empty_req();
        r.order_by = vec![ob("missing", None)];
        let err = r.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn order_by_with_plan_restricts_to_outputs() {
        let mut r = grouped_by_name(vec![agg(Some("score"), "sum", Some("total"))]);
        let plan = r.agg_plan(&schema()).unwrap().unwrap();

        // Group columns and aliases are both valid keys, matched case-insensitively.
        r.order_by = vec![ob("name", Some("asc")), ob("TOTAL", Some("desc"))];
        let sql = r.order_by_sql(&schema(), Some(&plan)).unwrap().unwrap();
        assert_eq!(sql, "\"name\" ASC, \"total\" DESC");

        // A schema column that is not one of the plan's outputs is rejected.
        r.order_by = vec![ob("id", None)];
        let err = r.order_by_sql(&schema(), Some(&plan)).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    // ---- effective_limit_offset --------------------------------------------

    #[test]
    fn limit_offset_first_page_default() {
        assert_eq!(empty_req().effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_pagination() {
        let mut r = empty_req();
        r.page = 3;
        r.page_size = 50;
        assert_eq!(r.effective_limit_offset(1000), (50, 100));
    }

    #[test]
    fn limit_offset_caps_page_size_to_max() {
        let mut r = empty_req();
        r.page_size = 10_000;
        assert_eq!(r.effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_page_zero_treated_as_one() {
        let mut r = empty_req();
        r.page = 0;
        r.page_size = 10;
        assert_eq!(r.effective_limit_offset(1000), (10, 0));
    }

    #[test]
    fn limit_offset_top_level_cap_smaller_than_page() {
        let mut r = empty_req();
        r.page_size = 50;
        r.limit = Some(10);
        assert_eq!(r.effective_limit_offset(1000), (10, 0));
    }

    #[test]
    fn limit_offset_top_level_cap_truncates_last_page() {
        let mut r = empty_req();
        r.page = 2;
        r.page_size = 50;
        r.limit = Some(75);
        assert_eq!(r.effective_limit_offset(1000), (25, 50));
    }

    #[test]
    fn limit_offset_top_level_cap_exhausted_returns_zero() {
        let mut r = empty_req();
        r.page = 3;
        r.page_size = 50;
        r.limit = Some(75);
        assert_eq!(r.effective_limit_offset(1000), (0, 100));
    }
}