Skip to main content

datapress_core/
models.rs

1use serde::Deserialize;
2use serde_json::Value as JsonValue;
3
4use crate::errors::AppError;
5use crate::schema::DatasetSchema;
6
/// A single filter condition: a column reference, an operator name, and an
/// optional operand.
///
/// The same shape is used for row filters ([`QueryRequest::predicates`],
/// [`CountRequest::predicates`]) and for post-aggregation filters
/// ([`QueryRequest::having`]).
#[derive(Clone, Deserialize)]
pub struct Predicate {
    /// Name the condition applies to: a dataset column for row filters, or a
    /// `group_by` column / aggregation alias when used in `having`.
    pub col: String,
    /// eq | neq | gt | gte | lt | lte | like | ilike | in | is_null | is_not_null
    pub op: String,
    /// Operand as raw JSON; `None` when the key is omitted from the request.
    /// NOTE(review): presumably unused for `is_null` / `is_not_null` — the
    /// operator semantics are implemented by the backends, not in this file.
    pub val: Option<JsonValue>,
}
14
/// A single `ORDER BY` clause entry.
///
/// `dir` is case-insensitive; accepted values are `"asc"` (default) and
/// `"desc"`. Omitted = ascending. Any other value is rejected by
/// [`QueryRequest::order_by_sql`] with `AppError::InvalidValue`.
#[derive(Clone, Deserialize)]
pub struct OrderBy {
    /// Sort key. For ungrouped queries this names a dataset column; for
    /// grouped queries it names a `group_by` column or an aggregation alias.
    pub col: String,
    /// Sort direction; `None` (key omitted) means ascending.
    #[serde(default)]
    pub dir: Option<String>,
}
25
/// A single aggregation in a `group_by` query.
///
/// `op` is one of `count | sum | avg | min | max` (case-insensitive).
/// `col` is required for every op except `count`, where it may be omitted
/// to mean `COUNT(*)`. `alias` is the JSON output key; if omitted, it
/// defaults to `count` for `COUNT(*)` and `{op}_{col}` otherwise.
///
/// This is the raw, unvalidated request shape; [`QueryRequest::agg_plan`]
/// resolves it into an [`AggSpec`].
#[derive(Clone, Deserialize)]
pub struct Aggregation {
    /// Column to aggregate over; `None` is only valid together with `count`.
    #[serde(default)]
    pub col: Option<String>,
    /// Aggregate function name, matched case-insensitively during planning.
    pub op: String,
    /// Caller-chosen output name; when `None` a default is derived in
    /// [`QueryRequest::agg_plan`].
    #[serde(default)]
    pub alias: Option<String>,
}
40
/// Structured (non-SQL) query request body.
///
/// Every field is optional on the wire: list fields default to empty,
/// `distinct` to `false`, `limit` to `None`, `page` to `1` and `page_size`
/// to `1000`. The methods on this type validate the request against a
/// [`DatasetSchema`] and turn it into SQL fragments / paging values.
#[derive(Clone, Deserialize)]
pub struct QueryRequest {
    /// Columns to return. Empty = all columns. Ignored when `group_by` is
    /// non-empty (the SELECT list is then derived from `group_by` + `aggregations`).
    #[serde(default)]
    pub columns: Vec<String>,
    /// Row filters applied before any grouping. Empty = no filtering.
    #[serde(default)]
    pub predicates: Vec<Predicate>,
    /// Group-by columns. Empty = no grouping (regular row scan). When set,
    /// the response shape is `{ group_col_1, …, alias_1, … }` per row.
    #[serde(default)]
    pub group_by: Vec<String>,
    /// Aggregations to compute over each group. When `group_by` is set and
    /// this is empty, an implicit `{ op: "count" }` is added.
    #[serde(default)]
    pub aggregations: Vec<Aggregation>,
    /// Post-aggregation row filters, ANDed together (SQL `HAVING`). Each
    /// predicate's `col` references a `group_by` column or an aggregation
    /// alias; requires a non-empty `group_by`. Same op vocabulary as
    /// `predicates` (`eq | neq | gt | gte | lt | lte | like | ilike | in |
    /// is_null | is_not_null`).
    #[serde(default)]
    pub having: Vec<Predicate>,
    /// Return only distinct rows over the projected columns. Mutually
    /// exclusive with `group_by` / `aggregations`.
    #[serde(default)]
    pub distinct: bool,
    /// Sort spec. Empty = unsorted (engine order).
    #[serde(default)]
    pub order_by: Vec<OrderBy>,
    /// Hard cap on total rows returned across all pages. `None` = no cap
    /// beyond `page_size`.
    #[serde(default)]
    pub limit: Option<u64>,
    /// 1-based page number; defaults to `1`. A value of `0` is treated as
    /// `1` by [`QueryRequest::effective_limit_offset`].
    #[serde(default = "default_page")]
    pub page: u64,
    /// Requested rows per page; defaults to `1000`. Bounded by the backend's
    /// per-page cap in [`QueryRequest::effective_limit_offset`].
    #[serde(default = "default_page_size")]
    pub page_size: u64,
}
80
/// Request body for the raw-SQL endpoint (`POST /api/v1/sql`).
///
/// `sql` is an arbitrary read-only `SELECT`; it is parsed and validated
/// by [`crate::sql::validate`] before any engine sees it. `max_rows`
/// lets a caller request *fewer* rows than the server-side cap
/// (`[sql].max_rows`); it can never raise the cap.
#[derive(Clone, Deserialize)]
pub struct SqlRequest {
    /// The SQL statement to execute. Must be a single read-only query
    /// referencing a single registered dataset.
    pub sql: String,
    /// Optional client-side row cap. Clamped to the server-configured
    /// `[sql].max_rows`; `None` (also the value when the key is omitted)
    /// uses the server cap.
    #[serde(default)]
    pub max_rows: Option<u64>,
}
97
/// One resolved aggregation, ready for SQL emission.
///
/// Produced by [`QueryRequest::agg_plan`] from a raw [`Aggregation`]; not
/// deserialized directly from requests.
#[derive(Clone)]
pub struct AggSpec {
    /// Canonical column name from the schema, or `None` for `COUNT(*)`.
    pub col: Option<String>,
    /// The parsed aggregate function.
    pub op: AggOp,
    /// Output alias (JSON key). Always set after planning.
    pub alias: String,
}
107
/// The aggregate functions a grouped query may apply.
#[derive(Clone, Copy)]
pub enum AggOp {
    Count,
    Sum,
    Avg,
    Min,
    Max,
}

