Skip to main content

modkit_db/odata/
core.rs

1//! `OData` (filters) → `sea_orm::Condition` compiler (AST in, SQL out).
2//! Parsing belongs to API/gateway. This module only consumes `modkit_odata::ast::Expr`.
3
4use std::collections::HashMap;
5
6use bigdecimal::{BigDecimal, ToPrimitive};
7use chrono::{NaiveDate, NaiveTime, Utc};
8use modkit_odata::{CursorV1, Error as ODataError, ODataOrderBy, ODataQuery, SortDir, ast as core};
9use rust_decimal::Decimal;
10use sea_orm::{
11    ColumnTrait, Condition, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
12    sea_query::{Expr, Order},
13};
14use thiserror::Error;
15
16use modkit_odata::filter::FieldKind;
17
18use crate::odata::LimitCfg;
19use crate::secure::{DBRunner, DBRunnerInternal, SeaOrmRunner};
20
21/// Type alias for cursor extraction function to reduce type complexity
22type CursorExtractor<E> = fn(&<E as EntityTrait>::Model) -> String;
23
24#[derive(Clone)]
25pub struct Field<E: EntityTrait> {
26    pub col: E::Column,
27    pub kind: FieldKind,
28    pub to_string_for_cursor: Option<CursorExtractor<E>>,
29}
30
31#[derive(Clone)]
32#[must_use]
33pub struct FieldMap<E: EntityTrait> {
34    map: HashMap<String, Field<E>>,
35}
36
37impl<E: EntityTrait> Default for FieldMap<E> {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl<E: EntityTrait> FieldMap<E> {
44    pub fn new() -> Self {
45        Self {
46            map: HashMap::new(),
47        }
48    }
49    pub fn insert(mut self, api_name: impl Into<String>, col: E::Column, kind: FieldKind) -> Self {
50        self.map.insert(
51            api_name.into().to_lowercase(),
52            Field {
53                col,
54                kind,
55                to_string_for_cursor: None,
56            },
57        );
58        self
59    }
60
61    pub fn insert_with_extractor(
62        mut self,
63        api_name: impl Into<String>,
64        col: E::Column,
65        kind: FieldKind,
66        to_string_for_cursor: CursorExtractor<E>,
67    ) -> Self {
68        self.map.insert(
69            api_name.into().to_lowercase(),
70            Field {
71                col,
72                kind,
73                to_string_for_cursor: Some(to_string_for_cursor),
74            },
75        );
76        self
77    }
78
79    pub fn encode_model_key(&self, model: &E::Model, field_name: &str) -> Option<String> {
80        let f = self.get(field_name)?;
81        f.to_string_for_cursor.map(|f| f(model))
82    }
83    #[must_use]
84    pub fn get(&self, name: &str) -> Option<&Field<E>> {
85        self.map.get(&name.to_lowercase())
86    }
87}
88
89#[derive(Debug, Error, Clone)]
90pub enum ODataBuildError {
91    #[error("unknown field: {0}")]
92    UnknownField(String),
93
94    #[error("type mismatch: expected {expected:?}, got {got}")]
95    TypeMismatch {
96        expected: FieldKind,
97        got: &'static str,
98    },
99
100    #[error("unsupported operator: {0:?}")]
101    UnsupportedOp(core::CompareOperator),
102
103    #[error("unsupported function or args: {0}()")]
104    UnsupportedFn(String),
105
106    #[error("IN() list supports only literals")]
107    NonLiteralInList,
108
109    #[error("bare identifier not allowed: {0}")]
110    BareIdentifier(String),
111
112    #[error("bare literal not allowed")]
113    BareLiteral,
114
115    #[error("{0}")]
116    Other(&'static str),
117}
118pub type ODataBuildResult<T> = Result<T, ODataBuildError>;
119
120/* ---------- coercion helpers ---------- */
121
122fn bigdecimal_to_decimal(bd: &BigDecimal) -> ODataBuildResult<Decimal> {
123    // Robust conversion: preserve precision via string.
124    let s = bd.normalized().to_string();
125    Decimal::from_str_exact(&s)
126        .or_else(|_| s.parse::<Decimal>())
127        .map_err(|_| ODataBuildError::Other("invalid decimal"))
128}
129
130fn coerce(kind: FieldKind, v: &core::Value) -> ODataBuildResult<sea_orm::Value> {
131    use core::Value as V;
132    Ok(match (kind, v) {
133        (FieldKind::String, V::String(s)) => sea_orm::Value::String(Some(Box::new(s.clone()))),
134
135        (FieldKind::I64, V::Number(n)) => {
136            let i = n.to_i64().ok_or(ODataBuildError::TypeMismatch {
137                expected: FieldKind::I64,
138                got: "number",
139            })?;
140            sea_orm::Value::BigInt(Some(i))
141        }
142
143        (FieldKind::F64, V::Number(n)) => {
144            let f = n.to_f64().ok_or(ODataBuildError::TypeMismatch {
145                expected: FieldKind::F64,
146                got: "number",
147            })?;
148            sea_orm::Value::Double(Some(f))
149        }
150
151        // Box the Decimal
152        (FieldKind::Decimal, V::Number(n)) => {
153            sea_orm::Value::Decimal(Some(Box::new(bigdecimal_to_decimal(n)?)))
154        }
155
156        (FieldKind::Bool, V::Bool(b)) => sea_orm::Value::Bool(Some(*b)),
157
158        // Box the Uuid
159        (FieldKind::Uuid, V::Uuid(u)) => sea_orm::Value::Uuid(Some(Box::new(*u))),
160
161        // Box chrono types
162        (FieldKind::DateTimeUtc, V::DateTime(dt)) => {
163            sea_orm::Value::ChronoDateTimeUtc(Some(Box::new(*dt)))
164        }
165        (FieldKind::Date, V::Date(d)) => sea_orm::Value::ChronoDate(Some(Box::new(*d))),
166        (FieldKind::Time, V::Time(t)) => sea_orm::Value::ChronoTime(Some(Box::new(*t))),
167
168        (expected, V::Null) => {
169            return Err(ODataBuildError::TypeMismatch {
170                expected,
171                got: "null",
172            });
173        }
174        (expected, V::String(_)) => {
175            return Err(ODataBuildError::TypeMismatch {
176                expected,
177                got: "string",
178            });
179        }
180        (expected, V::Number(_)) => {
181            return Err(ODataBuildError::TypeMismatch {
182                expected,
183                got: "number",
184            });
185        }
186        (expected, V::Bool(_)) => {
187            return Err(ODataBuildError::TypeMismatch {
188                expected,
189                got: "bool",
190            });
191        }
192        (expected, V::Uuid(_)) => {
193            return Err(ODataBuildError::TypeMismatch {
194                expected,
195                got: "uuid",
196            });
197        }
198        (expected, V::DateTime(_)) => {
199            return Err(ODataBuildError::TypeMismatch {
200                expected,
201                got: "datetime",
202            });
203        }
204        (expected, V::Date(_)) => {
205            return Err(ODataBuildError::TypeMismatch {
206                expected,
207                got: "date",
208            });
209        }
210        (expected, V::Time(_)) => {
211            return Err(ODataBuildError::TypeMismatch {
212                expected,
213                got: "time",
214            });
215        }
216    })
217}
218
219fn coerce_many(kind: FieldKind, items: &[core::Expr]) -> ODataBuildResult<Vec<sea_orm::Value>> {
220    items
221        .iter()
222        .map(|e| match e {
223            core::Expr::Value(v) => coerce(kind, v),
224            _ => Err(ODataBuildError::NonLiteralInList),
225        })
226        .collect()
227}
228
229/* ---------- LIKE helpers ---------- */
230
/// Escapes the `LIKE` metacharacters `%`, `_` and `\` in `s` by prefixing each
/// with a backslash; every other character is copied through unchanged.
fn like_escape(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut escaped, ch| {
            if matches!(ch, '%' | '_' | '\\') {
                escaped.push('\\');
            }
            escaped.push(ch);
            escaped
        })
}
244fn like_contains(s: &str) -> String {
245    format!("%{}%", like_escape(s))
246}
247fn like_starts(s: &str) -> String {
248    format!("{}%", like_escape(s))
249}
250fn like_ends(s: &str) -> String {
251    format!("%{}", like_escape(s))
252}
253
254/* ---------- small guards ---------- */
255
256#[inline]
257fn ensure_string_field<E: EntityTrait>(f: &Field<E>, _field_name: &str) -> ODataBuildResult<()> {
258    if f.kind != FieldKind::String {
259        return Err(ODataBuildError::TypeMismatch {
260            expected: FieldKind::String,
261            got: "non-string field",
262        });
263    }
264    Ok(())
265}
266
267/* ---------- cursor value encoding/decoding ---------- */
268
269/// Parse a cursor value from string based on field kind
270pub fn parse_cursor_value(kind: FieldKind, s: &str) -> ODataBuildResult<sea_orm::Value> {
271    use sea_orm::Value as V;
272
273    let result = match kind {
274        FieldKind::String => V::String(Some(Box::new(s.to_owned()))),
275        FieldKind::I64 => {
276            let i = s
277                .parse::<i64>()
278                .map_err(|_| ODataBuildError::Other("invalid i64 in cursor"))?;
279            V::BigInt(Some(i))
280        }
281        FieldKind::F64 => {
282            let f = s
283                .parse::<f64>()
284                .map_err(|_| ODataBuildError::Other("invalid f64 in cursor"))?;
285            V::Double(Some(f))
286        }
287        FieldKind::Bool => {
288            let b = s
289                .parse::<bool>()
290                .map_err(|_| ODataBuildError::Other("invalid bool in cursor"))?;
291            V::Bool(Some(b))
292        }
293        FieldKind::Uuid => {
294            let u = s
295                .parse::<uuid::Uuid>()
296                .map_err(|_| ODataBuildError::Other("invalid uuid in cursor"))?;
297            V::Uuid(Some(Box::new(u)))
298        }
299        FieldKind::DateTimeUtc => {
300            let dt = chrono::DateTime::parse_from_rfc3339(s)
301                .map_err(|_| ODataBuildError::Other("invalid datetime in cursor"))?
302                .with_timezone(&Utc);
303            V::ChronoDateTimeUtc(Some(Box::new(dt)))
304        }
305        FieldKind::Date => {
306            let d = s
307                .parse::<NaiveDate>()
308                .map_err(|_| ODataBuildError::Other("invalid date in cursor"))?;
309            V::ChronoDate(Some(Box::new(d)))
310        }
311        FieldKind::Time => {
312            let t = s
313                .parse::<NaiveTime>()
314                .map_err(|_| ODataBuildError::Other("invalid time in cursor"))?;
315            V::ChronoTime(Some(Box::new(t)))
316        }
317        FieldKind::Decimal => {
318            let d = s
319                .parse::<Decimal>()
320                .map_err(|_| ODataBuildError::Other("invalid decimal in cursor"))?;
321            V::Decimal(Some(Box::new(d)))
322        }
323    };
324
325    Ok(result)
326}
327
328/* ---------- cursor predicate building ---------- */
329
330/// Build a cursor predicate for pagination.
331/// This builds the lexicographic OR-chain condition for cursor-based pagination.
332///
333/// For backward pagination (cursor.d == "bwd"), the comparison operators are reversed
334/// to fetch items before the cursor, but the order remains the same for display consistency.
335///
336/// # Errors
337/// Returns `ODataBuildError` if cursor keys don't match order fields or field resolution fails.
338pub fn build_cursor_predicate<E: EntityTrait>(
339    cursor: &CursorV1,
340    order: &ODataOrderBy,
341    fmap: &FieldMap<E>,
342) -> ODataBuildResult<Condition>
343where
344    E::Column: ColumnTrait + Copy,
345{
346    if cursor.k.len() != order.0.len() {
347        return Err(ODataBuildError::Other(
348            "cursor keys count mismatch with order fields",
349        ));
350    }
351
352    // Parse cursor values
353    let mut cursor_values = Vec::new();
354    for (i, key_str) in cursor.k.iter().enumerate() {
355        let order_key = &order.0[i];
356        let field = fmap
357            .get(&order_key.field)
358            .ok_or_else(|| ODataBuildError::UnknownField(order_key.field.clone()))?;
359        let value = parse_cursor_value(field.kind, key_str)?;
360        cursor_values.push((field, value, order_key.dir));
361    }
362
363    // Determine if we're going backward
364    let is_backward = cursor.d == "bwd";
365
366    // Build lexicographic condition
367    // Forward (fwd):
368    //   For ASC: (k0 > v0) OR (k0 = v0 AND k1 > v1) OR ...
369    //   For DESC: (k0 < v0) OR (k0 = v0 AND k1 < v1) OR ...
370    // Backward (bwd): Reverse the comparisons
371    //   For ASC: (k0 < v0) OR (k0 = v0 AND k1 < v1) OR ...
372    //   For DESC: (k0 > v0) OR (k0 = v0 AND k1 > v1) OR ...
373    let mut main_condition = Condition::any();
374
375    for i in 0..cursor_values.len() {
376        let mut prefix_condition = Condition::all();
377
378        // Add equality conditions for all previous fields
379        for (field, value, _) in cursor_values.iter().take(i) {
380            prefix_condition = prefix_condition.add(Expr::col(field.col).eq(value.clone()));
381        }
382
383        // Add the comparison condition for current field
384        let (field, value, dir) = &cursor_values[i];
385        let comparison = if is_backward {
386            // Backward: reverse the comparison
387            match dir {
388                SortDir::Asc => Expr::col(field.col).lt(value.clone()),
389                SortDir::Desc => Expr::col(field.col).gt(value.clone()),
390            }
391        } else {
392            // Forward: normal comparison
393            match dir {
394                SortDir::Asc => Expr::col(field.col).gt(value.clone()),
395                SortDir::Desc => Expr::col(field.col).lt(value.clone()),
396            }
397        };
398        prefix_condition = prefix_condition.add(comparison);
399
400        main_condition = main_condition.add(prefix_condition);
401    }
402
403    Ok(main_condition)
404}
405
406/* ---------- error mapping helpers ---------- */
407
408/// Resolve a field by name, converting `UnknownField` errors to `InvalidOrderByField`
409fn resolve_field<'a, E: EntityTrait>(
410    fld_map: &'a FieldMap<E>,
411    name: &str,
412) -> Result<&'a Field<E>, ODataError> {
413    fld_map
414        .get(name)
415        .ok_or_else(|| ODataError::InvalidOrderByField(name.to_owned()))
416}
417
418/* ---------- tiebreaker handling ---------- */
419
420/// Ensure a tiebreaker field is present in the order
421pub fn ensure_tiebreaker(order: ODataOrderBy, tiebreaker: &str, dir: SortDir) -> ODataOrderBy {
422    order.ensure_tiebreaker(tiebreaker, dir)
423}
424
425/* ---------- cursor building ---------- */
426
427/// Build a cursor from a model using the effective order and field map extractors.
428///
429/// # Errors
430/// Returns `ODataError::InvalidOrderByField` if a field cannot be encoded.
431pub fn build_cursor_for_model<E: EntityTrait>(
432    model: &E::Model,
433    order: &ODataOrderBy,
434    fmap: &FieldMap<E>,
435    primary_dir: SortDir,
436    filter_hash: Option<String>,
437    direction: &str, // "fwd" or "bwd"
438) -> Result<CursorV1, ODataError> {
439    let mut k = Vec::with_capacity(order.0.len());
440    for key in &order.0 {
441        let s = fmap
442            .encode_model_key(model, &key.field)
443            .ok_or_else(|| ODataError::InvalidOrderByField(key.field.clone()))?;
444        k.push(s);
445    }
446    Ok(CursorV1 {
447        k,
448        o: primary_dir,
449        s: order.to_signed_tokens(),
450        f: filter_hash,
451        d: direction.to_owned(),
452    })
453}
454
455/* ---------- Expr (AST) -> Condition ---------- */
456
457/// Convert an `OData` filter expression AST to a `SeaORM` Condition.
458///
459/// # Errors
460/// Returns `ODataBuildError` if the expression contains unknown fields or unsupported operations.
461pub fn expr_to_condition<E: EntityTrait>(
462    expr: &core::Expr,
463    fmap: &FieldMap<E>,
464) -> ODataBuildResult<Condition>
465where
466    E::Column: ColumnTrait + Copy,
467{
468    use core::CompareOperator as Op;
469    use core::Expr as X;
470
471    Ok(match expr {
472        X::And(a, b) => {
473            let left = expr_to_condition::<E>(a, fmap)?;
474            let right = expr_to_condition::<E>(b, fmap)?;
475            Condition::all().add(left).add(right) // AND
476        }
477        X::Or(a, b) => {
478            let left = expr_to_condition::<E>(a, fmap)?;
479            let right = expr_to_condition::<E>(b, fmap)?;
480            Condition::any().add(left).add(right) // OR
481        }
482        X::Not(x) => {
483            let inner = expr_to_condition::<E>(x, fmap)?;
484            Condition::all().add(inner).not()
485        }
486
487        // Identifier op Value
488        X::Compare(lhs, op, rhs) => {
489            let (name, rhs_val) = match (&**lhs, &**rhs) {
490                (X::Identifier(name), X::Value(val)) => (name, val),
491                (X::Identifier(_), X::Identifier(_)) => {
492                    return Err(ODataBuildError::Other(
493                        "field-to-field comparison is not supported",
494                    ));
495                }
496                _ => return Err(ODataBuildError::Other("unsupported comparison form")),
497            };
498            let field = fmap
499                .get(name)
500                .ok_or_else(|| ODataBuildError::UnknownField(name.clone()))?;
501            let col = field.col;
502
503            // null handling
504            if matches!(rhs_val, core::Value::Null) {
505                return Ok(match op {
506                    Op::Eq => Condition::all().add(Expr::col(col).is_null()),
507                    Op::Ne => Condition::all().add(Expr::col(col).is_not_null()),
508                    _ => return Err(ODataBuildError::UnsupportedOp(*op)),
509                });
510            }
511
512            let value = coerce(field.kind, rhs_val)?;
513            let expr = match op {
514                Op::Eq => Expr::col(col).eq(value),
515                Op::Ne => Expr::col(col).ne(value),
516                Op::Gt => Expr::col(col).gt(value),
517                Op::Ge => Expr::col(col).gte(value),
518                Op::Lt => Expr::col(col).lt(value),
519                Op::Le => Expr::col(col).lte(value),
520            };
521            Condition::all().add(expr)
522        }
523
524        // Identifier IN (value, value, ...)
525        X::In(l, list) => {
526            let X::Identifier(name) = &**l else {
527                return Err(ODataBuildError::Other("left side of IN must be a field"));
528            };
529            let f = fmap
530                .get(name)
531                .ok_or_else(|| ODataBuildError::UnknownField(name.clone()))?;
532            let col = f.col;
533            let vals = coerce_many(f.kind, list)?;
534            if vals.is_empty() {
535                // IN () → always false
536                Condition::all().add(Expr::value(1).eq(0))
537            } else {
538                Condition::all().add(Expr::col(col).is_in(vals))
539            }
540        }
541
542        // Supported functions: contains/startswith/endswith
543        X::Function(fname, args) => {
544            let n = fname.to_ascii_lowercase();
545            match (n.as_str(), args.as_slice()) {
546                ("contains", [X::Identifier(name), X::Value(core::Value::String(s))]) => {
547                    let f = fmap
548                        .get(name)
549                        .ok_or_else(|| ODataBuildError::UnknownField(name.clone()))?;
550                    ensure_string_field(f, name)?;
551                    Condition::all().add(Expr::col(f.col).like(like_contains(s)))
552                }
553                ("startswith", [X::Identifier(name), X::Value(core::Value::String(s))]) => {
554                    let f = fmap
555                        .get(name)
556                        .ok_or_else(|| ODataBuildError::UnknownField(name.clone()))?;
557                    ensure_string_field(f, name)?;
558                    Condition::all().add(Expr::col(f.col).like(like_starts(s)))
559                }
560                ("endswith", [X::Identifier(name), X::Value(core::Value::String(s))]) => {
561                    let f = fmap
562                        .get(name)
563                        .ok_or_else(|| ODataBuildError::UnknownField(name.clone()))?;
564                    ensure_string_field(f, name)?;
565                    Condition::all().add(Expr::col(f.col).like(like_ends(s)))
566                }
567                _ => return Err(ODataBuildError::UnsupportedFn(fname.clone())),
568            }
569        }
570
571        // Leaf forms are not valid WHERE by themselves
572        X::Identifier(name) => return Err(ODataBuildError::BareIdentifier(name.clone())),
573        X::Value(_) => return Err(ODataBuildError::BareLiteral),
574    })
575}
576
577/// Apply an optional `OData` filter (via wrapper) to a plain `SeaORM` Select<E>.
578///
579/// This extension does NOT parse the filter string — it only consumes a parsed AST
580/// (`modkit_odata::ast::Expr`) and translates it into a `sea_orm::Condition`.
581pub trait ODataExt<E: EntityTrait>: Sized {
582    /// Apply `OData` filter to the query.
583    ///
584    /// # Errors
585    /// Returns `ODataBuildError` if the filter contains unknown fields or invalid expressions.
586    fn apply_odata_filter(
587        self,
588        od_query: ODataQuery,
589        fld_map: &FieldMap<E>,
590    ) -> ODataBuildResult<Self>;
591}
592
593impl<E> ODataExt<E> for sea_orm::Select<E>
594where
595    E: EntityTrait,
596    E::Column: ColumnTrait + Copy,
597{
598    fn apply_odata_filter(
599        self,
600        od_query: ODataQuery,
601        fld_map: &FieldMap<E>,
602    ) -> ODataBuildResult<Self> {
603        match od_query.filter() {
604            Some(ast) => {
605                let cond = expr_to_condition::<E>(ast, fld_map)?;
606                Ok(self.filter(cond))
607            }
608            None => Ok(self),
609        }
610    }
611}
612
613/// Extension trait for applying cursor-based pagination
614pub trait CursorApplyExt<E: EntityTrait>: Sized {
615    /// Apply cursor-based forward pagination.
616    ///
617    /// # Errors
618    /// Returns `ODataBuildError` if cursor validation fails.
619    fn apply_cursor_forward(
620        self,
621        cursor: &CursorV1,
622        order: &ODataOrderBy,
623        fld_map: &FieldMap<E>,
624    ) -> ODataBuildResult<Self>;
625}
626
627impl<E> CursorApplyExt<E> for sea_orm::Select<E>
628where
629    E: EntityTrait,
630    E::Column: ColumnTrait + Copy,
631{
632    fn apply_cursor_forward(
633        self,
634        cursor: &CursorV1,
635        order: &ODataOrderBy,
636        fld_map: &FieldMap<E>,
637    ) -> ODataBuildResult<Self> {
638        let cond = build_cursor_predicate(cursor, order, fld_map)?;
639        Ok(self.filter(cond))
640    }
641}
642
643/// Extension trait for applying ordering (legacy version with `ODataBuildError`)
644pub trait ODataOrderExt<E: EntityTrait>: Sized {
645    /// Apply `OData` ordering to the query.
646    ///
647    /// # Errors
648    /// Returns `ODataBuildError` if an unknown field is referenced.
649    fn apply_odata_order(
650        self,
651        order: &ODataOrderBy,
652        fld_map: &FieldMap<E>,
653    ) -> ODataBuildResult<Self>;
654}
655
656impl<E> ODataOrderExt<E> for sea_orm::Select<E>
657where
658    E: EntityTrait,
659    E::Column: ColumnTrait + Copy,
660{
661    fn apply_odata_order(
662        self,
663        order: &ODataOrderBy,
664        fld_map: &FieldMap<E>,
665    ) -> ODataBuildResult<Self> {
666        let mut query = self;
667
668        for order_key in &order.0 {
669            let field = fld_map
670                .get(&order_key.field)
671                .ok_or_else(|| ODataBuildError::UnknownField(order_key.field.clone()))?;
672
673            let sea_order = match order_key.dir {
674                SortDir::Asc => Order::Asc,
675                SortDir::Desc => Order::Desc,
676            };
677
678            query = query.order_by(field.col, sea_order);
679        }
680
681        Ok(query)
682    }
683}
684
685/// Extension trait for applying ordering with centralized error handling
686pub trait ODataOrderPageExt<E: EntityTrait>: Sized {
687    /// Apply `OData` ordering with page-level error handling.
688    ///
689    /// # Errors
690    /// Returns `ODataError` if an unknown field is referenced.
691    fn apply_odata_order_page(
692        self,
693        order: &ODataOrderBy,
694        fld_map: &FieldMap<E>,
695    ) -> Result<Self, ODataError>;
696}
697
698impl<E> ODataOrderPageExt<E> for sea_orm::Select<E>
699where
700    E: EntityTrait,
701    E::Column: ColumnTrait + Copy,
702{
703    fn apply_odata_order_page(
704        self,
705        order: &ODataOrderBy,
706        fld_map: &FieldMap<E>,
707    ) -> Result<Self, ODataError> {
708        let mut query = self;
709
710        for order_key in &order.0 {
711            let field = resolve_field(fld_map, &order_key.field)?;
712
713            let sea_order = match order_key.dir {
714                SortDir::Asc => Order::Asc,
715                SortDir::Desc => Order::Desc,
716            };
717
718            query = query.order_by(field.col, sea_order);
719        }
720
721        Ok(query)
722    }
723}
724
725/// Extension trait for applying full `OData` query (filter + cursor + order)
726pub trait ODataQueryExt<E: EntityTrait>: Sized {
727    /// Apply full `OData` query including filter, cursor, and ordering.
728    ///
729    /// # Errors
730    /// Returns `ODataBuildError` if any part of the query application fails.
731    fn apply_odata_query(
732        self,
733        query: &ODataQuery,
734        fld_map: &FieldMap<E>,
735        tiebreaker: (&str, SortDir),
736    ) -> ODataBuildResult<Self>;
737}
738
739impl<E> ODataQueryExt<E> for sea_orm::Select<E>
740where
741    E: EntityTrait,
742    E::Column: ColumnTrait + Copy,
743{
744    fn apply_odata_query(
745        self,
746        query: &ODataQuery,
747        fld_map: &FieldMap<E>,
748        tiebreaker: (&str, SortDir),
749    ) -> ODataBuildResult<Self> {
750        let mut select = self;
751
752        if let Some(ast) = query.filter.as_deref() {
753            let cond = expr_to_condition::<E>(ast, fld_map)?;
754            select = select.filter(cond);
755        }
756
757        let effective_order = ensure_tiebreaker(query.order.clone(), tiebreaker.0, tiebreaker.1);
758
759        if let Some(cursor) = &query.cursor {
760            select = select.apply_cursor_forward(cursor, &effective_order, fld_map)?;
761        }
762
763        select = select.apply_odata_order(&effective_order, fld_map)?;
764
765        Ok(select)
766    }
767}
768
769/* ---------- pagination combiner ---------- */
770
771// Use unified pagination types from modkit-odata
772pub use modkit_odata::{Page, PageInfo};
773
774// Note: LimitCfg is imported at the top and re-exported from odata/mod.rs
775
776fn clamp_limit(req: Option<u64>, cfg: LimitCfg) -> u64 {
777    let mut l = req.unwrap_or(cfg.default);
778    if l == 0 {
779        l = 1;
780    }
781    if l > cfg.max {
782        l = cfg.max;
783    }
784    l
785}
786
787/// One-shot pagination combiner that handles filter → cursor predicate → order → overfetch/trim → build cursors.
788///
789/// # Errors
790/// Returns `ODataError` if filter application, cursor validation, or database query fails.
791pub async fn paginate_with_odata<E, D, F, C>(
792    select: sea_orm::Select<E>,
793    conn: &C,
794    q: &ODataQuery,
795    fmap: &FieldMap<E>,
796    tiebreaker: (&str, SortDir), // e.g. ("id", SortDir::Desc)
797    limit_cfg: LimitCfg,         // e.g. { default: 25, max: 1000 }
798    model_to_domain: F,
799) -> Result<Page<D>, ODataError>
800where
801    E: EntityTrait,
802    E::Column: ColumnTrait + Copy,
803    F: Fn(E::Model) -> D + Copy,
804    C: DBRunner,
805{
806    let limit = clamp_limit(q.limit, limit_cfg);
807    let fetch = limit + 1;
808
809    // Effective order derivation based on new policy
810    let effective_order = if let Some(cur) = &q.cursor {
811        // Derive order from the cursor's signed tokens
812        modkit_odata::ODataOrderBy::from_signed_tokens(&cur.s)
813            .map_err(|_| ODataError::InvalidCursor)?
814    } else {
815        // Use client order; ensure tiebreaker
816        q.order
817            .clone()
818            .ensure_tiebreaker(tiebreaker.0, tiebreaker.1)
819    };
820
821    // Validate cursor consistency (filter hash only) if cursor present
822    if let Some(cur) = &q.cursor
823        && let (Some(h), Some(cf)) = (q.filter_hash.as_deref(), cur.f.as_deref())
824        && h != cf
825    {
826        return Err(ODataError::FilterMismatch);
827    }
828
829    // Compose: filter → cursor predicate → order; apply limit+1 at the end
830    let mut s = select;
831
832    // Apply filter
833    if let Some(ast) = q.filter.as_deref() {
834        s = s.filter(
835            expr_to_condition::<E>(ast, fmap)
836                .map_err(|e| ODataError::InvalidFilter(e.to_string()))?,
837        );
838    }
839
840    // Check if we're paginating backward
841    let is_backward = q.cursor.as_ref().is_some_and(|c| c.d == "bwd");
842
843    // Apply cursor if present
844    if let Some(cursor) = &q.cursor {
845        s = s.filter(
846            build_cursor_predicate(cursor, &effective_order, fmap)
847                .map_err(|_| ODataError::InvalidCursor)?,
848        );
849    }
850
851    // Apply order (reverse it for backward pagination)
852    let query_order = if is_backward {
853        effective_order.clone().reverse_directions()
854    } else {
855        effective_order.clone()
856    };
857    s = s.apply_odata_order_page(&query_order, fmap)?;
858
859    // Apply limit
860    s = s.limit(fetch);
861
862    #[allow(clippy::disallowed_methods)]
863    let mut rows = match DBRunnerInternal::as_seaorm(conn) {
864        SeaOrmRunner::Conn(db) => s.all(db).await,
865        SeaOrmRunner::Tx(tx) => s.all(tx).await,
866    }
867    .map_err(|e| ODataError::Db(e.to_string()))?;
868
869    let has_more = (rows.len() as u64) > limit;
870
871    // For backward pagination with reversed ORDER BY:
872    // - DB returns items in opposite order
873    // - We fetch limit+1 to detect has_more
874    // - We need to: 1) trim, 2) reverse back to original order
875    if is_backward {
876        // Remove the extra item (furthest back in time, which is at the END after reversed query)
877        if has_more {
878            rows.pop();
879        }
880        // Reverse to restore original display order
881        rows.reverse();
882    } else if has_more {
883        // Forward pagination: just truncate the end
884        rows.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
885    }
886
887    // Build cursors
888    // After all the reversals, rows are in the display order (DESC)
889    // - rows.first() = newest item
890    // - rows.last() = oldest item
891    //
892    // For backward pagination:
893    //   - has_more means "more items backward" (older)
894    //   - next_cursor should always be present (we came from forward)
895    //   - prev_cursor based on has_more
896    // For forward pagination:
897    //   - has_more means "more items forward" (older in DESC)
898    //   - next_cursor based on has_more
899    //   - prev_cursor always present (unless at start)
900
901    let next_cursor = if is_backward {
902        // Going backward: always have items forward (unless this was the initial query)
903        // Build cursor from last item to go forward
904        build_cursor(&rows, &effective_order, fmap, tiebreaker, q, true, "fwd")?
905    } else if has_more {
906        // Going forward: only have more if has_more is true
907        build_cursor(&rows, &effective_order, fmap, tiebreaker, q, true, "fwd")?
908    } else {
909        None
910    };
911
912    let prev_cursor = if is_backward {
913        // Going backward: only have more backward if has_more is true
914        if has_more {
915            build_cursor(&rows, &effective_order, fmap, tiebreaker, q, false, "bwd")?
916        } else {
917            None
918        }
919    } else if q.cursor.is_some() {
920        // Going forward: have items backward only if this is NOT the initial query
921        // If q.cursor is None, we're at the start of the dataset
922        build_cursor(&rows, &effective_order, fmap, tiebreaker, q, false, "bwd")?
923    } else {
924        None
925    };
926
927    let items = rows.into_iter().map(model_to_domain).collect();
928
929    Ok(Page {
930        items,
931        page_info: PageInfo {
932            next_cursor,
933            prev_cursor,
934            limit,
935        },
936    })
937}
938
939fn build_cursor<E: EntityTrait>(
940    rows: &[E::Model],
941    effective_order: &ODataOrderBy,
942    fmap: &FieldMap<E>,
943    tiebreaker: (&str, SortDir),
944    q: &ODataQuery,
945    last: bool,
946    direction: &str,
947) -> Result<Option<String>, ODataError> {
948    if last { rows.last() } else { rows.first() }
949        .map(|m| {
950            build_cursor_for_model::<E>(
951                m,
952                effective_order,
953                fmap,
954                tiebreaker.1,
955                q.filter_hash.clone(),
956                direction,
957            )
958            .and_then(|c| c.encode().map_err(|_| ODataError::InvalidCursor))
959        })
960        .transpose()
961}