Skip to main content

datapress_core/
models.rs

1use serde::Deserialize;
2use serde_json::Value as JsonValue;
3
4use crate::errors::AppError;
5use crate::schema::DatasetSchema;
6
7#[derive(Clone, Deserialize)]
8pub struct Predicate {
9    pub col: String,
10    /// eq | neq | gt | gte | lt | lte | like | ilike | in | is_null | is_not_null
11    pub op: String,
12    pub val: Option<JsonValue>,
13}
14
15/// A single `ORDER BY` clause entry.
16///
17/// `dir` is case-insensitive; accepted values are `"asc"` (default) and
18/// `"desc"`. Omitted = ascending.
19#[derive(Clone, Deserialize)]
20pub struct OrderBy {
21    pub col: String,
22    #[serde(default)]
23    pub dir: Option<String>,
24}
25
26/// A single aggregation in a `group_by` query.
27///
28/// `op` is one of `count | sum | avg | min | max` (case-insensitive).
29/// `col` is required for every op except `count`, where it may be omitted
30/// to mean `COUNT(*)`. `alias` is the JSON output key; if omitted, it
31/// defaults to `count` for `COUNT(*)` and `{op}_{col}` otherwise.
32#[derive(Clone, Deserialize)]
33pub struct Aggregation {
34    #[serde(default)]
35    pub col: Option<String>,
36    pub op: String,
37    #[serde(default)]
38    pub alias: Option<String>,
39}
40
41#[derive(Clone, Deserialize)]
42pub struct QueryRequest {
43    /// Columns to return. Empty = all columns. Ignored when `group_by` is
44    /// non-empty (the SELECT list is then derived from `group_by` + `aggregations`).
45    #[serde(default)]
46    pub columns: Vec<String>,
47    #[serde(default)]
48    pub predicates: Vec<Predicate>,
49    /// Group-by columns. Empty = no grouping (regular row scan). When set,
50    /// the response shape is `{ group_col_1, …, alias_1, … }` per row.
51    #[serde(default)]
52    pub group_by: Vec<String>,
53    /// Aggregations to compute over each group. When `group_by` is set and
54    /// this is empty, an implicit `{ op: "count" }` is added.
55    #[serde(default)]
56    pub aggregations: Vec<Aggregation>,
57    /// Post-aggregation row filters, ANDed together (SQL `HAVING`). Each
58    /// predicate's `col` references a `group_by` column or an aggregation
59    /// alias; requires a non-empty `group_by`. Same op vocabulary as
60    /// `predicates` (`eq | neq | gt | gte | lt | lte | like | ilike | in |
61    /// is_null | is_not_null`).
62    #[serde(default)]
63    pub having: Vec<Predicate>,
64    /// Return only distinct rows over the projected columns. Mutually
65    /// exclusive with `group_by` / `aggregations`.
66    #[serde(default)]
67    pub distinct: bool,
68    /// Sort spec. Empty = unsorted (engine order).
69    #[serde(default)]
70    pub order_by: Vec<OrderBy>,
71    /// Hard cap on total rows returned across all pages. `None` = no cap
72    /// beyond `page_size`.
73    #[serde(default)]
74    pub limit: Option<u64>,
75    #[serde(default = "default_page")]
76    pub page: u64,
77    #[serde(default = "default_page_size")]
78    pub page_size: u64,
79}
80
81/// Request body for the raw-SQL endpoint (`POST /api/v1/sql`).
82///
83/// `sql` is an arbitrary read-only `SELECT`; it is parsed and validated
84/// by [`crate::sql::validate`] before any engine sees it. `max_rows`
85/// lets a caller request *fewer* rows than the server-side cap
86/// (`[sql].max_rows`); it can never raise the cap.
87#[derive(Clone, Deserialize)]
88pub struct SqlRequest {
89    /// The SQL statement to execute. Must be a single read-only query
90    /// referencing a single registered dataset.
91    pub sql: String,
92    /// Optional client-side row cap. Clamped to the server-configured
93    /// `[sql].max_rows`; `None` uses the server cap.
94    #[serde(default)]
95    pub max_rows: Option<u64>,
96}
97
98/// One resolved aggregation, ready for SQL emission.
99#[derive(Clone)]
100pub struct AggSpec {
101    /// Canonical column name from the schema, or `None` for `COUNT(*)`.
102    pub col: Option<String>,
103    pub op: AggOp,
104    /// Output alias (JSON key). Always set after planning.
105    pub alias: String,
106}
107
108#[derive(Clone, Copy)]
109pub enum AggOp {
110    Count,
111    Sum,
112    Avg,
113    Min,
114    Max,
115}
116impl AggOp {
117    pub fn as_sql(self) -> &'static str {
118        match self {
119            AggOp::Count => "COUNT",
120            AggOp::Sum => "SUM",
121            AggOp::Avg => "AVG",
122            AggOp::Min => "MIN",
123            AggOp::Max => "MAX",
124        }
125    }
126    pub fn name(self) -> &'static str {
127        match self {
128            AggOp::Count => "count",
129            AggOp::Sum => "sum",
130            AggOp::Avg => "avg",
131            AggOp::Min => "min",
132            AggOp::Max => "max",
133        }
134    }
135}
136
137impl AggSpec {
138    /// Render the SQL aggregate expression for this spec, e.g. `COUNT(*)`
139    /// or `SUM("amount")`. The column name is quoted via
140    /// [`DatasetSchema::quote_ident`].
141    ///
142    /// By construction (see [`QueryRequest::agg_plan`]) every non-`COUNT`
143    /// op carries a resolved column and `COUNT` may omit one. If that
144    /// invariant is ever violated this returns `AppError::Internal`
145    /// rather than panicking, since the value flows onto a live HTTP path.
146    pub fn sql_expr(&self) -> Result<String, AppError> {
147        match (self.op, self.col.as_deref()) {
148            (AggOp::Count, None) => Ok("COUNT(*)".to_string()),
149            (op, Some(c)) => Ok(format!(
150                "{}({})",
151                op.as_sql(),
152                DatasetSchema::quote_ident(c)
153            )),
154            (op, None) => Err(AppError::Internal(format!(
155                "aggregation '{}' resolved without a column (planner invariant violated)",
156                op.name()
157            ))),
158        }
159    }
160}
161
162/// Validated `GROUP BY` plan: canonical group columns + resolved aggregations.
163#[derive(Clone)]
164pub struct AggPlan {
165    pub group_cols: Vec<String>,
166    pub aggs: Vec<AggSpec>,
167}
168
169impl AggPlan {
170    /// All output names exposed by this plan, in SELECT order: group
171    /// columns first, then aggregation aliases. Used by `order_by`
172    /// validation when grouping is active.
173    pub fn output_names(&self) -> Vec<String> {
174        let mut v = self.group_cols.clone();
175        v.extend(self.aggs.iter().map(|a| a.alias.clone()));
176        v
177    }
178
179    /// Resolve a `HAVING` reference name to the SQL expression it filters
180    /// on. A group-by column maps to its quoted identifier; an aggregation
181    /// alias maps to the underlying aggregate expression (`COUNT(*)`,
182    /// `SUM("amount")`, …). Emitting the expression rather than the alias
183    /// keeps both backends happy — DataFusion does not allow aliases in
184    /// `HAVING`, while DuckDB does.
185    pub fn having_lhs(&self, name: &str) -> Result<String, AppError> {
186        let lc = name.to_lowercase();
187        if let Some(g) = self.group_cols.iter().find(|c| c.to_lowercase() == lc) {
188            return Ok(DatasetSchema::quote_ident(g));
189        }
190        if let Some(a) = self.aggs.iter().find(|a| a.alias.to_lowercase() == lc) {
191            return a.sql_expr();
192        }
193        Err(AppError::UnknownColumn(format!(
194            "{name} (must be a group_by column or aggregation alias)"
195        )))
196    }
197}
198
199impl QueryRequest {
200    /// Resolve the `group_by` + `aggregations` request into a validated
201    /// plan, or return `Ok(None)` when no grouping was requested.
202    ///
203    /// When `group_by` is non-empty and `aggregations` is empty, an
204    /// implicit `COUNT(*) AS count` is added so the plan always has at
205    /// least one output value.
206    pub fn agg_plan(&self, schema: &DatasetSchema) -> Result<Option<AggPlan>, AppError> {
207        if self.distinct && (!self.group_by.is_empty() || !self.aggregations.is_empty()) {
208            return Err(AppError::InvalidValue(
209                "distinct is mutually exclusive with group_by / aggregations".into(),
210            ));
211        }
212        if self.group_by.is_empty() {
213            if !self.aggregations.is_empty() {
214                return Err(AppError::InvalidValue(
215                    "aggregations require a non-empty group_by".into(),
216                ));
217            }
218            return Ok(None);
219        }
220
221        let mut group_cols = Vec::with_capacity(self.group_by.len());
222        for name in &self.group_by {
223            group_cols.push(schema.find(name)?.name.clone());
224        }
225
226        let raw_aggs: Vec<Aggregation> = if self.aggregations.is_empty() {
227            vec![Aggregation {
228                col: None,
229                op: "count".into(),
230                alias: None,
231            }]
232        } else {
233            self.aggregations.clone()
234        };
235
236        let mut aggs = Vec::with_capacity(raw_aggs.len());
237        for a in &raw_aggs {
238            let op = match a.op.to_ascii_lowercase().as_str() {
239                "count" => AggOp::Count,
240                "sum" => AggOp::Sum,
241                "avg" => AggOp::Avg,
242                "min" => AggOp::Min,
243                "max" => AggOp::Max,
244                other => {
245                    return Err(AppError::InvalidValue(format!(
246                        "unknown aggregation op '{other}' (expected count|sum|avg|min|max)"
247                    )));
248                }
249            };
250            let col = match (op, a.col.as_deref()) {
251                (AggOp::Count, None) => None,
252                (_, None) => {
253                    return Err(AppError::InvalidValue(format!(
254                        "aggregation '{}' requires a 'col'",
255                        op.name()
256                    )));
257                }
258                (_, Some(c)) => Some(schema.find(c)?.name.clone()),
259            };
260            let alias = a.alias.clone().unwrap_or_else(|| match col.as_deref() {
261                Some(c) => format!("{}_{}", op.name(), c.to_lowercase()),
262                None => "count".into(),
263            });
264            aggs.push(AggSpec { col, op, alias });
265        }
266
267        Ok(Some(AggPlan { group_cols, aggs }))
268    }
269
270    /// Resolve `having` against the aggregation `plan`, pairing each
271    /// predicate with the SQL expression its `col` references. Returns an
272    /// empty vec when no `HAVING` was requested.
273    ///
274    /// Errors if `having` is set without a `GROUP BY` (there is nothing to
275    /// filter post-aggregation), or if a predicate references a name that
276    /// is neither a group column nor an aggregation alias. The returned
277    /// expressions are bound by the backend, which appends the values to
278    /// the same parameter list it built for `WHERE`.
279    pub fn having_plan<'a>(
280        &'a self,
281        plan: Option<&AggPlan>,
282    ) -> Result<Vec<(String, &'a Predicate)>, AppError> {
283        if self.having.is_empty() {
284            return Ok(Vec::new());
285        }
286        let plan = plan.ok_or_else(|| {
287            AppError::InvalidValue("having requires a non-empty group_by".into())
288        })?;
289        self.having
290            .iter()
291            .map(|p| Ok((plan.having_lhs(&p.col)?, p)))
292            .collect()
293    }
294
295    /// Translate `order_by` into a validated SQL fragment, e.g.
296    /// `"\"a\" ASC, \"b\" DESC"`. Returns `Ok(None)` if no ordering was
297    /// requested.
298    ///
299    /// When `plan` is `Some`, sort keys must reference a group-by column
300    /// or an aggregation alias (the only names in scope after `GROUP BY`).
301    /// When `plan` is `None`, sort keys are validated against the dataset
302    /// schema.
303    pub fn order_by_sql(
304        &self,
305        schema: &DatasetSchema,
306        plan: Option<&AggPlan>,
307    ) -> Result<Option<String>, AppError> {
308        if self.order_by.is_empty() {
309            return Ok(None);
310        }
311        let parts: Vec<String> = self
312            .order_by
313            .iter()
314            .map(|o| {
315                let dir = match o
316                    .dir
317                    .as_deref()
318                    .unwrap_or("asc")
319                    .to_ascii_lowercase()
320                    .as_str()
321                {
322                    "asc" => "ASC",
323                    "desc" => "DESC",
324                    other => {
325                        return Err(AppError::InvalidValue(format!(
326                            "order_by direction must be 'asc' or 'desc' (got '{other}')"
327                        )));
328                    }
329                };
330                let ident = match plan {
331                    Some(p) => {
332                        let lc = o.col.to_lowercase();
333                        let allowed = p.output_names();
334                        allowed
335                            .iter()
336                            .find(|n| n.to_lowercase() == lc)
337                            .map(|n| DatasetSchema::quote_ident(n))
338                            .ok_or_else(|| {
339                                AppError::UnknownColumn(format!(
340                                    "{} (must be a group_by column or aggregation alias)",
341                                    o.col
342                                ))
343                            })?
344                    }
345                    None => DatasetSchema::quote_ident(&schema.find(&o.col)?.name),
346                };
347                Ok(format!("{ident} {dir}"))
348            })
349            .collect::<Result<_, _>>()?;
350        Ok(Some(parts.join(", ")))
351    }
352
353    /// Compute the effective SQL `LIMIT` and `OFFSET` for this request,
354    /// honouring both `page`/`page_size` and the optional top-level `limit`
355    /// cap. `page_size_cap` is the per-page maximum the backend enforces.
356    ///
357    /// Semantics: pagination still drives offset; `limit` caps the total
358    /// number of rows ever returned across all pages. Once `offset >=
359    /// limit`, the effective LIMIT is `0` (empty page).
360    pub fn effective_limit_offset(&self, page_size_cap: u64) -> (u64, u64) {
361        let page = self.page.max(1);
362        let page_size = self.page_size.clamp(1, page_size_cap);
363        let offset = (page - 1) * page_size;
364        let limit = match self.limit {
365            Some(cap) => {
366                if offset >= cap {
367                    0
368                } else {
369                    page_size.min(cap - offset)
370                }
371            }
372            None => page_size,
373        };
374        (limit, offset)
375    }
376}
377
378fn default_page() -> u64 {
379    1
380}
381fn default_page_size() -> u64 {
382    1000
383}
384
385/// Body for `POST /api/datasets/{name}/count`. Predicates are optional —
386/// an empty body (or `{}`) counts every row in the dataset.
387#[derive(Clone, Deserialize, Default)]
388pub struct CountRequest {
389    #[serde(default)]
390    pub predicates: Vec<Predicate>,
391}
392
393// ---------------------------------------------------------------------------
394// Tests
395// ---------------------------------------------------------------------------
396
397#[cfg(test)]
398mod tests {
399    use super::*;
400    use crate::schema::{ColumnInfo, DatasetSchema, LogicalType};
401
402    fn schema() -> DatasetSchema {
403        DatasetSchema::new(
404            "t",
405            vec![
406                ColumnInfo {
407                    name: "id".into(),
408                    logical: LogicalType::Int,
409                    sql_type: "BIGINT".into(),
410                    nullable: false,
411                },
412                ColumnInfo {
413                    name: "name".into(),
414                    logical: LogicalType::Utf8,
415                    sql_type: "VARCHAR".into(),
416                    nullable: true,
417                },
418                ColumnInfo {
419                    name: "score".into(),
420                    logical: LogicalType::Float,
421                    sql_type: "DOUBLE".into(),
422                    nullable: true,
423                },
424                ColumnInfo {
425                    name: "Mixed".into(),
426                    logical: LogicalType::Utf8,
427                    sql_type: "VARCHAR".into(),
428                    nullable: true,
429                },
430            ],
431        )
432    }
433
434    fn empty_req() -> QueryRequest {
435        QueryRequest {
436            columns: vec![],
437            predicates: vec![],
438            group_by: vec![],
439            aggregations: vec![],
440            having: vec![],
441            distinct: false,
442            order_by: vec![],
443            limit: None,
444            page: 1,
445            page_size: 1000,
446        }
447    }
448
449    // ---- agg_plan -----------------------------------------------------------
450
451    #[test]
452    fn agg_plan_none_when_no_group_by() {
453        let r = empty_req();
454        assert!(r.agg_plan(&schema()).unwrap().is_none());
455    }
456
457    #[test]
458    fn agg_plan_rejects_aggs_without_group_by() {
459        let mut r = empty_req();
460        r.aggregations = vec![Aggregation {
461            col: Some("score".into()),
462            op: "sum".into(),
463            alias: None,
464        }];
465        let err = r.agg_plan(&schema()).err().expect("expected error");
466        assert!(matches!(err, AppError::InvalidValue(_)), "got {err:?}");
467    }
468
469    #[test]
470    fn agg_plan_implicit_count_star() {
471        let mut r = empty_req();
472        r.group_by = vec!["name".into()];
473        let plan = r.agg_plan(&schema()).unwrap().unwrap();
474        assert_eq!(plan.group_cols, vec!["name"]);
475        assert_eq!(plan.aggs.len(), 1);
476        assert_eq!(plan.aggs[0].alias, "count");
477        assert!(plan.aggs[0].col.is_none());
478        assert!(matches!(plan.aggs[0].op, AggOp::Count));
479    }
480
481    #[test]
482    fn agg_plan_default_alias_format() {
483        let mut r = empty_req();
484        r.group_by = vec!["name".into()];
485        r.aggregations = vec![
486            Aggregation {
487                col: Some("score".into()),
488                op: "Sum".into(),
489                alias: None,
490            },
491            Aggregation {
492                col: Some("Mixed".into()),
493                op: "MAX".into(),
494                alias: Some("hi".into()),
495            },
496        ];
497        let plan = r.agg_plan(&schema()).unwrap().unwrap();
498        assert_eq!(plan.aggs[0].alias, "sum_score");
499        assert_eq!(plan.aggs[1].alias, "hi");
500        // Canonical column name is preserved from the schema (case fix).
501        assert_eq!(plan.aggs[1].col.as_deref(), Some("Mixed"));
502    }
503
504    #[test]
505    fn agg_plan_unknown_op() {
506        let mut r = empty_req();
507        r.group_by = vec!["name".into()];
508        r.aggregations = vec![Aggregation {
509            col: Some("score".into()),
510            op: "median".into(),
511            alias: None,
512        }];
513        let err = r.agg_plan(&schema()).err().expect("expected error");
514        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("median")));
515    }
516
517    #[test]
518    fn agg_plan_non_count_requires_col() {
519        let mut r = empty_req();
520        r.group_by = vec!["name".into()];
521        r.aggregations = vec![Aggregation {
522            col: None,
523            op: "avg".into(),
524            alias: None,
525        }];
526        let err = r.agg_plan(&schema()).err().expect("expected error");
527        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("avg")));
528    }
529
530    #[test]
531    fn agg_plan_unknown_group_col() {
532        let mut r = empty_req();
533        r.group_by = vec!["nope".into()];
534        let err = r.agg_plan(&schema()).err().expect("expected error");
535        assert!(matches!(err, AppError::UnknownColumn(_)));
536    }
537
538    #[test]
539    fn agg_plan_distinct_conflicts_with_group_by() {
540        let mut r = empty_req();
541        r.distinct = true;
542        r.group_by = vec!["name".into()];
543        let err = r.agg_plan(&schema()).err().expect("expected error");
544        assert!(matches!(err, AppError::InvalidValue(_)));
545    }
546
547    // ---- having_plan --------------------------------------------------------
548
549    #[test]
550    fn having_empty_returns_empty() {
551        let r = empty_req();
552        assert!(r.having_plan(None).unwrap().is_empty());
553    }
554
555    #[test]
556    fn having_requires_group_by() {
557        let mut r = empty_req();
558        r.having = vec![Predicate {
559            col: "count".into(),
560            op: "gt".into(),
561            val: Some(serde_json::json!(1)),
562        }];
563        let err = r.having_plan(None).err().expect("expected error");
564        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("group_by")));
565    }
566
567    #[test]
568    fn having_resolves_implicit_count_alias_to_expr() {
569        let mut r = empty_req();
570        r.group_by = vec!["name".into()];
571        r.having = vec![Predicate {
572            col: "count".into(),
573            op: "gt".into(),
574            val: Some(serde_json::json!(5)),
575        }];
576        let plan = r.agg_plan(&schema()).unwrap().unwrap();
577        let resolved = r.having_plan(Some(&plan)).unwrap();
578        assert_eq!(resolved.len(), 1);
579        // The implicit COUNT(*) alias resolves to the aggregate expression,
580        // not the alias name.
581        assert_eq!(resolved[0].0, "COUNT(*)");
582    }
583
584    #[test]
585    fn having_resolves_named_alias_and_group_col() {
586        let mut r = empty_req();
587        r.group_by = vec!["name".into()];
588        r.aggregations = vec![Aggregation {
589            col: Some("score".into()),
590            op: "sum".into(),
591            alias: Some("total".into()),
592        }];
593        r.having = vec![
594            Predicate {
595                col: "total".into(),
596                op: "gte".into(),
597                val: Some(serde_json::json!(100)),
598            },
599            Predicate {
600                col: "name".into(),
601                op: "eq".into(),
602                val: Some(serde_json::json!("x")),
603            },
604        ];
605        let plan = r.agg_plan(&schema()).unwrap().unwrap();
606        let resolved = r.having_plan(Some(&plan)).unwrap();
607        assert_eq!(resolved.len(), 2);
608        assert_eq!(resolved[0].0, "SUM(\"score\")");
609        // A group column resolves to its quoted identifier.
610        assert_eq!(resolved[1].0, "\"name\"");
611    }
612
613    #[test]
614    fn having_unknown_reference_errors() {
615        let mut r = empty_req();
616        r.group_by = vec!["name".into()];
617        r.having = vec![Predicate {
618            col: "nope".into(),
619            op: "gt".into(),
620            val: Some(serde_json::json!(1)),
621        }];
622        let plan = r.agg_plan(&schema()).unwrap().unwrap();
623        let err = r.having_plan(Some(&plan)).err().expect("expected error");
624        assert!(matches!(err, AppError::UnknownColumn(_)));
625    }
626
627    // ---- order_by_sql -------------------------------------------------------
628
629    #[test]
630    fn order_by_none_when_empty() {
631        let r = empty_req();
632        assert!(r.order_by_sql(&schema(), None).unwrap().is_none());
633    }
634
635    #[test]
636    fn order_by_default_asc_and_quoting() {
637        let mut r = empty_req();
638        r.order_by = vec![OrderBy {
639            col: "ID".into(),
640            dir: None,
641        }];
642        let sql = r.order_by_sql(&schema(), None).unwrap().unwrap();
643        // Canonical name from schema preserved + quoted.
644        assert_eq!(sql, "\"id\" ASC");
645    }
646
647    #[test]
648    fn order_by_desc_case_insensitive() {
649        let mut r = empty_req();
650        r.order_by = vec![OrderBy {
651            col: "name".into(),
652            dir: Some("DESC".into()),
653        }];
654        let sql = r.order_by_sql(&schema(), None).unwrap().unwrap();
655        assert_eq!(sql, "\"name\" DESC");
656    }
657
658    #[test]
659    fn order_by_bad_direction() {
660        let mut r = empty_req();
661        r.order_by = vec![OrderBy {
662            col: "id".into(),
663            dir: Some("backwards".into()),
664        }];
665        let err = r.order_by_sql(&schema(), None).unwrap_err();
666        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("backwards")));
667    }
668
669    #[test]
670    fn order_by_unknown_col_no_plan() {
671        let mut r = empty_req();
672        r.order_by = vec![OrderBy {
673            col: "missing".into(),
674            dir: None,
675        }];
676        let err = r.order_by_sql(&schema(), None).unwrap_err();
677        assert!(matches!(err, AppError::UnknownColumn(_)));
678    }
679
680    #[test]
681    fn order_by_with_plan_restricts_to_outputs() {
682        let mut r = empty_req();
683        r.group_by = vec!["name".into()];
684        r.aggregations = vec![Aggregation {
685            col: Some("score".into()),
686            op: "sum".into(),
687            alias: Some("total".into()),
688        }];
689        let plan = r.agg_plan(&schema()).unwrap().unwrap();
690
691        // Allowed: group col + alias.
692        r.order_by = vec![
693            OrderBy {
694                col: "name".into(),
695                dir: Some("asc".into()),
696            },
697            OrderBy {
698                col: "TOTAL".into(),
699                dir: Some("desc".into()),
700            },
701        ];
702        let sql = r.order_by_sql(&schema(), Some(&plan)).unwrap().unwrap();
703        assert_eq!(sql, "\"name\" ASC, \"total\" DESC");
704
705        // Not allowed: raw schema column that isn't in the group/agg output.
706        r.order_by = vec![OrderBy {
707            col: "id".into(),
708            dir: None,
709        }];
710        let err = r.order_by_sql(&schema(), Some(&plan)).unwrap_err();
711        assert!(matches!(err, AppError::UnknownColumn(_)));
712    }
713
714    // ---- effective_limit_offset --------------------------------------------
715
716    #[test]
717    fn limit_offset_first_page_default() {
718        let r = empty_req();
719        assert_eq!(r.effective_limit_offset(1000), (1000, 0));
720    }
721
722    #[test]
723    fn limit_offset_pagination() {
724        let mut r = empty_req();
725        r.page = 3;
726        r.page_size = 50;
727        assert_eq!(r.effective_limit_offset(1000), (50, 100));
728    }
729
730    #[test]
731    fn limit_offset_caps_page_size_to_max() {
732        let mut r = empty_req();
733        r.page_size = 10_000;
734        assert_eq!(r.effective_limit_offset(1000), (1000, 0));
735    }
736
737    #[test]
738    fn limit_offset_page_zero_treated_as_one() {
739        let mut r = empty_req();
740        r.page = 0;
741        r.page_size = 10;
742        assert_eq!(r.effective_limit_offset(1000), (10, 0));
743    }
744
745    #[test]
746    fn limit_offset_top_level_cap_truncates_last_page() {
747        let mut r = empty_req();
748        r.page = 2;
749        r.page_size = 50;
750        r.limit = Some(75); // offset 50, only 25 rows remain under cap.
751        assert_eq!(r.effective_limit_offset(1000), (25, 50));
752    }
753
754    #[test]
755    fn limit_offset_top_level_cap_exhausted_returns_zero() {
756        let mut r = empty_req();
757        r.page = 3;
758        r.page_size = 50;
759        r.limit = Some(75); // offset 100 >= 75 -> empty page.
760        assert_eq!(r.effective_limit_offset(1000), (0, 100));
761    }
762}