impl AggOp {
    /// Lower-case request-level name of the op, as used in the `op` field of
    /// an aggregation and in default aliases / error messages.
    pub fn name(self) -> &'static str {
        match self {
            Self::Count => "count",
            Self::Sum => "sum",
            Self::Avg => "avg",
            Self::Min => "min",
            Self::Max => "max",
        }
    }

    /// Upper-case SQL function name emitted for the op.
    pub fn as_sql(self) -> &'static str {
        match self {
            Self::Count => "COUNT",
            Self::Sum => "SUM",
            Self::Avg => "AVG",
            Self::Min => "MIN",
            Self::Max => "MAX",
        }
    }
}
136
137impl AggSpec {
138    /// Render the SQL aggregate expression for this spec, e.g. `COUNT(*)`
139    /// or `SUM("amount")`. The column name is quoted via
140    /// [`DatasetSchema::quote_ident`].
141    ///
142    /// By construction (see [`QueryRequest::agg_plan`]) every non-`COUNT`
143    /// op carries a resolved column and `COUNT` may omit one. If that
144    /// invariant is ever violated this returns `AppError::Internal`
145    /// rather than panicking, since the value flows onto a live HTTP path.
146    pub fn sql_expr(&self) -> Result<String, AppError> {
147        match (self.op, self.col.as_deref()) {
148            (AggOp::Count, None) => Ok("COUNT(*)".to_string()),
149            (op, Some(c)) => Ok(format!(
150                "{}({})",
151                op.as_sql(),
152                DatasetSchema::quote_ident(c)
153            )),
154            (op, None) => Err(AppError::Internal(format!(
155                "aggregation '{}' resolved without a column (planner invariant violated)",
156                op.name()
157            ))),
158        }
159    }
160}
161
/// Validated `GROUP BY` plan: canonical group columns + resolved aggregations.
///
/// Built by [`QueryRequest::agg_plan`]; the two lists together define the
/// output names of a grouped query (group columns first, then aliases).
#[derive(Clone)]
pub struct AggPlan {
    /// Group-by column names as spelled in the schema.
    pub group_cols: Vec<String>,
    /// Aggregations to compute per group, in request order.
    pub aggs: Vec<AggSpec>,
}
168
169impl AggPlan {
170    /// All output names exposed by this plan, in SELECT order: group
171    /// columns first, then aggregation aliases. Used by `order_by`
172    /// validation when grouping is active.
173    pub fn output_names(&self) -> Vec<String> {
174        let mut v = self.group_cols.clone();
175        v.extend(self.aggs.iter().map(|a| a.alias.clone()));
176        v
177    }
178
179    /// Resolve a `HAVING` reference name to the SQL expression it filters
180    /// on. A group-by column maps to its quoted identifier; an aggregation
181    /// alias maps to the underlying aggregate expression (`COUNT(*)`,
182    /// `SUM("amount")`, …). Emitting the expression rather than the alias
183    /// keeps both backends happy — DataFusion does not allow aliases in
184    /// `HAVING`, while DuckDB does.
185    pub fn having_lhs(&self, name: &str) -> Result<String, AppError> {
186        let lc = name.to_lowercase();
187        if let Some(g) = self.group_cols.iter().find(|c| c.to_lowercase() == lc) {
188            return Ok(DatasetSchema::quote_ident(g));
189        }
190        if let Some(a) = self.aggs.iter().find(|a| a.alias.to_lowercase() == lc) {
191            return a.sql_expr();
192        }
193        Err(AppError::UnknownColumn(format!(
194            "{name} (must be a group_by column or aggregation alias)"
195        )))
196    }
197}
198
199impl QueryRequest {
200    /// Resolve the `group_by` + `aggregations` request into a validated
201    /// plan, or return `Ok(None)` when no grouping was requested.
202    ///
203    /// When `group_by` is non-empty and `aggregations` is empty, an
204    /// implicit `COUNT(*) AS count` is added so the plan always has at
205    /// least one output value.
206    pub fn agg_plan(&self, schema: &DatasetSchema) -> Result<Option<AggPlan>, AppError> {
207        if self.distinct && (!self.group_by.is_empty() || !self.aggregations.is_empty()) {
208            return Err(AppError::InvalidValue(
209                "distinct is mutually exclusive with group_by / aggregations".into(),
210            ));
211        }
212        if self.group_by.is_empty() {
213            if !self.aggregations.is_empty() {
214                return Err(AppError::InvalidValue(
215                    "aggregations require a non-empty group_by".into(),
216                ));
217            }
218            return Ok(None);
219        }
220
221        let mut group_cols = Vec::with_capacity(self.group_by.len());
222        for name in &self.group_by {
223            group_cols.push(schema.find(name)?.name.clone());
224        }
225
226        let raw_aggs: Vec<Aggregation> = if self.aggregations.is_empty() {
227            vec![Aggregation {
228                col: None,
229                op: "count".into(),
230                alias: None,
231            }]
232        } else {
233            self.aggregations.clone()
234        };
235
236        let mut aggs = Vec::with_capacity(raw_aggs.len());
237        for a in &raw_aggs {
238            let op = match a.op.to_ascii_lowercase().as_str() {
239                "count" => AggOp::Count,
240                "sum" => AggOp::Sum,
241                "avg" => AggOp::Avg,
242                "min" => AggOp::Min,
243                "max" => AggOp::Max,
244                other => {
245                    return Err(AppError::InvalidValue(format!(
246                        "unknown aggregation op '{other}' (expected count|sum|avg|min|max)"
247                    )));
248                }
249            };
250            let col = match (op, a.col.as_deref()) {
251                (AggOp::Count, None) => None,
252                (_, None) => {
253                    return Err(AppError::InvalidValue(format!(
254                        "aggregation '{}' requires a 'col'",
255                        op.name()
256                    )));
257                }
258                (_, Some(c)) => Some(schema.find(c)?.name.clone()),
259            };
260            let alias = a.alias.clone().unwrap_or_else(|| match col.as_deref() {
261                Some(c) => format!("{}_{}", op.name(), c.to_lowercase()),
262                None => "count".into(),
263            });
264            aggs.push(AggSpec { col, op, alias });
265        }
266
267        Ok(Some(AggPlan { group_cols, aggs }))
268    }
269
270    /// Resolve `having` against the aggregation `plan`, pairing each
271    /// predicate with the SQL expression its `col` references. Returns an
272    /// empty vec when no `HAVING` was requested.
273    ///
274    /// Errors if `having` is set without a `GROUP BY` (there is nothing to
275    /// filter post-aggregation), or if a predicate references a name that
276    /// is neither a group column nor an aggregation alias. The returned
277    /// expressions are bound by the backend, which appends the values to
278    /// the same parameter list it built for `WHERE`.
279    pub fn having_plan<'a>(
280        &'a self,
281        plan: Option<&AggPlan>,
282    ) -> Result<Vec<(String, &'a Predicate)>, AppError> {
283        if self.having.is_empty() {
284            return Ok(Vec::new());
285        }
286        let plan = plan.ok_or_else(|| {
287            AppError::InvalidValue("having requires a non-empty group_by".into())
288        })?;
289        self.having
290            .iter()
291            .map(|p| Ok((plan.having_lhs(&p.col)?, p)))
292            .collect()
293    }
294
295    /// Translate `order_by` into a validated SQL fragment, e.g.
296    /// `"\"a\" ASC, \"b\" DESC"`. Returns `Ok(None)` if no ordering was
297    /// requested.
298    ///
299    /// When `plan` is `Some`, sort keys must reference a group-by column
300    /// or an aggregation alias (the only names in scope after `GROUP BY`).
301    /// When `plan` is `None`, sort keys are validated against the dataset
302    /// schema.
303    pub fn order_by_sql(
304        &self,
305        schema: &DatasetSchema,
306        plan: Option<&AggPlan>,
307    ) -> Result<Option<String>, AppError> {
308        if self.order_by.is_empty() {
309            return Ok(None);
310        }
311        let parts: Vec<String> = self
312            .order_by
313            .iter()
314            .map(|o| {
315                let dir = match o
316                    .dir
317                    .as_deref()
318                    .unwrap_or("asc")
319                    .to_ascii_lowercase()
320                    .as_str()
321                {
322                    "asc" => "ASC",
323                    "desc" => "DESC",
324                    other => {
325                        return Err(AppError::InvalidValue(format!(
326                            "order_by direction must be 'asc' or 'desc' (got '{other}')"
327                        )));
328                    }
329                };
330                let ident = match plan {
331                    Some(p) => {
332                        let lc = o.col.to_lowercase();
333                        let allowed = p.output_names();
334                        allowed
335                            .iter()
336                            .find(|n| n.to_lowercase() == lc)
337                            .map(|n| DatasetSchema::quote_ident(n))
338                            .ok_or_else(|| {
339                                AppError::UnknownColumn(format!(
340                                    "{} (must be a group_by column or aggregation alias)",
341                                    o.col
342                                ))
343                            })?
344                    }
345                    None => DatasetSchema::quote_ident(&schema.find(&o.col)?.name),
346                };
347                Ok(format!("{ident} {dir}"))
348            })
349            .collect::<Result<_, _>>()?;
350        Ok(Some(parts.join(", ")))
351    }
352
353    /// Compute the effective SQL `LIMIT` and `OFFSET` for this request,
354    /// honouring both `page`/`page_size` and the optional top-level `limit`
355    /// cap. `page_size_cap` is the per-page maximum the backend enforces.
356    ///
357    /// Semantics: pagination still drives offset; `limit` caps the total
358    /// number of rows ever returned across all pages. Once `offset >=
359    /// limit`, the effective LIMIT is `0` (empty page).
360    pub fn effective_limit_offset(&self, page_size_cap: u64) -> (u64, u64) {
361        let page = self.page.max(1);
362        let page_size = self.page_size.clamp(1, page_size_cap);
363        let offset = (page - 1) * page_size;
364        let limit = match self.limit {
365            Some(cap) => {
366                if offset >= cap {
367                    0
368                } else {
369                    page_size.min(cap - offset)
370                }
371            }
372            None => page_size,
373        };
374        (limit, offset)
375    }
376
377    /// Apply the dataset's column-level access filters to this request,
378    /// rejecting any use of a hidden or predicate-restricted column and
379    /// narrowing a default (all-columns) projection to the visible set.
380    ///
381    /// Enforced consistently for every backend and response format because
382    /// the shared HTTP handlers call it before dispatching. Rules:
383    /// - a predicate on a hidden column → `UnknownColumn`; on a
384    ///   predicate-restricted (but visible) column → `Forbidden`;
385    /// - `group_by`, aggregation, and (ungrouped) `order_by` columns must be
386    ///   visible, else `UnknownColumn`;
387    /// - an explicit `columns` entry must be visible, else `UnknownColumn`;
388    /// - an empty `columns` (all columns) is rewritten to the visible set so
389    ///   hidden columns never appear in the default projection.
390    ///
391    /// A no-op when the schema carries no active filters.
392    pub fn enforce_column_filters(&mut self, schema: &DatasetSchema) -> Result<(), AppError> {
393        if !schema.has_column_filters() {
394            return Ok(());
395        }
396        for p in &self.predicates {
397            schema.find_for_predicate(&p.col)?;
398        }
399        for c in &self.group_by {
400            schema.find_visible(c)?;
401        }
402        for a in &self.aggregations {
403            if let Some(c) = &a.col {
404                schema.find_visible(c)?;
405            }
406        }
407        let grouping = !self.group_by.is_empty();
408        if !grouping {
409            // In grouped mode `order_by` references group columns / aggregation
410            // aliases, validated by `order_by_sql`; only ungrouped sorts name
411            // raw schema columns.
412            for o in &self.order_by {
413                schema.find_visible(&o.col)?;
414            }
415        }
416        if self.columns.is_empty() {
417            if !grouping && schema.projection_filter.is_active() {
418                self.columns = schema
419                    .visible_columns()
420                    .into_iter()
421                    .map(|c| c.name.clone())
422                    .collect();
423            }
424        } else {
425            for c in &self.columns {
426                schema.find_visible(c)?;
427            }
428        }
429        Ok(())
430    }
431}
432
/// Serde default for `QueryRequest::page`: the first page.
fn default_page() -> u64 {
    const FIRST_PAGE: u64 = 1;
    FIRST_PAGE
}
/// Serde default for `QueryRequest::page_size`.
fn default_page_size() -> u64 {
    const DEFAULT_ROWS_PER_PAGE: u64 = 1000;
    DEFAULT_ROWS_PER_PAGE
}
439
/// Body for `POST /api/datasets/{name}/count`. Predicates are optional —
/// an empty body (or `{}`) counts every row in the dataset.
///
/// `Default` yields the same "no predicates" request.
#[derive(Clone, Deserialize, Default)]
pub struct CountRequest {
    /// Row filters restricting which rows are counted. Empty = count all.
    #[serde(default)]
    pub predicates: Vec<Predicate>,
}
447
448impl CountRequest {
449    /// Reject predicates that reference a hidden or predicate-restricted
450    /// column, mirroring [`QueryRequest::enforce_column_filters`]. A no-op
451    /// when the schema carries no active filters.
452    pub fn enforce_column_filters(&self, schema: &DatasetSchema) -> Result<(), AppError> {
453        if !schema.has_column_filters() {
454            return Ok(());
455        }
456        for p in &self.predicates {
457            schema.find_for_predicate(&p.col)?;
458        }
459        Ok(())
460    }
461}
462
463// ---------------------------------------------------------------------------
464// Tests
465// ---------------------------------------------------------------------------
466
#[cfg(test)]
mod tests {
    // Unit tests for request planning: `agg_plan`, `having_plan`,
    // `order_by_sql`, `effective_limit_offset` and `enforce_column_filters`.
    use super::*;
    use crate::schema::{ColumnInfo, DatasetSchema, LogicalType};

