//! `datapress_core/models.rs` — request bodies and query-planning models.

1use serde::Deserialize;
2use serde_json::Value as JsonValue;
3
4use crate::errors::AppError;
5use crate::schema::DatasetSchema;
6
7#[derive(Clone, Deserialize)]
8pub struct Predicate {
9    pub col: String,
10    /// eq | neq | gt | gte | lt | lte | like | ilike | in | is_null | is_not_null
11    pub op: String,
12    pub val: Option<JsonValue>,
13}
14
15/// A single `ORDER BY` clause entry.
16///
17/// `dir` is case-insensitive; accepted values are `"asc"` (default) and
18/// `"desc"`. Omitted = ascending.
19#[derive(Clone, Deserialize)]
20pub struct OrderBy {
21    pub col: String,
22    #[serde(default)]
23    pub dir: Option<String>,
24}
25
26/// A single aggregation in a `group_by` query.
27///
28/// `op` is one of `count | sum | avg | min | max` (case-insensitive).
29/// `col` is required for every op except `count`, where it may be omitted
30/// to mean `COUNT(*)`. `alias` is the JSON output key; if omitted, it
31/// defaults to `count` for `COUNT(*)` and `{op}_{col}` otherwise.
32#[derive(Clone, Deserialize)]
33pub struct Aggregation {
34    #[serde(default)]
35    pub col: Option<String>,
36    pub op: String,
37    #[serde(default)]
38    pub alias: Option<String>,
39}
40
41#[derive(Clone, Deserialize)]
42pub struct QueryRequest {
43    /// Columns to return. Empty = all columns. Ignored when `group_by` is
44    /// non-empty (the SELECT list is then derived from `group_by` + `aggregations`).
45    #[serde(default)]
46    pub columns: Vec<String>,
47    #[serde(default)]
48    pub predicates: Vec<Predicate>,
49    /// Group-by columns. Empty = no grouping (regular row scan). When set,
50    /// the response shape is `{ group_col_1, …, alias_1, … }` per row.
51    #[serde(default)]
52    pub group_by: Vec<String>,
53    /// Aggregations to compute over each group. When `group_by` is set and
54    /// this is empty, an implicit `{ op: "count" }` is added.
55    #[serde(default)]
56    pub aggregations: Vec<Aggregation>,
57    /// Return only distinct rows over the projected columns. Mutually
58    /// exclusive with `group_by` / `aggregations`.
59    #[serde(default)]
60    pub distinct: bool,
61    /// Sort spec. Empty = unsorted (engine order).
62    #[serde(default)]
63    pub order_by: Vec<OrderBy>,
64    /// Hard cap on total rows returned across all pages. `None` = no cap
65    /// beyond `page_size`.
66    #[serde(default)]
67    pub limit: Option<u64>,
68    #[serde(default = "default_page")]
69    pub page: u64,
70    #[serde(default = "default_page_size")]
71    pub page_size: u64,
72}
73
74/// One resolved aggregation, ready for SQL emission.
75#[derive(Clone)]
76pub struct AggSpec {
77    /// Canonical column name from the schema, or `None` for `COUNT(*)`.
78    pub col: Option<String>,
79    pub op: AggOp,
80    /// Output alias (JSON key). Always set after planning.
81    pub alias: String,
82}
83
/// Aggregate functions supported by grouped queries.
///
/// Derives `Debug`, `PartialEq` and `Eq` so values can be logged and
/// compared with `==` / `assert_eq!` instead of `matches!`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AggOp {
    Count,
    Sum,
    Avg,
    Min,
    Max,
}

