Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5// Key-only access intent and helpers (split out for readability).
6mod key_access;
7pub(crate) use crate::db::intent::{DeleteSpec, LoadSpec, QueryMode};
8pub(crate) use key_access::*;
9
10use crate::{
11    db::{
12        consistency::ReadConsistency,
13        executor::ExecutablePlan,
14        policy,
15        query::{
16            explain::ExplainPlan,
17            expr::{FilterExpr, SortExpr, SortLowerError},
18            plan::{
19                AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
20                PageSpec, PlanError,
21                planner::{PlannerError, plan_access},
22                validate::validate_logical_plan_model,
23            },
24            predicate::{
25                Predicate, SchemaInfo, ValidateError, normalize, normalize_enum_literals,
26                validate::reject_unsupported_query_features,
27            },
28        },
29        response::ResponseError,
30    },
31    error::InternalError,
32    model::entity::EntityModel,
33    traits::{EntityKind, FieldValue, SingletonEntity},
34    value::Value,
35};
36use std::marker::PhantomData;
37use thiserror::Error as ThisError;
38
39///
40/// QueryModel
41///
42/// Model-level query intent and planning context.
43/// Consumes an `EntityModel` derived from typed entity definitions.
44///
45
46#[derive(Debug)]
47pub(crate) struct QueryModel<'m, K> {
48    model: &'m EntityModel,
49    mode: QueryMode,
50    predicate: Option<Predicate>,
51    key_access: Option<KeyAccessState<K>>,
52    key_access_conflict: bool,
53    order: Option<OrderSpec>,
54    distinct: bool,
55    consistency: ReadConsistency,
56}
57
58impl<'m, K: FieldValue> QueryModel<'m, K> {
59    #[must_use]
60    pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
61        Self {
62            model,
63            mode: QueryMode::Load(LoadSpec::new()),
64            predicate: None,
65            key_access: None,
66            key_access_conflict: false,
67            order: None,
68            distinct: false,
69            consistency,
70        }
71    }
72
73    /// Return the intent mode (load vs delete).
74    #[must_use]
75    pub(crate) const fn mode(&self) -> QueryMode {
76        self.mode
77    }
78
79    #[must_use]
80    fn has_explicit_order(&self) -> bool {
81        policy::has_explicit_order(self.order.as_ref())
82    }
83
84    #[must_use]
85    const fn load_spec(&self) -> Option<LoadSpec> {
86        match self.mode {
87            QueryMode::Load(spec) => Some(spec),
88            QueryMode::Delete(_) => None,
89        }
90    }
91
92    /// Add a predicate, implicitly AND-ing with any existing predicate.
93    #[must_use]
94    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
95        self.predicate = match self.predicate.take() {
96            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
97            None => Some(predicate),
98        };
99        self
100    }
101
102    /// Apply a dynamic filter expression using the model schema.
103    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
104        let schema = SchemaInfo::from_entity_model(self.model)?;
105        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
106
107        Ok(self.filter(predicate))
108    }
109
110    /// Apply a dynamic sort expression using the model schema.
111    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
112        let schema = SchemaInfo::from_entity_model(self.model)?;
113        let order = match expr.lower_with(&schema) {
114            Ok(order) => order,
115            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
116            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
117        };
118
119        policy::validate_order_shape(Some(&order))
120            .map_err(IntentError::from)
121            .map_err(QueryError::from)?;
122
123        Ok(self.order_spec(order))
124    }
125
126    /// Append an ascending sort key.
127    #[must_use]
128    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
129        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
130        self
131    }
132
133    /// Append a descending sort key.
134    #[must_use]
135    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
136        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
137        self
138    }
139
140    /// Set a fully-specified order spec (validated before reaching this boundary).
141    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
142        self.order = Some(order);
143        self
144    }
145
146    /// Enable DISTINCT semantics for this query intent.
147    #[must_use]
148    pub(crate) const fn distinct(mut self) -> Self {
149        self.distinct = true;
150        self
151    }
152
153    /// Track key-only access paths and detect conflicting key intents.
154    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
155        if let Some(existing) = &self.key_access
156            && existing.kind != kind
157        {
158            self.key_access_conflict = true;
159        }
160
161        self.key_access = Some(KeyAccessState { kind, access });
162
163        self
164    }
165
166    /// Set the access path to a single primary key lookup.
167    pub(crate) fn by_id(self, id: K) -> Self {
168        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
169    }
170
171    /// Set the access path to a primary key batch lookup.
172    pub(crate) fn by_ids<I>(self, ids: I) -> Self
173    where
174        I: IntoIterator<Item = K>,
175    {
176        self.set_key_access(
177            KeyAccessKind::Many,
178            KeyAccess::Many(ids.into_iter().collect()),
179        )
180    }
181
182    /// Set the access path to the singleton primary key.
183    pub(crate) fn only(self, id: K) -> Self {
184        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
185    }
186
187    /// Mark this intent as a delete query.
188    #[must_use]
189    pub(crate) const fn delete(mut self) -> Self {
190        if self.mode.is_load() {
191            self.mode = QueryMode::Delete(DeleteSpec::new());
192        }
193        self
194    }
195
196    /// Apply a limit to the current mode.
197    ///
198    /// Load limits bound result size; delete limits bound mutation size.
199    #[must_use]
200    pub(crate) const fn limit(mut self, limit: u32) -> Self {
201        match self.mode {
202            QueryMode::Load(mut spec) => {
203                spec.limit = Some(limit);
204                self.mode = QueryMode::Load(spec);
205            }
206            QueryMode::Delete(mut spec) => {
207                spec.limit = Some(limit);
208                self.mode = QueryMode::Delete(spec);
209            }
210        }
211        self
212    }
213
214    /// Apply an offset to a load intent.
215    #[must_use]
216    pub(crate) const fn offset(mut self, offset: u32) -> Self {
217        if let QueryMode::Load(mut spec) = self.mode {
218            spec.offset = offset;
219            self.mode = QueryMode::Load(spec);
220        }
221        self
222    }
223
224    /// Build a model-level logical plan using Value-based access keys.
225    fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
226        // Phase 1: schema surface and intent validation.
227        let schema_info = SchemaInfo::from_entity_model(self.model)?;
228        self.validate_intent()?;
229
230        // Phase 2: predicate normalization and access planning.
231        let normalized_predicate = self
232            .predicate
233            .as_ref()
234            .map(|predicate| {
235                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
236                let predicate = normalize_enum_literals(&schema_info, predicate)?;
237                Ok::<Predicate, ValidateError>(normalize(&predicate))
238            })
239            .transpose()?;
240        let access_plan_value = match &self.key_access {
241            Some(state) => access_plan_from_keys_value(&state.access),
242            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
243        };
244
245        // Phase 3: assemble the executor-ready plan.
246        let logical = LogicalPlan {
247            mode: self.mode,
248            predicate: normalized_predicate,
249            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
250            // This ensures explain/fingerprint/execution share one deterministic order shape.
251            order: canonicalize_order_spec(self.model, self.order.clone()),
252            distinct: self.distinct,
253            delete_limit: match self.mode {
254                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
255                QueryMode::Load(_) => None,
256            },
257            page: match self.mode {
258                QueryMode::Load(spec) => {
259                    if spec.limit.is_some() || spec.offset > 0 {
260                        Some(PageSpec {
261                            limit: spec.limit,
262                            offset: spec.offset,
263                        })
264                    } else {
265                        None
266                    }
267                }
268                QueryMode::Delete(_) => None,
269            },
270            consistency: self.consistency,
271        };
272        let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
273
274        validate_logical_plan_model(&schema_info, self.model, &plan)?;
275
276        Ok(plan)
277    }
278
279    // Validate pre-plan policy invariants and key-access rules before planning.
280    fn validate_intent(&self) -> Result<(), IntentError> {
281        if self.key_access_conflict {
282            return Err(IntentError::KeyAccessConflict);
283        }
284
285        policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
286            .map_err(IntentError::from)?;
287
288        if let Some(state) = &self.key_access {
289            match state.kind {
290                KeyAccessKind::Many if self.predicate.is_some() => {
291                    return Err(IntentError::ByIdsWithPredicate);
292                }
293                KeyAccessKind::Only if self.predicate.is_some() => {
294                    return Err(IntentError::OnlyWithPredicate);
295                }
296                _ => {
297                    // NOTE: Single/Many without predicates impose no additional constraints.
298                }
299            }
300        }
301
302        Ok(())
303    }
304}
305
306///
307/// Query
308///
309/// Typed, declarative query intent for a specific entity type.
310///
311/// This intent is:
312/// - schema-agnostic at construction
313/// - normalized and validated only during planning
314/// - free of access-path decisions
315///
316
317///
318/// PlannedQuery
319///
320/// Neutral query-owned planned contract produced by query planning.
321/// Stores logical + access shape without executor compilation state.
322///
323#[derive(Debug)]
324pub struct PlannedQuery<E: EntityKind> {
325    plan: AccessPlannedQuery<E::Key>,
326    _marker: PhantomData<E>,
327}
328
329impl<E: EntityKind> PlannedQuery<E> {
330    #[must_use]
331    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
332        Self {
333            plan,
334            _marker: PhantomData,
335        }
336    }
337
338    #[must_use]
339    pub fn explain(&self) -> ExplainPlan {
340        self.plan.explain_with_model(E::MODEL)
341    }
342
343    #[must_use]
344    pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
345        self.plan
346    }
347}
348
349impl<E: EntityKind> From<PlannedQuery<E>> for ExecutablePlan<E> {
350    fn from(value: PlannedQuery<E>) -> Self {
351        Self::new(value.into_inner())
352    }
353}
354
355#[derive(Debug)]
356pub struct Query<E: EntityKind> {
357    intent: QueryModel<'static, E::Key>,
358    _marker: PhantomData<E>,
359}
360
361impl<E: EntityKind> Query<E> {
362    /// Create a new intent with an explicit missing-row policy.
363    /// MissingOk favors idempotency and may mask index/data divergence on deletes.
364    /// Use Strict to surface missing rows during scan/delete execution.
365    #[must_use]
366    pub const fn new(consistency: ReadConsistency) -> Self {
367        Self {
368            intent: QueryModel::new(E::MODEL, consistency),
369            _marker: PhantomData,
370        }
371    }
372
373    /// Return the intent mode (load vs delete).
374    #[must_use]
375    pub const fn mode(&self) -> QueryMode {
376        self.intent.mode()
377    }
378
379    #[must_use]
380    pub(crate) fn has_explicit_order(&self) -> bool {
381        self.intent.has_explicit_order()
382    }
383
384    #[must_use]
385    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
386        self.intent.load_spec()
387    }
388
389    /// Add a predicate, implicitly AND-ing with any existing predicate.
390    #[must_use]
391    pub fn filter(mut self, predicate: Predicate) -> Self {
392        self.intent = self.intent.filter(predicate);
393        self
394    }
395
396    /// Apply a dynamic filter expression.
397    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
398        let Self { intent, _marker } = self;
399        let intent = intent.filter_expr(expr)?;
400
401        Ok(Self { intent, _marker })
402    }
403
404    /// Apply a dynamic sort expression.
405    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
406        let Self { intent, _marker } = self;
407        let intent = intent.sort_expr(expr)?;
408
409        Ok(Self { intent, _marker })
410    }
411
412    /// Append an ascending sort key.
413    #[must_use]
414    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
415        self.intent = self.intent.order_by(field);
416        self
417    }
418
419    /// Append a descending sort key.
420    #[must_use]
421    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
422        self.intent = self.intent.order_by_desc(field);
423        self
424    }
425
426    /// Enable DISTINCT semantics for this query.
427    #[must_use]
428    pub fn distinct(mut self) -> Self {
429        self.intent = self.intent.distinct();
430        self
431    }
432
433    /// Set the access path to a single primary key lookup.
434    pub(crate) fn by_id(self, id: E::Key) -> Self {
435        let Self { intent, _marker } = self;
436        Self {
437            intent: intent.by_id(id),
438            _marker,
439        }
440    }
441
442    /// Set the access path to a primary key batch lookup.
443    pub(crate) fn by_ids<I>(self, ids: I) -> Self
444    where
445        I: IntoIterator<Item = E::Key>,
446    {
447        let Self { intent, _marker } = self;
448        Self {
449            intent: intent.by_ids(ids),
450            _marker,
451        }
452    }
453
454    /// Mark this intent as a delete query.
455    #[must_use]
456    pub fn delete(mut self) -> Self {
457        self.intent = self.intent.delete();
458        self
459    }
460
461    /// Apply a limit to the current mode.
462    ///
463    /// Load limits bound result size; delete limits bound mutation size.
464    /// For load queries, any use of `limit` or `offset` requires an explicit
465    /// `order_by(...)` so pagination is deterministic.
466    #[must_use]
467    pub fn limit(mut self, limit: u32) -> Self {
468        self.intent = self.intent.limit(limit);
469        self
470    }
471
472    /// Apply an offset to a load intent.
473    ///
474    /// Any use of `offset` or `limit` requires an explicit `order_by(...)`.
475    #[must_use]
476    pub fn offset(mut self, offset: u32) -> Self {
477        self.intent = self.intent.offset(offset);
478        self
479    }
480
481    /// Explain this intent without executing it.
482    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
483        let plan = self.planned()?;
484
485        Ok(plan.explain())
486    }
487
488    /// Plan this intent into a neutral planned query contract.
489    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
490        let plan = self.build_plan()?;
491
492        Ok(PlannedQuery::new(plan))
493    }
494
495    /// Compile this logical planned query into executor runtime state.
496    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
497        self.planned().map(ExecutablePlan::from)
498    }
499
500    // Build a logical plan for the current intent.
501    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
502        let plan_value = self.intent.build_plan_model()?;
503        let (logical, access) = plan_value.into_parts();
504        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
505        let plan = AccessPlannedQuery::from_parts(logical, access);
506
507        Ok(plan)
508    }
509}
510
511impl<E> Query<E>
512where
513    E: EntityKind + SingletonEntity,
514    E::Key: Default,
515{
516    /// Set the access path to the singleton primary key.
517    pub(crate) fn only(self) -> Self {
518        let Self { intent, _marker } = self;
519
520        Self {
521            intent: intent.only(E::Key::default()),
522            _marker,
523        }
524    }
525}
526
527///
528/// QueryError
529///
530
531#[derive(Debug, ThisError)]
532pub enum QueryError {
533    #[error("{0}")]
534    Validate(#[from] ValidateError),
535
536    #[error("{0}")]
537    Plan(Box<PlanError>),
538
539    #[error("{0}")]
540    Intent(#[from] IntentError),
541
542    #[error("{0}")]
543    Response(#[from] ResponseError),
544
545    #[error("{0}")]
546    Execute(#[from] InternalError),
547}
548
549impl From<PlannerError> for QueryError {
550    fn from(err: PlannerError) -> Self {
551        match err {
552            PlannerError::Plan(err) => Self::from(*err),
553            PlannerError::Internal(err) => Self::Execute(*err),
554        }
555    }
556}
557
558impl From<PlanError> for QueryError {
559    fn from(err: PlanError) -> Self {
560        Self::Plan(Box::new(err))
561    }
562}
563
564///
565/// IntentError
566///
567
568#[derive(Clone, Copy, Debug, ThisError)]
569pub enum IntentError {
570    #[error("{0}")]
571    PlanShape(#[from] policy::PlanPolicyError),
572
573    #[error("by_ids() cannot be combined with predicates")]
574    ByIdsWithPredicate,
575
576    #[error("only() cannot be combined with predicates")]
577    OnlyWithPredicate,
578
579    #[error("multiple key access methods were used on the same query")]
580    KeyAccessConflict,
581
582    #[error("cursor pagination requires an explicit ordering")]
583    CursorRequiresOrder,
584
585    #[error("cursor pagination requires an explicit limit")]
586    CursorRequiresLimit,
587
588    #[error("cursor tokens can only be used with .page().execute()")]
589    CursorRequiresPagedExecution,
590}
591
592impl From<policy::CursorPagingPolicyError> for IntentError {
593    fn from(err: policy::CursorPagingPolicyError) -> Self {
594        match err {
595            policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
596            policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
597        }
598    }
599}
600
601/// Helper to append an ordering field while preserving existing order spec.
602fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
603    match order {
604        Some(mut spec) => {
605            spec.fields.push((field.to_string(), direction));
606            spec
607        }
608        None => OrderSpec {
609            fields: vec![(field.to_string(), direction)],
610        },
611    }
612}
613
614// Normalize ORDER BY into a canonical, deterministic shape:
615// - preserve user field order
616// - remove explicit primary-key references from the user segment
617// - append exactly one primary-key field as the terminal tie-break
618fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
619    let mut order = order?;
620    if order.fields.is_empty() {
621        return Some(order);
622    }
623
624    let pk_field = model.primary_key.name;
625    let mut pk_direction = None;
626    order.fields.retain(|(field, direction)| {
627        if field == pk_field {
628            if pk_direction.is_none() {
629                pk_direction = Some(*direction);
630            }
631            false
632        } else {
633            true
634        }
635    });
636
637    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
638    order.fields.push((pk_field.to_string(), pk_direction));
639
640    Some(order)
641}