    /// Four-column fixture schema named `t`: `id`, `name`, `score` and
    /// `Mixed`. `Mixed` is the only column whose canonical name is not
    /// all-lowercase.
    fn schema() -> DatasetSchema {
        DatasetSchema::new(
            "t",
            vec![
                ColumnInfo {
                    name: "id".into(),
                    logical: LogicalType::Int,
                    sql_type: "BIGINT".into(),
                    nullable: false,
                },
                ColumnInfo {
                    name: "name".into(),
                    logical: LogicalType::Utf8,
                    sql_type: "VARCHAR".into(),
                    nullable: true,
                },
                ColumnInfo {
                    name: "score".into(),
                    logical: LogicalType::Float,
                    sql_type: "DOUBLE".into(),
                    nullable: true,
                },
                ColumnInfo {
                    name: "Mixed".into(),
                    logical: LogicalType::Utf8,
                    sql_type: "VARCHAR".into(),
                    nullable: true,
                },
            ],
        )
    }

    /// Baseline request equal to deserializing `{}`: nothing selected,
    /// filtered, grouped or sorted; page 1 with a page size of 1000.
    fn empty_req() -> QueryRequest {
        QueryRequest {
            columns: vec![],
            predicates: vec![],
            group_by: vec![],
            aggregations: vec![],
            having: vec![],
            distinct: false,
            order_by: vec![],
            limit: None,
            page: 1,
            page_size: 1000,
        }
    }

