// datapress_core/models.rs

1use serde::Deserialize;
2use serde_json::Value as JsonValue;
3
4use crate::errors::AppError;
5use crate::schema::DatasetSchema;
6
7#[derive(Clone, Deserialize)]
8pub struct Predicate {
9    pub col: String,
10    /// eq | neq | gt | gte | lt | lte | like | ilike | in | is_null | is_not_null
11    pub op: String,
12    pub val: Option<JsonValue>,
13}
14
15/// A single `ORDER BY` clause entry.
16///
17/// `dir` is case-insensitive; accepted values are `"asc"` (default) and
18/// `"desc"`. Omitted = ascending.
19#[derive(Clone, Deserialize)]
20pub struct OrderBy {
21    pub col: String,
22    #[serde(default)]
23    pub dir: Option<String>,
24}
25
26/// A single aggregation in a `group_by` query.
27///
28/// `op` is one of `count | sum | avg | min | max` (case-insensitive).
29/// `col` is required for every op except `count`, where it may be omitted
30/// to mean `COUNT(*)`. `alias` is the JSON output key; if omitted, it
31/// defaults to `count` for `COUNT(*)` and `{op}_{col}` otherwise.
32#[derive(Clone, Deserialize)]
33pub struct Aggregation {
34    #[serde(default)]
35    pub col: Option<String>,
36    pub op: String,
37    #[serde(default)]
38    pub alias: Option<String>,
39}
40
41#[derive(Clone, Deserialize)]
42pub struct QueryRequest {
43    /// Columns to return. Empty = all columns. Ignored when `group_by` is
44    /// non-empty (the SELECT list is then derived from `group_by` + `aggregations`).
45    #[serde(default)]
46    pub columns: Vec<String>,
47    #[serde(default)]
48    pub predicates: Vec<Predicate>,
49    /// Group-by columns. Empty = no grouping (regular row scan). When set,
50    /// the response shape is `{ group_col_1, …, alias_1, … }` per row.
51    #[serde(default)]
52    pub group_by: Vec<String>,
53    /// Aggregations to compute over each group. When `group_by` is set and
54    /// this is empty, an implicit `{ op: "count" }` is added.
55    #[serde(default)]
56    pub aggregations: Vec<Aggregation>,
57    /// Return only distinct rows over the projected columns. Mutually
58    /// exclusive with `group_by` / `aggregations`.
59    #[serde(default)]
60    pub distinct: bool,
61    /// Sort spec. Empty = unsorted (engine order).
62    #[serde(default)]
63    pub order_by: Vec<OrderBy>,
64    /// Hard cap on total rows returned across all pages. `None` = no cap
65    /// beyond `page_size`.
66    #[serde(default)]
67    pub limit: Option<u64>,
68    #[serde(default = "default_page")]
69    pub page: u64,
70    #[serde(default = "default_page_size")]
71    pub page_size: u64,
72}
73
74/// Request body for the raw-SQL endpoint (`POST /api/v1/sql`).
75///
76/// `sql` is an arbitrary read-only `SELECT`; it is parsed and validated
77/// by [`crate::sql::validate`] before any engine sees it. `max_rows`
78/// lets a caller request *fewer* rows than the server-side cap
79/// (`[sql].max_rows`); it can never raise the cap.
80#[derive(Clone, Deserialize)]
81pub struct SqlRequest {
82    /// The SQL statement to execute. Must be a single read-only query
83    /// referencing a single registered dataset.
84    pub sql: String,
85    /// Optional client-side row cap. Clamped to the server-configured
86    /// `[sql].max_rows`; `None` uses the server cap.
87    #[serde(default)]
88    pub max_rows: Option<u64>,
89}
90
91/// One resolved aggregation, ready for SQL emission.
92#[derive(Clone)]
93pub struct AggSpec {
94    /// Canonical column name from the schema, or `None` for `COUNT(*)`.
95    pub col: Option<String>,
96    pub op: AggOp,
97    /// Output alias (JSON key). Always set after planning.
98    pub alias: String,
99}
100
/// The aggregate functions a `group_by` query can request.
#[derive(Clone, Copy)]
pub enum AggOp {
    Count,
    Sum,
    Avg,
    Min,
    Max,
}

