Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5// Key-only access intent and helpers (split out for readability).
6mod key_access;
7pub(crate) use key_access::*;
8
9use crate::{
10    db::{
11        query::{
12            ReadConsistency,
13            enum_filter::normalize_enum_literals,
14            expr::{FilterExpr, SortExpr, SortLowerError},
15            plan::{
16                DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
17                OrderSpec, PageSpec, PlanError,
18                planner::{PlannerError, plan_access},
19                validate::validate_logical_plan_model,
20            },
21            policy,
22            predicate::{
23                Predicate, SchemaInfo, ValidateError, normalize,
24                validate::reject_unsupported_query_features,
25            },
26        },
27        response::ResponseError,
28    },
29    error::InternalError,
30    model::entity::EntityModel,
31    traits::{EntityKind, FieldValue, SingletonEntity},
32    value::Value,
33};
34use std::marker::PhantomData;
35use thiserror::Error as ThisError;
36
37///
38/// QueryMode
39/// Discriminates load vs delete intent at planning time.
40/// Encodes mode-specific fields so invalid states are unrepresentable.
41/// Mode checks are explicit and stable at execution time.
42///
43
44#[derive(Clone, Copy, Debug, Eq, PartialEq)]
45pub enum QueryMode {
46    Load(LoadSpec),
47    Delete(DeleteSpec),
48}
49
50impl QueryMode {
51    /// True if this mode represents a load intent.
52    #[must_use]
53    pub const fn is_load(&self) -> bool {
54        match self {
55            Self::Load(_) => true,
56            Self::Delete(_) => false,
57        }
58    }
59
60    /// True if this mode represents a delete intent.
61    #[must_use]
62    pub const fn is_delete(&self) -> bool {
63        match self {
64            Self::Delete(_) => true,
65            Self::Load(_) => false,
66        }
67    }
68}
69
///
/// LoadSpec
/// Fields that only exist on load intents.
/// Holds the pagination settings (limit and offset) of a load.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct LoadSpec {
    /// Optional load limit; `None` until a limit is applied.
    pub limit: Option<u32>,
    /// Load offset; zero until an offset is applied.
    pub offset: u32,
}

impl LoadSpec {
    /// Build a load spec with no limit and a zero offset.
    #[must_use]
    pub const fn new() -> Self {
        let limit = None;
        let offset = 0;

        Self { limit, offset }
    }
}
92
///
/// DeleteSpec
/// Fields that only exist on delete intents.
/// Holds the optional delete limit.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteSpec {
    /// Optional delete limit; `None` until a limit is applied.
    pub limit: Option<u32>,
}