    // ---- agg_plan -----------------------------------------------------------

    #[test]
    fn agg_plan_none_when_no_group_by() {
        let r = empty_req();
        assert!(r.agg_plan(&schema()).unwrap().is_none());
    }

    #[test]
    fn agg_plan_rejects_aggs_without_group_by() {
        let mut r = empty_req();
        r.aggregations = vec![Aggregation {
            col: Some("score".into()),
            op: "sum".into(),
            alias: None,
        }];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)), "got {err:?}");
    }

    #[test]
    fn agg_plan_implicit_count_star() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();
        assert_eq!(plan.group_cols, vec!["name"]);
        assert_eq!(plan.aggs.len(), 1);
        assert_eq!(plan.aggs[0].alias, "count");
        assert!(plan.aggs[0].col.is_none());
        assert!(matches!(plan.aggs[0].op, AggOp::Count));
    }

    // Ops are given in mixed case ("Sum", "MAX") to cover case-insensitive
    // op parsing alongside the default / explicit alias rules.
    #[test]
    fn agg_plan_default_alias_format() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![
            Aggregation {
                col: Some("score".into()),
                op: "Sum".into(),
                alias: None,
            },
            Aggregation {
                col: Some("Mixed".into()),
                op: "MAX".into(),
                alias: Some("hi".into()),
            },
        ];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();
        assert_eq!(plan.aggs[0].alias, "sum_score");
        assert_eq!(plan.aggs[1].alias, "hi");
        // Canonical column name is preserved from the schema (case fix).
        assert_eq!(plan.aggs[1].col.as_deref(), Some("Mixed"));
    }

    #[test]
    fn agg_plan_unknown_op() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![Aggregation {
            col: Some("score".into()),
            op: "median".into(),
            alias: None,
        }];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("median")));
    }

    #[test]
    fn agg_plan_non_count_requires_col() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![Aggregation {
            col: None,
            op: "avg".into(),
            alias: None,
        }];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("avg")));
    }

    #[test]
    fn agg_plan_unknown_group_col() {
        let mut r = empty_req();
        r.group_by = vec!["nope".into()];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn agg_plan_distinct_conflicts_with_group_by() {
        let mut r = empty_req();
        r.distinct = true;
        r.group_by = vec!["name".into()];
        let err = r.agg_plan(&schema()).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(_)));
    }

    // ---- having_plan --------------------------------------------------------

    #[test]
    fn having_empty_returns_empty() {
        let r = empty_req();
        assert!(r.having_plan(None).unwrap().is_empty());
    }

    #[test]
    fn having_requires_group_by() {
        let mut r = empty_req();
        r.having = vec![Predicate {
            col: "count".into(),
            op: "gt".into(),
            val: Some(serde_json::json!(1)),
        }];
        let err = r.having_plan(None).err().expect("expected error");
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("group_by")));
    }

    #[test]
    fn having_resolves_implicit_count_alias_to_expr() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.having = vec![Predicate {
            col: "count".into(),
            op: "gt".into(),
            val: Some(serde_json::json!(5)),
        }];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();
        let resolved = r.having_plan(Some(&plan)).unwrap();
        assert_eq!(resolved.len(), 1);
        // The implicit COUNT(*) alias resolves to the aggregate expression,
        // not the alias name.
        assert_eq!(resolved[0].0, "COUNT(*)");
    }

    #[test]
    fn having_resolves_named_alias_and_group_col() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![Aggregation {
            col: Some("score".into()),
            op: "sum".into(),
            alias: Some("total".into()),
        }];
        r.having = vec![
            Predicate {
                col: "total".into(),
                op: "gte".into(),
                val: Some(serde_json::json!(100)),
            },
            Predicate {
                col: "name".into(),
                op: "eq".into(),
                val: Some(serde_json::json!("x")),
            },
        ];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();
        let resolved = r.having_plan(Some(&plan)).unwrap();
        assert_eq!(resolved.len(), 2);
        assert_eq!(resolved[0].0, "SUM(\"score\")");
        // A group column resolves to its quoted identifier.
        assert_eq!(resolved[1].0, "\"name\"");
    }

    #[test]
    fn having_unknown_reference_errors() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.having = vec![Predicate {
            col: "nope".into(),
            op: "gt".into(),
            val: Some(serde_json::json!(1)),
        }];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();
        let err = r.having_plan(Some(&plan)).err().expect("expected error");
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    // ---- order_by_sql -------------------------------------------------------

    #[test]
    fn order_by_none_when_empty() {
        let r = empty_req();
        assert!(r.order_by_sql(&schema(), None).unwrap().is_none());
    }

    #[test]
    fn order_by_default_asc_and_quoting() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "ID".into(),
            dir: None,
        }];
        let sql = r.order_by_sql(&schema(), None).unwrap().unwrap();
        // Canonical name from schema preserved + quoted.
        assert_eq!(sql, "\"id\" ASC");
    }

    #[test]
    fn order_by_desc_case_insensitive() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "name".into(),
            dir: Some("DESC".into()),
        }];
        let sql = r.order_by_sql(&schema(), None).unwrap().unwrap();
        assert_eq!(sql, "\"name\" DESC");
    }

    #[test]
    fn order_by_bad_direction() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "id".into(),
            dir: Some("backwards".into()),
        }];
        let err = r.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::InvalidValue(m) if m.contains("backwards")));
    }

    #[test]
    fn order_by_unknown_col_no_plan() {
        let mut r = empty_req();
        r.order_by = vec![OrderBy {
            col: "missing".into(),
            dir: None,
        }];
        let err = r.order_by_sql(&schema(), None).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    #[test]
    fn order_by_with_plan_restricts_to_outputs() {
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.aggregations = vec![Aggregation {
            col: Some("score".into()),
            op: "sum".into(),
            alias: Some("total".into()),
        }];
        let plan = r.agg_plan(&schema()).unwrap().unwrap();

        // Allowed: group col + alias.
        r.order_by = vec![
            OrderBy {
                col: "name".into(),
                dir: Some("asc".into()),
            },
            OrderBy {
                col: "TOTAL".into(),
                dir: Some("desc".into()),
            },
        ];
        let sql = r.order_by_sql(&schema(), Some(&plan)).unwrap().unwrap();
        assert_eq!(sql, "\"name\" ASC, \"total\" DESC");

        // Not allowed: raw schema column that isn't in the group/agg output.
        r.order_by = vec![OrderBy {
            col: "id".into(),
            dir: None,
        }];
        let err = r.order_by_sql(&schema(), Some(&plan)).unwrap_err();
        assert!(matches!(err, AppError::UnknownColumn(_)));
    }

    // ---- effective_limit_offset --------------------------------------------
    // Each assertion compares against `(limit, offset)`; every call passes a
    // per-page cap of 1000.

    #[test]
    fn limit_offset_first_page_default() {
        let r = empty_req();
        assert_eq!(r.effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_pagination() {
        let mut r = empty_req();
        r.page = 3;
        r.page_size = 50;
        assert_eq!(r.effective_limit_offset(1000), (50, 100));
    }

    #[test]
    fn limit_offset_caps_page_size_to_max() {
        let mut r = empty_req();
        r.page_size = 10_000;
        assert_eq!(r.effective_limit_offset(1000), (1000, 0));
    }

    #[test]
    fn limit_offset_page_zero_treated_as_one() {
        let mut r = empty_req();
        r.page = 0;
        r.page_size = 10;
        assert_eq!(r.effective_limit_offset(1000), (10, 0));
    }

    #[test]
    fn limit_offset_top_level_cap_truncates_last_page() {
        let mut r = empty_req();
        r.page = 2;
        r.page_size = 50;
        r.limit = Some(75); // offset 50, only 25 rows remain under cap.
        assert_eq!(r.effective_limit_offset(1000), (25, 50));
    }

    #[test]
    fn limit_offset_top_level_cap_exhausted_returns_zero() {
        let mut r = empty_req();
        r.page = 3;
        r.page_size = 50;
        r.limit = Some(75); // offset 100 >= 75 -> empty page.
        assert_eq!(r.effective_limit_offset(1000), (0, 100));
    }

    // ---- enforce_column_filters ---------------------------------------------
    // NOTE(review): judging by the assertions below, the first argument of
    // `with_filters` restricts predicate use (-> `Forbidden`) and the second
    // hides columns from projection (-> `UnknownColumn`); confirm against
    // `DatasetSchema::with_filters`.

    /// Exclude-only column filter naming `cols`; `include` is left empty.
    fn exclude(cols: &[&str]) -> crate::config::ColumnFilter {
        crate::config::ColumnFilter {
            include: vec![],
            exclude: cols.iter().map(|s| s.to_string()).collect(),
        }
    }

    /// `col eq 1` predicate; only the column name matters to these tests.
    fn pred(col: &str) -> Predicate {
        Predicate {
            col: col.into(),
            op: "eq".into(),
            val: Some(serde_json::json!(1)),
        }
    }

    #[test]
    fn enforce_is_noop_without_filters() {
        let mut r = empty_req();
        r.columns = vec!["id".into()];
        r.enforce_column_filters(&schema()).unwrap();
        assert_eq!(r.columns, vec!["id".to_string()]);
    }

    #[test]
    fn enforce_predicate_on_hidden_column_is_unknown() {
        let sch = schema()
            .with_filters(Default::default(), exclude(&["score"]))
            .unwrap();
        let mut r = empty_req();
        r.predicates = vec![pred("score")];
        assert!(matches!(
            r.enforce_column_filters(&sch).unwrap_err(),
            AppError::UnknownColumn(_)
        ));
    }

    #[test]
    fn enforce_predicate_on_restricted_column_is_forbidden() {
        let sch = schema()
            .with_filters(exclude(&["score"]), Default::default())
            .unwrap();
        let mut r = empty_req();
        r.predicates = vec![pred("score")];
        assert!(matches!(
            r.enforce_column_filters(&sch).unwrap_err(),
            AppError::Forbidden(_)
        ));
        // still selectable
        let mut r2 = empty_req();
        r2.columns = vec!["score".into()];
        r2.enforce_column_filters(&sch).unwrap();
    }

    #[test]
    fn enforce_explicit_hidden_column_is_rejected() {
        let sch = schema()
            .with_filters(Default::default(), exclude(&["score"]))
            .unwrap();
        let mut r = empty_req();
        r.columns = vec!["id".into(), "score".into()];
        assert!(matches!(
            r.enforce_column_filters(&sch).unwrap_err(),
            AppError::UnknownColumn(_)
        ));
    }

    #[test]
    fn enforce_default_all_narrows_to_visible() {
        let sch = schema()
            .with_filters(Default::default(), exclude(&["score", "Mixed"]))
            .unwrap();
        let mut r = empty_req();
        r.enforce_column_filters(&sch).unwrap();
        assert_eq!(r.columns, vec!["id".to_string(), "name".to_string()]);
    }

    #[test]
    fn enforce_default_all_untouched_when_grouped() {
        let sch = schema()
            .with_filters(Default::default(), exclude(&["score"]))
            .unwrap();
        let mut r = empty_req();
        r.group_by = vec!["name".into()];
        r.enforce_column_filters(&sch).unwrap();
        // grouped projection is driven by group_by/aggregations, not columns
        assert!(r.columns.is_empty());
    }

    #[test]
    fn enforce_group_by_hidden_column_is_rejected() {
        let sch = schema()
            .with_filters(Default::default(), exclude(&["score"]))
            .unwrap();
        let mut r = empty_req();
        r.group_by = vec!["score".into()];
        assert!(matches!(
            r.enforce_column_filters(&sch).unwrap_err(),
            AppError::UnknownColumn(_)
        ));
    }

    #[test]
    fn count_enforce_predicate_restricted_is_forbidden() {
        let sch = schema()
            .with_filters(exclude(&["score"]), Default::default())
            .unwrap();
        let c = CountRequest {
            predicates: vec![pred("score")],
        };
        assert!(matches!(
            c.enforce_column_filters(&sch).unwrap_err(),
            AppError::Forbidden(_)
        ));
    }
}