impl AggOp {
    /// Upper-case SQL function name for this operator, e.g. `"SUM"`.
    pub fn as_sql(self) -> &'static str {
        match self {
            AggOp::Count => "COUNT",
            AggOp::Sum => "SUM",
            AggOp::Avg => "AVG",
            AggOp::Min => "MIN",
            AggOp::Max => "MAX",
        }
    }

    /// Lower-case operator name as written in requests, e.g. `"sum"`.
    /// Also used as the prefix of default aliases and in error messages.
    pub fn name(self) -> &'static str {
        match self {
            AggOp::Count => "count",
            AggOp::Sum => "sum",
            AggOp::Avg => "avg",
            AggOp::Min => "min",
            AggOp::Max => "max",
        }
    }
}
129
130impl AggSpec {
131    /// Render the SQL aggregate expression for this spec, e.g. `COUNT(*)`
132    /// or `SUM("amount")`. The column name is quoted via
133    /// [`DatasetSchema::quote_ident`].
134    ///
135    /// By construction (see [`QueryRequest::agg_plan`]) every non-`COUNT`
136    /// op carries a resolved column and `COUNT` may omit one. If that
137    /// invariant is ever violated this returns `AppError::Internal`
138    /// rather than panicking, since the value flows onto a live HTTP path.
139    pub fn sql_expr(&self) -> Result<String, AppError> {
140        match (self.op, self.col.as_deref()) {
141            (AggOp::Count, None) => Ok("COUNT(*)".to_string()),
142            (op, Some(c)) => Ok(format!(
143                "{}({})",
144                op.as_sql(),
145                DatasetSchema::quote_ident(c)
146            )),
147            (op, None) => Err(AppError::Internal(format!(
148                "aggregation '{}' resolved without a column (planner invariant violated)",
149                op.name()
150            ))),
151        }
152    }
153}
154
155/// Validated `GROUP BY` plan: canonical group columns + resolved aggregations.
156#[derive(Clone)]
157pub struct AggPlan {
158    pub group_cols: Vec<String>,
159    pub aggs: Vec<AggSpec>,
160}
161
162impl AggPlan {
163    /// All output names exposed by this plan, in SELECT order: group
164    /// columns first, then aggregation aliases. Used by `order_by`
165    /// validation when grouping is active.
166    pub fn output_names(&self) -> Vec<String> {
167        let mut v = self.group_cols.clone();
168        v.extend(self.aggs.iter().map(|a| a.alias.clone()));
169        v
170    }
171}
172
173impl QueryRequest {
174    /// Resolve the `group_by` + `aggregations` request into a validated
175    /// plan, or return `Ok(None)` when no grouping was requested.
176    ///
177    /// When `group_by` is non-empty and `aggregations` is empty, an
178    /// implicit `COUNT(*) AS count` is added so the plan always has at
179    /// least one output value.
180    pub fn agg_plan(&self, schema: &DatasetSchema) -> Result<Option<AggPlan>, AppError> {
181        if self.distinct && (!self.group_by.is_empty() || !self.aggregations.is_empty()) {
182            return Err(AppError::InvalidValue(
183                "distinct is mutually exclusive with group_by / aggregations".into(),
184            ));
185        }
186        if self.group_by.is_empty() {
187            if !self.aggregations.is_empty() {
188                return Err(AppError::InvalidValue(
189                    "aggregations require a non-empty group_by".into(),
190                ));
191            }
192            return Ok(None);
193        }
194
195        let mut group_cols = Vec::with_capacity(self.group_by.len());
196        for name in &self.group_by {
197            group_cols.push(schema.find(name)?.name.clone());
198        }
199
200        let raw_aggs: Vec<Aggregation> = if self.aggregations.is_empty() {
201            vec![Aggregation {
202                col: None,
203                op: "count".into(),
204                alias: None,
205            }]
206        } else {
207            self.aggregations.clone()
208        };
209
210        let mut aggs = Vec::with_capacity(raw_aggs.len());
211        for a in &raw_aggs {
212            let op = match a.op.to_ascii_lowercase().as_str() {
213                "count" => AggOp::Count,
214                "sum" => AggOp::Sum,
215                "avg" => AggOp::Avg,
216                "min" => AggOp::Min,
217                "max" => AggOp::Max,
218                other => {
219                    return Err(AppError::InvalidValue(format!(
220                        "unknown aggregation op '{other}' (expected count|sum|avg|min|max)"
221                    )));
222                }
223            };
224            let col = match (op, a.col.as_deref()) {
225                (AggOp::Count, None) => None,
226                (_, None) => {
227                    return Err(AppError::InvalidValue(format!(
228                        "aggregation '{}' requires a 'col'",
229                        op.name()
230                    )));
231                }
232                (_, Some(c)) => Some(schema.find(c)?.name.clone()),
233            };
234            let alias = a.alias.clone().unwrap_or_else(|| match col.as_deref() {
235                Some(c) => format!("{}_{}", op.name(), c.to_lowercase()),
236                None => "count".into(),
237            });
238            aggs.push(AggSpec { col, op, alias });
239        }
240
241        Ok(Some(AggPlan { group_cols, aggs }))
242    }
243
244    /// Translate `order_by` into a validated SQL fragment, e.g.
245    /// `"\"a\" ASC, \"b\" DESC"`. Returns `Ok(None)` if no ordering was
246    /// requested.
247    ///
248    /// When `plan` is `Some`, sort keys must reference a group-by column
249    /// or an aggregation alias (the only names in scope after `GROUP BY`).
250    /// When `plan` is `None`, sort keys are validated against the dataset
251    /// schema.
252    pub fn order_by_sql(
253        &self,
254        schema: &DatasetSchema,
255        plan: Option<&AggPlan>,
256    ) -> Result<Option<String>, AppError> {
257        if self.order_by.is_empty() {
258            return Ok(None);
259        }
260        let parts: Vec<String> = self
261            .order_by
262            .iter()
263            .map(|o| {
264                let dir = match o
265                    .dir
266                    .as_deref()
267                    .unwrap_or("asc")
268                    .to_ascii_lowercase()
269                    .as_str()
270                {
271                    "asc" => "ASC",
272                    "desc" => "DESC",
273                    other => {
274                        return Err(AppError::InvalidValue(format!(
275                            "order_by direction must be 'asc' or 'desc' (got '{other}')"
276                        )));
277                    }
278                };
279                let ident = match plan {
280                    Some(p) => {
281                        let lc = o.col.to_lowercase();
282                        let allowed = p.output_names();
283                        allowed
284                            .iter()
285                            .find(|n| n.to_lowercase() == lc)
286                            .map(|n| DatasetSchema::quote_ident(n))
287                            .ok_or_else(|| {
288                                AppError::UnknownColumn(format!(
289                                    "{} (must be a group_by column or aggregation alias)",
290                                    o.col
291                                ))
292                            })?
293                    }
294                    None => DatasetSchema::quote_ident(&schema.find(&o.col)?.name),
295                };
296                Ok(format!("{ident} {dir}"))
297            })
298            .collect::<Result<_, _>>()?;
299        Ok(Some(parts.join(", ")))
300    }
301
302    /// Compute the effective SQL `LIMIT` and `OFFSET` for this request,
303    /// honouring both `page`/`page_size` and the optional top-level `limit`
304    /// cap. `page_size_cap` is the per-page maximum the backend enforces.
305    ///
306    /// Semantics: pagination still drives offset; `limit` caps the total
307    /// number of rows ever returned across all pages. Once `offset >=
308    /// limit`, the effective LIMIT is `0` (empty page).
309    pub fn effective_limit_offset(&self, page_size_cap: u64) -> (u64, u64) {
310        let page = self.page.max(1);
311        let page_size = self.page_size.clamp(1, page_size_cap);
312        let offset = (page - 1) * page_size;
313        let limit = match self.limit {
314            Some(cap) => {
315                if offset >= cap {
316                    0
317                } else {
318                    page_size.min(cap - offset)
319                }
320            }
321            None => page_size,
322        };
323        (limit, offset)
324    }
325}
326
/// Serde default for `QueryRequest::page`: the first page.
fn default_page() -> u64 {
    1
}
/// Serde default for `QueryRequest::page_size`: 1000 rows per page.
fn default_page_size() -> u64 {
    1000
}
333
334/// Body for `POST /api/datasets/{name}/count`. Predicates are optional —
335/// an empty body (or `{}`) counts every row in the dataset.
336#[derive(Clone, Deserialize, Default)]
337pub struct CountRequest {
338    #[serde(default)]
339    pub predicates: Vec<Predicate>,
340}
341
342// ---------------------------------------------------------------------------
343// Tests
344// ---------------------------------------------------------------------------
345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{ColumnInfo, DatasetSchema, LogicalType};

    /// Four-column fixture schema; `Mixed` exercises case-insensitive lookup.
    fn schema() -> DatasetSchema {
        DatasetSchema::new(
            "t",
            vec![
                ColumnInfo {
                    name: "id".into(),
                    logical: LogicalType::Int,
                    sql_type: "BIGINT".into(),
                    nullable: false,
                },
                ColumnInfo {
                    name: "name".into(),
                    logical: LogicalType::Utf8,
                    sql_type: "VARCHAR".into(),
                    nullable: true,
                },
                ColumnInfo {
                    name: "score".into(),
                    logical: LogicalType::Float,
                    sql_type: "DOUBLE".into(),
                    nullable: true,
                },
                ColumnInfo {
                    name: "Mixed".into(),
                    logical: LogicalType::Utf8,
                    sql_type: "VARCHAR".into(),
                    nullable: true,
                },
            ],
        )
    }

    /// A request equivalent to deserializing `{}`.
    fn empty_req() -> QueryRequest {
        QueryRequest {
            columns: vec![],
            predicates: vec![],
            group_by: vec![],
            aggregations: vec![],
            distinct: false,
            order_by: vec![],
            limit: None,
            page: 1,
            page_size: 1000,
        }
    }

    // ---- agg_plan -----------------------------------------------------------

    #[test]
    fn agg_plan_none_when_no_group_by() {
        let r = empty_req();
        assert!(r.agg_plan(&schema()).unwrap().is_none());
    }

    #[test]
    fn agg_plan_rejects_aggs_without_group_by() {
        let mut r = empty_req();
        r.aggregations = vec![Aggregation {
            col: Some("score".into()),
            op: "sum".into(),
            alias: None,
        }];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)), "got {err:?}");
    }

    #[test]
    fn agg_plan_implicit_count_star() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();
        assert_eq!(plan.group_cols, vec!["name"]);
        assert_eq!(plan.aggs.len(), 1);
        assert_eq!(plan.aggs[0].alias, "count");
        assert!(plan.aggs[0].col.is_none());
        assert!(matches!(plan.aggs[0].op, AggOp::Count));
    }

    #[test]
    fn agg_plan_default_alias_format() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![
            Aggregation {
                col: Some("score".into()),
                op: "Sum".into(),
                alias: None,
            },
            Aggregation {
                col: Some("Mixed".into()),
                op: "MAX".into(),
                alias: Some("hi".into()),
            },
        ];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();
        assert_eq!(plan.aggs[0].alias, "sum_score");
        assert_eq!(plan.aggs[1].alias, "hi");
        // Canonical column name is preserved from the schema (case fix).
        assert_eq!(plan.aggs[1].col.as_deref(), Some("Mixed"));
    }

    #[test]
    fn agg_plan_count_with_col() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![Aggregation {
            col: Some("score".into()),
            op: "count".into(),
            alias: None,
        }];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();
        assert!(matches!(plan.aggs[0].op, AggOp::Count));
        assert_eq!(plan.aggs[0].col.as_deref(), Some("score"));
        assert_eq!(plan.aggs[0].alias, "count_score");
    }

    #[test]
    fn agg_plan_unknown_op() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![Aggregation {
            col: Some("score".into()),
            op: "median".into(),
            alias: None,
        }];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("median")));
    }

    #[test]
    fn agg_plan_non_count_requires_col() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![Aggregation {
            col: None,
            op: "avg".into(),
            alias: None,
        }];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("avg")));
    }

    #[test]
    fn agg_plan_unknown_group_col() {
        let mut r = empty_req();
        r.group_by = vec!["nope".into()];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn agg_plan_distinct_conflicts_with_group_by() {
        let mut r = empty_req();
        r.distinct = true;
        r.group_by = vec!["name".into()];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)));
    }

    #[test]
    fn agg_plan_distinct_conflicts_with_aggregations() {
        let mut r = empty_req();
        r.distinct = true;
        r.aggregations = vec![Aggregation {
            col: None,
            op: "count".into(),
            alias: None,
        }];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("distinct")));
    }

    // ---- AggSpec::sql_expr --------------------------------------------------

    #[test]
    fn sql_expr_count_star() {
        let spec = AggSpec {
            col: None,
            op: AggOp::Count,
            alias: "count".into(),
        };
        assert_eq!(spec.sql_expr().unwrap(), "COUNT(*)");
    }

    #[test]
    fn sql_expr_quotes_column() {
        let spec = AggSpec {
            col: Some("score".into()),
            op: AggOp::Sum,
            alias: "sum_score".into(),
        };
        assert_eq!(spec.sql_expr().unwrap(), "SUM(\"score\")");
    }

    #[test]
    fn sql_expr_missing_column_is_internal_error() {
        let spec = AggSpec {
            col: None,
            op: AggOp::Sum,
            alias: "broken".into(),
        };
        let err = spec.sql_expr().err().expect("expected error");
        assert!(matches!(err, AppError::Internal(m) if m.contains("sum")));
    }

    // ---- order_by_sql -------------------------------------------------------

    #[test]
    fn order_by_none_when_empty() {
        let r = empty_req();
        assert!(r.order_by_sql(&schema(), None).unwrap().is_none());
    }

    #[test]
    fn order_by_default_asc_and_quoting() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "ID".into(),
            dir: None,
        }];
        let sql = r.order_by_sql(&schema(), None).unwrap().unwrap();
        // Canonical name from schema preserved + quoted.
        assert_eq!(sql, "\"id\" ASC");
    }

    #[test]
    fn order_by_desc_case_insensitive() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "name".into(),
            dir: Some("DESC".into()),
        }];
        let sql = r.order_by_sql(&schema(), None).unwrap().unwrap();
        assert_eq!(sql, "\"name\" DESC");
    }

    #[test]
    fn order_by_bad_direction() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "id".into(),
            dir: Some("backwards".into()),
        }];
        let err = r.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("backwards")));
    }

    #[test]
    fn order_by_unknown_col_no_plan() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "missing".into(),
            dir: None,
        }];
        let err = r.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn order_by_with_plan_restricts_to_outputs() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![Aggregation {
            col: Some("score".into()),
            op: "sum".into(),
            alias: Some("total".into()),
        }];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();

        // Allowed: group col + alias.
        r.order_by = vec![
            OrderBy {
                col: "name".into(),
                dir: Some("asc".into()),
            },
            OrderBy {
                col: "TOTAL".into(),
                dir: Some("desc".into()),
            },
        ];
        let sql = r.order_by_sql(&schema(), Some(&plan)).unwrap().unwrap();
        assert_eq!(sql, "\"name\" ASC, \"total\" DESC");

        // Not allowed: raw schema column that isn't in the group/agg output.
        r.order_by = vec![OrderBy {
            col: "id".into(),
            dir: None,
        }];
        let err = r.order_by_sql(&schema(), Some(&plan)).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    // ---- effective_limit_offset --------------------------------------------

    #[test]
    fn limit_offset_first_page_default() {
        let r = empty_req();
        assert_eq!(r.effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_pagination() {
        let mut r = empty_req();
        r.page = 3;
        r.page_size = 50;
        assert_eq!(r.effective_limit_offset(1000), (50, 100));
    }

    #[test]
    fn limit_offset_caps_page_size_to_max() {
        let mut r = empty_req();
        r.page_size = 10_000;
        assert_eq!(r.effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_page_zero_treated_as_one() {
        let mut r = empty_req();
        r.page = 0;
        r.page_size = 10;
        assert_eq!(r.effective_limit_offset(1000), (10, 0));
    }

    #[test]
    fn limit_offset_page_size_zero_clamped_to_one() {
        let mut r = empty_req();
        r.page_size = 0;
        assert_eq!(r.effective_limit_offset(1000), (1, 0));
    }

    #[test]
    fn limit_offset_top_level_cap_truncates_last_page() {
        let mut r = empty_req();
        r.page = 2;
        r.page_size = 50;
        r.limit = Some(75); // offset 50, only 25 rows remain under cap.
        assert_eq!(r.effective_limit_offset(1000), (25, 50));
    }

    #[test]
    fn limit_offset_top_level_cap_exhausted_returns_zero() {
        let mut r = empty_req();
        r.page = 3;
        r.page_size = 50;
        r.limit = Some(75); // offset 100 >= 75 -> empty page.
        assert_eq!(r.effective_limit_offset(1000), (0, 100));
    }

    #[test]
    fn limit_offset_zero_limit_returns_empty_first_page() {
        let mut r = empty_req();
        r.limit = Some(0); // offset 0 >= 0 -> empty page.
        assert_eq!(r.effective_limit_offset(1000), (0, 0));
    }
}