impl DeleteSpec {
    /// Build a delete spec with no limit.
    #[must_use]
    pub const fn new() -> Self {
        let limit = None;

        Self { limit }
    }
}
111
112///
113/// QueryModel
114///
115/// Model-level query intent and planning context.
116/// Consumes an `EntityModel` derived from typed entity definitions.
117///
118
119#[derive(Debug)]
120pub(crate) struct QueryModel<'m, K> {
121    model: &'m EntityModel,
122    mode: QueryMode,
123    predicate: Option<Predicate>,
124    key_access: Option<KeyAccessState<K>>,
125    key_access_conflict: bool,
126    order: Option<OrderSpec>,
127    consistency: ReadConsistency,
128}
129
130impl<'m, K: FieldValue> QueryModel<'m, K> {
131    #[must_use]
132    pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
133        Self {
134            model,
135            mode: QueryMode::Load(LoadSpec::new()),
136            predicate: None,
137            key_access: None,
138            key_access_conflict: false,
139            order: None,
140            consistency,
141        }
142    }
143
144    /// Return the intent mode (load vs delete).
145    #[must_use]
146    pub(crate) const fn mode(&self) -> QueryMode {
147        self.mode
148    }
149
150    #[must_use]
151    fn has_explicit_order(&self) -> bool {
152        policy::has_explicit_order(self.order.as_ref())
153    }
154
155    #[must_use]
156    const fn load_spec(&self) -> Option<LoadSpec> {
157        match self.mode {
158            QueryMode::Load(spec) => Some(spec),
159            QueryMode::Delete(_) => None,
160        }
161    }
162
163    /// Add a predicate, implicitly AND-ing with any existing predicate.
164    #[must_use]
165    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
166        self.predicate = match self.predicate.take() {
167            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
168            None => Some(predicate),
169        };
170        self
171    }
172
173    /// Apply a dynamic filter expression using the model schema.
174    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
175        let schema = SchemaInfo::from_entity_model(self.model)?;
176        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
177
178        Ok(self.filter(predicate))
179    }
180
181    /// Apply a dynamic sort expression using the model schema.
182    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
183        let schema = SchemaInfo::from_entity_model(self.model)?;
184        let order = match expr.lower_with(&schema) {
185            Ok(order) => order,
186            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
187            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
188        };
189
190        policy::validate_order_shape(Some(&order))
191            .map_err(IntentError::from)
192            .map_err(QueryError::from)?;
193
194        Ok(self.order_spec(order))
195    }
196
197    /// Append an ascending sort key.
198    #[must_use]
199    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
200        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
201        self
202    }
203
204    /// Append a descending sort key.
205    #[must_use]
206    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
207        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
208        self
209    }
210
211    /// Set a fully-specified order spec (validated before reaching this boundary).
212    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
213        self.order = Some(order);
214        self
215    }
216
217    /// Track key-only access paths and detect conflicting key intents.
218    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
219        if let Some(existing) = &self.key_access
220            && existing.kind != kind
221        {
222            self.key_access_conflict = true;
223        }
224
225        self.key_access = Some(KeyAccessState { kind, access });
226
227        self
228    }
229
230    /// Set the access path to a single primary key lookup.
231    pub(crate) fn by_id(self, id: K) -> Self {
232        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
233    }
234
235    /// Set the access path to a primary key batch lookup.
236    pub(crate) fn by_ids<I>(self, ids: I) -> Self
237    where
238        I: IntoIterator<Item = K>,
239    {
240        self.set_key_access(
241            KeyAccessKind::Many,
242            KeyAccess::Many(ids.into_iter().collect()),
243        )
244    }
245
246    /// Set the access path to the singleton primary key.
247    pub(crate) fn only(self, id: K) -> Self {
248        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
249    }
250
251    /// Mark this intent as a delete query.
252    #[must_use]
253    pub(crate) const fn delete(mut self) -> Self {
254        if self.mode.is_load() {
255            self.mode = QueryMode::Delete(DeleteSpec::new());
256        }
257        self
258    }
259
260    /// Apply a limit to the current mode.
261    ///
262    /// Load limits bound result size; delete limits bound mutation size.
263    #[must_use]
264    pub(crate) const fn limit(mut self, limit: u32) -> Self {
265        match self.mode {
266            QueryMode::Load(mut spec) => {
267                spec.limit = Some(limit);
268                self.mode = QueryMode::Load(spec);
269            }
270            QueryMode::Delete(mut spec) => {
271                spec.limit = Some(limit);
272                self.mode = QueryMode::Delete(spec);
273            }
274        }
275        self
276    }
277
278    /// Apply an offset to a load intent.
279    #[must_use]
280    pub(crate) const fn offset(mut self, offset: u32) -> Self {
281        if let QueryMode::Load(mut spec) = self.mode {
282            spec.offset = offset;
283            self.mode = QueryMode::Load(spec);
284        }
285        self
286    }
287
288    /// Build a model-level logical plan using Value-based access keys.
289    fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
290        // Phase 1: schema surface and intent validation.
291        let schema_info = SchemaInfo::from_entity_model(self.model)?;
292        self.validate_intent()?;
293
294        // Phase 2: predicate normalization and access planning.
295        let normalized_predicate = self
296            .predicate
297            .as_ref()
298            .map(|predicate| {
299                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
300                let predicate = normalize_enum_literals(&schema_info, predicate)?;
301                Ok::<Predicate, ValidateError>(normalize(&predicate))
302            })
303            .transpose()?;
304        let access_plan_value = match &self.key_access {
305            Some(state) => access_plan_from_keys_value(&state.access),
306            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
307        };
308
309        // Phase 3: assemble the executor-ready plan.
310        let plan = LogicalPlan {
311            mode: self.mode,
312            access: access_plan_value,
313            predicate: normalized_predicate,
314            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
315            // This ensures explain/fingerprint/execution share one deterministic order shape.
316            order: canonicalize_order_spec(self.model, self.order.clone()),
317            delete_limit: match self.mode {
318                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
319                QueryMode::Load(_) => None,
320            },
321            page: match self.mode {
322                QueryMode::Load(spec) => {
323                    if spec.limit.is_some() || spec.offset > 0 {
324                        Some(PageSpec {
325                            limit: spec.limit,
326                            offset: spec.offset,
327                        })
328                    } else {
329                        None
330                    }
331                }
332                QueryMode::Delete(_) => None,
333            },
334            consistency: self.consistency,
335        };
336
337        validate_logical_plan_model(&schema_info, self.model, &plan)?;
338
339        Ok(plan)
340    }
341
342    // Validate pre-plan policy invariants and key-access rules before planning.
343    fn validate_intent(&self) -> Result<(), IntentError> {
344        if self.key_access_conflict {
345            return Err(IntentError::KeyAccessConflict);
346        }
347
348        policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
349            .map_err(IntentError::from)?;
350
351        if let Some(state) = &self.key_access {
352            match state.kind {
353                KeyAccessKind::Many if self.predicate.is_some() => {
354                    return Err(IntentError::ByIdsWithPredicate);
355                }
356                KeyAccessKind::Only if self.predicate.is_some() => {
357                    return Err(IntentError::OnlyWithPredicate);
358                }
359                _ => {
360                    // NOTE: Single/Many without predicates impose no additional constraints.
361                }
362            }
363        }
364
365        Ok(())
366    }
367}
368
369///
370/// Query
371///
372/// Typed, declarative query intent for a specific entity type.
373///
374/// This intent is:
375/// - schema-agnostic at construction
376/// - normalized and validated only during planning
377/// - free of access-path decisions
378///
379
380#[derive(Debug)]
381pub struct Query<E: EntityKind> {
382    intent: QueryModel<'static, E::Key>,
383    _marker: PhantomData<E>,
384}
385
386impl<E: EntityKind> Query<E> {
387    /// Create a new intent with an explicit missing-row policy.
388    /// MissingOk favors idempotency and may mask index/data divergence on deletes.
389    /// Use Strict to surface missing rows during scan/delete execution.
390    #[must_use]
391    pub const fn new(consistency: ReadConsistency) -> Self {
392        Self {
393            intent: QueryModel::new(E::MODEL, consistency),
394            _marker: PhantomData,
395        }
396    }
397
398    /// Return the intent mode (load vs delete).
399    #[must_use]
400    pub const fn mode(&self) -> QueryMode {
401        self.intent.mode()
402    }
403
404    #[must_use]
405    pub(crate) fn has_explicit_order(&self) -> bool {
406        self.intent.has_explicit_order()
407    }
408
409    #[must_use]
410    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
411        self.intent.load_spec()
412    }
413
414    /// Add a predicate, implicitly AND-ing with any existing predicate.
415    #[must_use]
416    pub fn filter(mut self, predicate: Predicate) -> Self {
417        self.intent = self.intent.filter(predicate);
418        self
419    }
420
421    /// Apply a dynamic filter expression.
422    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
423        let Self { intent, _marker } = self;
424        let intent = intent.filter_expr(expr)?;
425
426        Ok(Self { intent, _marker })
427    }
428
429    /// Apply a dynamic sort expression.
430    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
431        let Self { intent, _marker } = self;
432        let intent = intent.sort_expr(expr)?;
433
434        Ok(Self { intent, _marker })
435    }
436
437    /// Append an ascending sort key.
438    #[must_use]
439    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
440        self.intent = self.intent.order_by(field);
441        self
442    }
443
444    /// Append a descending sort key.
445    #[must_use]
446    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
447        self.intent = self.intent.order_by_desc(field);
448        self
449    }
450
451    /// Set the access path to a single primary key lookup.
452    pub(crate) fn by_id(self, id: E::Key) -> Self {
453        let Self { intent, _marker } = self;
454        Self {
455            intent: intent.by_id(id),
456            _marker,
457        }
458    }
459
460    /// Set the access path to a primary key batch lookup.
461    pub(crate) fn by_ids<I>(self, ids: I) -> Self
462    where
463        I: IntoIterator<Item = E::Key>,
464    {
465        let Self { intent, _marker } = self;
466        Self {
467            intent: intent.by_ids(ids),
468            _marker,
469        }
470    }
471
472    /// Mark this intent as a delete query.
473    #[must_use]
474    pub fn delete(mut self) -> Self {
475        self.intent = self.intent.delete();
476        self
477    }
478
479    /// Apply a limit to the current mode.
480    ///
481    /// Load limits bound result size; delete limits bound mutation size.
482    /// For load queries, any use of `limit` or `offset` requires an explicit
483    /// `order_by(...)` so pagination is deterministic.
484    #[must_use]
485    pub fn limit(mut self, limit: u32) -> Self {
486        self.intent = self.intent.limit(limit);
487        self
488    }
489
490    /// Apply an offset to a load intent.
491    ///
492    /// Any use of `offset` or `limit` requires an explicit `order_by(...)`.
493    #[must_use]
494    pub fn offset(mut self, offset: u32) -> Self {
495        self.intent = self.intent.offset(offset);
496        self
497    }
498
499    /// Explain this intent without executing it.
500    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
501        let plan = self.build_plan()?;
502
503        Ok(plan.explain_with_model(E::MODEL))
504    }
505
506    /// Plan this intent into an executor-ready plan.
507    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
508        let plan = self.build_plan()?;
509
510        Ok(ExecutablePlan::new(plan))
511    }
512
513    // Build a logical plan for the current intent.
514    fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
515        let plan_value = self.intent.build_plan_model()?;
516        let LogicalPlan {
517            mode,
518            access,
519            predicate,
520            order,
521            delete_limit,
522            page,
523            consistency,
524        } = plan_value;
525
526        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
527        let plan = LogicalPlan {
528            mode,
529            access,
530            predicate,
531            order,
532            delete_limit,
533            page,
534            consistency,
535        };
536
537        Ok(plan)
538    }
539}
540
541impl<E> Query<E>
542where
543    E: EntityKind + SingletonEntity,
544    E::Key: Default,
545{
546    /// Set the access path to the singleton primary key.
547    pub(crate) fn only(self) -> Self {
548        let Self { intent, _marker } = self;
549
550        Self {
551            intent: intent.only(E::Key::default()),
552            _marker,
553        }
554    }
555}
556
557///
558/// QueryError
559///
560
561#[derive(Debug, ThisError)]
562pub enum QueryError {
563    #[error("{0}")]
564    Validate(#[from] ValidateError),
565
566    #[error("{0}")]
567    Plan(Box<PlanError>),
568
569    #[error("{0}")]
570    Intent(#[from] IntentError),
571
572    #[error("{0}")]
573    Response(#[from] ResponseError),
574
575    #[error("{0}")]
576    Execute(#[from] InternalError),
577}
578
579impl From<PlannerError> for QueryError {
580    fn from(err: PlannerError) -> Self {
581        match err {
582            PlannerError::Plan(err) => Self::from(*err),
583            PlannerError::Internal(err) => Self::Execute(*err),
584        }
585    }
586}
587
588impl From<PlanError> for QueryError {
589    fn from(err: PlanError) -> Self {
590        Self::Plan(Box::new(err))
591    }
592}
593
594///
595/// IntentError
596///
597
598#[derive(Clone, Copy, Debug, ThisError)]
599pub enum IntentError {
600    #[error("{0}")]
601    PlanShape(#[from] policy::PlanPolicyError),
602
603    #[error("by_ids() cannot be combined with predicates")]
604    ByIdsWithPredicate,
605
606    #[error("only() cannot be combined with predicates")]
607    OnlyWithPredicate,
608
609    #[error("multiple key access methods were used on the same query")]
610    KeyAccessConflict,
611
612    #[error("cursor pagination requires an explicit ordering")]
613    CursorRequiresOrder,
614
615    #[error("cursor pagination requires an explicit limit")]
616    CursorRequiresLimit,
617
618    #[error("cursor pagination does not support offset; use the cursor token for continuation")]
619    CursorWithOffsetUnsupported,
620}
621
622impl From<policy::CursorPagingPolicyError> for IntentError {
623    fn from(err: policy::CursorPagingPolicyError) -> Self {
624        match err {
625            policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
626            policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
627            policy::CursorPagingPolicyError::CursorWithOffsetUnsupported => {
628                Self::CursorWithOffsetUnsupported
629            }
630        }
631    }
632}
633
634/// Helper to append an ordering field while preserving existing order spec.
635fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
636    match order {
637        Some(mut spec) => {
638            spec.fields.push((field.to_string(), direction));
639            spec
640        }
641        None => OrderSpec {
642            fields: vec![(field.to_string(), direction)],
643        },
644    }
645}
646
647// Normalize ORDER BY into a canonical, deterministic shape:
648// - preserve user field order
649// - remove explicit primary-key references from the user segment
650// - append exactly one primary-key field as the terminal tie-break
651fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
652    let mut order = order?;
653    if order.fields.is_empty() {
654        return Some(order);
655    }
656
657    let pk_field = model.primary_key.name;
658    let mut pk_direction = None;
659    order.fields.retain(|(field, direction)| {
660        if field == pk_field {
661            if pk_direction.is_none() {
662                pk_direction = Some(*direction);
663            }
664            false
665        } else {
666            true
667        }
668    });
669
670    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
671    order.fields.push((pk_field.to_string(), pk_direction));
672
673    Some(order)
674}