impl AggOp {
    /// Upper-case SQL function name for this op (`"COUNT"`, `"SUM"`, …).
    pub fn as_sql(self) -> &'static str {
        match self {
            Self::Count => "COUNT",
            Self::Sum => "SUM",
            Self::Avg => "AVG",
            Self::Min => "MIN",
            Self::Max => "MAX",
        }
    }

    /// Lower-case name for this op (`"count"`, `"sum"`, …), used in
    /// default aliases and error messages.
    pub fn name(self) -> &'static str {
        match self {
            Self::Count => "count",
            Self::Sum => "sum",
            Self::Avg => "avg",
            Self::Min => "min",
            Self::Max => "max",
        }
    }
}
112
113impl AggSpec {
114    /// Render the SQL aggregate expression for this spec, e.g. `COUNT(*)`
115    /// or `SUM("amount")`. The column name is quoted via
116    /// [`DatasetSchema::quote_ident`].
117    ///
118    /// By construction (see [`QueryRequest::agg_plan`]) every non-`COUNT`
119    /// op carries a resolved column and `COUNT` may omit one. If that
120    /// invariant is ever violated this returns `AppError::Internal`
121    /// rather than panicking, since the value flows onto a live HTTP path.
122    pub fn sql_expr(&self) -> Result<String, AppError> {
123        match (self.op, self.col.as_deref()) {
124            (AggOp::Count, None) => Ok("COUNT(*)".to_string()),
125            (op, Some(c)) => Ok(format!(
126                "{}({})",
127                op.as_sql(),
128                DatasetSchema::quote_ident(c)
129            )),
130            (op, None) => Err(AppError::Internal(format!(
131                "aggregation '{}' resolved without a column (planner invariant violated)",
132                op.name()
133            ))),
134        }
135    }
136}
137
138/// Validated `GROUP BY` plan: canonical group columns + resolved aggregations.
139#[derive(Clone)]
140pub struct AggPlan {
141    pub group_cols: Vec<String>,
142    pub aggs: Vec<AggSpec>,
143}
144
145impl AggPlan {
146    /// All output names exposed by this plan, in SELECT order: group
147    /// columns first, then aggregation aliases. Used by `order_by`
148    /// validation when grouping is active.
149    pub fn output_names(&self) -> Vec<String> {
150        let mut v = self.group_cols.clone();
151        v.extend(self.aggs.iter().map(|a| a.alias.clone()));
152        v
153    }
154}
155
156impl QueryRequest {
157    /// Resolve the `group_by` + `aggregations` request into a validated
158    /// plan, or return `Ok(None)` when no grouping was requested.
159    ///
160    /// When `group_by` is non-empty and `aggregations` is empty, an
161    /// implicit `COUNT(*) AS count` is added so the plan always has at
162    /// least one output value.
163    pub fn agg_plan(&self, schema: &DatasetSchema) -> Result<Option<AggPlan>, AppError> {
164        if self.distinct && (!self.group_by.is_empty() || !self.aggregations.is_empty()) {
165            return Err(AppError::InvalidValue(
166                "distinct is mutually exclusive with group_by / aggregations".into(),
167            ));
168        }
169        if self.group_by.is_empty() {
170            if !self.aggregations.is_empty() {
171                return Err(AppError::InvalidValue(
172                    "aggregations require a non-empty group_by".into(),
173                ));
174            }
175            return Ok(None);
176        }
177
178        let mut group_cols = Vec::with_capacity(self.group_by.len());
179        for name in &self.group_by {
180            group_cols.push(schema.find(name)?.name.clone());
181        }
182
183        let raw_aggs: Vec<Aggregation> = if self.aggregations.is_empty() {
184            vec![Aggregation {
185                col: None,
186                op: "count".into(),
187                alias: None,
188            }]
189        } else {
190            self.aggregations.clone()
191        };
192
193        let mut aggs = Vec::with_capacity(raw_aggs.len());
194        for a in &raw_aggs {
195            let op = match a.op.to_ascii_lowercase().as_str() {
196                "count" => AggOp::Count,
197                "sum" => AggOp::Sum,
198                "avg" => AggOp::Avg,
199                "min" => AggOp::Min,
200                "max" => AggOp::Max,
201                other => {
202                    return Err(AppError::InvalidValue(format!(
203                        "unknown aggregation op '{other}' (expected count|sum|avg|min|max)"
204                    )));
205                }
206            };
207            let col = match (op, a.col.as_deref()) {
208                (AggOp::Count, None) => None,
209                (_, None) => {
210                    return Err(AppError::InvalidValue(format!(
211                        "aggregation '{}' requires a 'col'",
212                        op.name()
213                    )));
214                }
215                (_, Some(c)) => Some(schema.find(c)?.name.clone()),
216            };
217            let alias = a.alias.clone().unwrap_or_else(|| match col.as_deref() {
218                Some(c) => format!("{}_{}", op.name(), c.to_lowercase()),
219                None => "count".into(),
220            });
221            aggs.push(AggSpec { col, op, alias });
222        }
223
224        Ok(Some(AggPlan { group_cols, aggs }))
225    }
226
227    /// Translate `order_by` into a validated SQL fragment, e.g.
228    /// `"\"a\" ASC, \"b\" DESC"`. Returns `Ok(None)` if no ordering was
229    /// requested.
230    ///
231    /// When `plan` is `Some`, sort keys must reference a group-by column
232    /// or an aggregation alias (the only names in scope after `GROUP BY`).
233    /// When `plan` is `None`, sort keys are validated against the dataset
234    /// schema.
235    pub fn order_by_sql(
236        &self,
237        schema: &DatasetSchema,
238        plan: Option<&AggPlan>,
239    ) -> Result<Option<String>, AppError> {
240        if self.order_by.is_empty() {
241            return Ok(None);
242        }
243        let parts: Vec<String> = self
244            .order_by
245            .iter()
246            .map(|o| {
247                let dir = match o
248                    .dir
249                    .as_deref()
250                    .unwrap_or("asc")
251                    .to_ascii_lowercase()
252                    .as_str()
253                {
254                    "asc" => "ASC",
255                    "desc" => "DESC",
256                    other => {
257                        return Err(AppError::InvalidValue(format!(
258                            "order_by direction must be 'asc' or 'desc' (got '{other}')"
259                        )));
260                    }
261                };
262                let ident = match plan {
263                    Some(p) => {
264                        let lc = o.col.to_lowercase();
265                        let allowed = p.output_names();
266                        allowed
267                            .iter()
268                            .find(|n| n.to_lowercase() == lc)
269                            .map(|n| DatasetSchema::quote_ident(n))
270                            .ok_or_else(|| {
271                                AppError::UnknownColumn(format!(
272                                    "{} (must be a group_by column or aggregation alias)",
273                                    o.col
274                                ))
275                            })?
276                    }
277                    None => DatasetSchema::quote_ident(&schema.find(&o.col)?.name),
278                };
279                Ok(format!("{ident} {dir}"))
280            })
281            .collect::<Result<_, _>>()?;
282        Ok(Some(parts.join(", ")))
283    }
284
285    /// Compute the effective SQL `LIMIT` and `OFFSET` for this request,
286    /// honouring both `page`/`page_size` and the optional top-level `limit`
287    /// cap. `page_size_cap` is the per-page maximum the backend enforces.
288    ///
289    /// Semantics: pagination still drives offset; `limit` caps the total
290    /// number of rows ever returned across all pages. Once `offset >=
291    /// limit`, the effective LIMIT is `0` (empty page).
292    pub fn effective_limit_offset(&self, page_size_cap: u64) -> (u64, u64) {
293        let page = self.page.max(1);
294        let page_size = self.page_size.clamp(1, page_size_cap);
295        let offset = (page - 1) * page_size;
296        let limit = match self.limit {
297            Some(cap) => {
298                if offset >= cap {
299                    0
300                } else {
301                    page_size.min(cap - offset)
302                }
303            }
304            None => page_size,
305        };
306        (limit, offset)
307    }
308}
309
/// Serde default for `QueryRequest::page`: the first page.
///
/// `const` so the value can also be used in constant contexts.
const fn default_page() -> u64 {
    1
}
/// Serde default for `QueryRequest::page_size`: 1000 rows per page.
///
/// `const` so the value can also be used in constant contexts.
const fn default_page_size() -> u64 {
    1000
}
316
317/// Body for `POST /api/datasets/{name}/count`. Predicates are optional —
318/// an empty body (or `{}`) counts every row in the dataset.
319#[derive(Clone, Deserialize, Default)]
320pub struct CountRequest {
321    #[serde(default)]
322    pub predicates: Vec<Predicate>,
323}
324
325// ---------------------------------------------------------------------------
326// Tests
327// ---------------------------------------------------------------------------
328
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{ColumnInfo, DatasetSchema, LogicalType};

    // ---- fixtures -----------------------------------------------------------

    /// Shorthand for one schema column.
    fn column(name: &str, logical: LogicalType, sql_type: &str, nullable: bool) -> ColumnInfo {
        ColumnInfo {
            name: name.into(),
            logical,
            sql_type: sql_type.into(),
            nullable,
        }
    }

    /// Four-column dataset `t`; `Mixed` is a mixed-case column name.
    fn schema() -> DatasetSchema {
        DatasetSchema::new(
            "t",
            vec![
                column("id", LogicalType::Int, "BIGINT", false),
                column("name", LogicalType::Utf8, "VARCHAR", true),
                column("score", LogicalType::Float, "DOUBLE", true),
                column("Mixed", LogicalType::Utf8, "VARCHAR", true),
            ],
        )
    }

    /// Request with nothing set beyond the pagination defaults.
    fn empty_req() -> QueryRequest {
        QueryRequest {
            columns: Vec::new(),
            predicates: Vec::new(),
            group_by: Vec::new(),
            aggregations: Vec::new(),
            distinct: false,
            order_by: Vec::new(),
            limit: None,
            page: 1,
            page_size: 1000,
        }
    }

    /// Request grouped by `name` with the given aggregations.
    fn grouped_by_name(aggregations: Vec<Aggregation>) -> QueryRequest {
        QueryRequest {
            group_by: vec!["name".into()],
            aggregations,
            ..empty_req()
        }
    }

    /// Request carrying only the given sort keys.
    fn ordered_by(order_by: Vec<OrderBy>) -> QueryRequest {
        QueryRequest {
            order_by,
            ..empty_req()
        }
    }

    /// Request carrying only pagination settings.
    fn paged(page: u64, page_size: u64, limit: Option<u64>) -> QueryRequest {
        QueryRequest {
            page,
            page_size,
            limit,
            ..empty_req()
        }
    }

    fn agg(op: &str, col: Option<&str>, alias: Option<&str>) -> Aggregation {
        Aggregation {
            col: col.map(Into::into),
            op: op.into(),
            alias: alias.map(Into::into),
        }
    }

    fn sort_key(col: &str, dir: Option<&str>) -> OrderBy {
        OrderBy {
            col: col.into(),
            dir: dir.map(Into::into),
        }
    }

    // ---- agg_plan -----------------------------------------------------------

    #[test]
    fn agg_plan_none_when_no_group_by() {
        assert!(empty_req().agg_plan(&schema()).unwrap().is_none());
    }

    #[test]
    fn agg_plan_rejects_aggs_without_group_by() {
        let req = QueryRequest {
            aggregations: vec![agg("sum", Some("score"), None)],
            ..empty_req()
        };
        let err = req.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)), "got {err:?}");
    }

    #[test]
    fn agg_plan_implicit_count_star() {
        let plan = grouped_by_name(vec![])
            .agg_plan(&schema())
            .unwrap()
            .unwrap();
        assert_eq!(plan.group_cols, vec!["name"]);
        assert_eq!(plan.aggs.len(), 1);

        let only = &plan.aggs[0];
        assert_eq!(only.alias, "count");
        assert!(only.col.is_none());
        assert!(matches!(only.op, AggOp::Count));
    }

    #[test]
    fn agg_plan_default_alias_format() {
        let req = grouped_by_name(vec![
            agg("Sum", Some("score"), None),
            agg("MAX", Some("Mixed"), Some("hi")),
        ]);
        let plan = req.agg_plan(&schema()).unwrap().unwrap();
        assert_eq!(plan.aggs[0].alias, "sum_score");
        assert_eq!(plan.aggs[1].alias, "hi");
        // The schema's own spelling of the column name is what gets stored.
        assert_eq!(plan.aggs[1].col.as_deref(), Some("Mixed"));
    }

    #[test]
    fn agg_plan_unknown_op() {
        let req = grouped_by_name(vec![agg("median", Some("score"), None)]);
        let err = req.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("median")));
    }

    #[test]
    fn agg_plan_non_count_requires_col() {
        let req = grouped_by_name(vec![agg("avg", None, None)]);
        let err = req.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("avg")));
    }

    #[test]
    fn agg_plan_unknown_group_col() {
        let req = QueryRequest {
            group_by: vec!["nope".into()],
            ..empty_req()
        };
        let err = req.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn agg_plan_distinct_conflicts_with_group_by() {
        let req = QueryRequest {
            distinct: true,
            ..grouped_by_name(vec![])
        };
        let err = req.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)));
    }

    // ---- order_by_sql -------------------------------------------------------

    #[test]
    fn order_by_none_when_empty() {
        assert!(empty_req().order_by_sql(&schema(), None).unwrap().is_none());
    }

    #[test]
    fn order_by_default_asc_and_quoting() {
        let req = ordered_by(vec![sort_key("ID", None)]);
        let sql = req.order_by_sql(&schema(), None).unwrap().unwrap();
        // Emitted with the schema's spelling, quoted.
        assert_eq!(sql, "\"id\" ASC");
    }

    #[test]
    fn order_by_desc_case_insensitive() {
        let req = ordered_by(vec![sort_key("name", Some("DESC"))]);
        let sql = req.order_by_sql(&schema(), None).unwrap().unwrap();
        assert_eq!(sql, "\"name\" DESC");
    }

    #[test]
    fn order_by_bad_direction() {
        let req = ordered_by(vec![sort_key("id", Some("backwards"))]);
        let err = req.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("backwards")));
    }

    #[test]
    fn order_by_unknown_col_no_plan() {
        let req = ordered_by(vec![sort_key("missing", None)]);
        let err = req.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn order_by_with_plan_restricts_to_outputs() {
        let mut req = grouped_by_name(vec![agg("sum", Some("score"), Some("total"))]);
        let plan = req.agg_plan(&schema()).unwrap().unwrap();

        // A group column and an aggregation alias are both accepted.
        req.order_by = vec![
            sort_key("name", Some("asc")),
            sort_key("TOTAL", Some("desc")),
        ];
        let sql = req.order_by_sql(&schema(), Some(&plan)).unwrap().unwrap();
        assert_eq!(sql, "\"name\" ASC, \"total\" DESC");

        // A schema column that is not part of the grouped output is rejected.
        req.order_by = vec![sort_key("id", None)];
        let err = req.order_by_sql(&schema(), Some(&plan)).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    // ---- effective_limit_offset --------------------------------------------

    #[test]
    fn limit_offset_first_page_default() {
        assert_eq!(empty_req().effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_pagination() {
        let req = paged(3, 50, None);
        assert_eq!(req.effective_limit_offset(1000), (50, 100));
    }

    #[test]
    fn limit_offset_caps_page_size_to_max() {
        let req = paged(1, 10_000, None);
        assert_eq!(req.effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_page_zero_treated_as_one() {
        let req = paged(0, 10, None);
        assert_eq!(req.effective_limit_offset(1000), (10, 0));
    }

    #[test]
    fn limit_offset_top_level_cap_truncates_last_page() {
        // Offset is 50, so only 25 rows remain under the cap of 75.
        let req = paged(2, 50, Some(75));
        assert_eq!(req.effective_limit_offset(1000), (25, 50));
    }

    #[test]
    fn limit_offset_top_level_cap_exhausted_returns_zero() {
        // Offset 100 is already past the cap of 75: empty page.
        let req = paged(3, 50, Some(75));
        assert_eq!(req.effective_limit_offset(1000), (0, 100));
    }
}