Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5// Key-only access intent and helpers (split out for readability).
6mod key_access;
7pub(crate) use crate::db::intent::{DeleteSpec, LoadSpec, QueryMode};
8pub(crate) use key_access::*;
9
10use crate::{
11    db::{
12        consistency::ReadConsistency,
13        query::{
14            explain::ExplainPlan,
15            expr::{FilterExpr, SortExpr, SortLowerError},
16            plan::{
17                AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
18                PageSpec, PlanError,
19                planner::{PlannerError, plan_access},
20                validate::validate_logical_plan_model,
21            },
22            policy,
23            predicate::{
24                Predicate, SchemaInfo, ValidateError, normalize, normalize_enum_literals,
25                validate::reject_unsupported_query_features,
26            },
27        },
28        response::ResponseError,
29    },
30    error::InternalError,
31    model::entity::EntityModel,
32    traits::{EntityKind, FieldValue, SingletonEntity},
33    value::Value,
34};
35use std::marker::PhantomData;
36use thiserror::Error as ThisError;
37
38///
39/// QueryModel
40///
41/// Model-level query intent and planning context.
42/// Consumes an `EntityModel` derived from typed entity definitions.
43///
44
45#[derive(Debug)]
46pub(crate) struct QueryModel<'m, K> {
47    model: &'m EntityModel,
48    mode: QueryMode,
49    predicate: Option<Predicate>,
50    key_access: Option<KeyAccessState<K>>,
51    key_access_conflict: bool,
52    order: Option<OrderSpec>,
53    distinct: bool,
54    consistency: ReadConsistency,
55}
56
57impl<'m, K: FieldValue> QueryModel<'m, K> {
58    #[must_use]
59    pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
60        Self {
61            model,
62            mode: QueryMode::Load(LoadSpec::new()),
63            predicate: None,
64            key_access: None,
65            key_access_conflict: false,
66            order: None,
67            distinct: false,
68            consistency,
69        }
70    }
71
72    /// Return the intent mode (load vs delete).
73    #[must_use]
74    pub(crate) const fn mode(&self) -> QueryMode {
75        self.mode
76    }
77
78    #[must_use]
79    fn has_explicit_order(&self) -> bool {
80        policy::has_explicit_order(self.order.as_ref())
81    }
82
83    #[must_use]
84    const fn load_spec(&self) -> Option<LoadSpec> {
85        match self.mode {
86            QueryMode::Load(spec) => Some(spec),
87            QueryMode::Delete(_) => None,
88        }
89    }
90
91    /// Add a predicate, implicitly AND-ing with any existing predicate.
92    #[must_use]
93    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
94        self.predicate = match self.predicate.take() {
95            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
96            None => Some(predicate),
97        };
98        self
99    }
100
101    /// Apply a dynamic filter expression using the model schema.
102    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
103        let schema = SchemaInfo::from_entity_model(self.model)?;
104        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
105
106        Ok(self.filter(predicate))
107    }
108
109    /// Apply a dynamic sort expression using the model schema.
110    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
111        let schema = SchemaInfo::from_entity_model(self.model)?;
112        let order = match expr.lower_with(&schema) {
113            Ok(order) => order,
114            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
115            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
116        };
117
118        policy::validate_order_shape(Some(&order))
119            .map_err(IntentError::from)
120            .map_err(QueryError::from)?;
121
122        Ok(self.order_spec(order))
123    }
124
125    /// Append an ascending sort key.
126    #[must_use]
127    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
128        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
129        self
130    }
131
132    /// Append a descending sort key.
133    #[must_use]
134    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
135        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
136        self
137    }
138
139    /// Set a fully-specified order spec (validated before reaching this boundary).
140    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
141        self.order = Some(order);
142        self
143    }
144
145    /// Enable DISTINCT semantics for this query intent.
146    #[must_use]
147    pub(crate) const fn distinct(mut self) -> Self {
148        self.distinct = true;
149        self
150    }
151
152    /// Track key-only access paths and detect conflicting key intents.
153    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
154        if let Some(existing) = &self.key_access
155            && existing.kind != kind
156        {
157            self.key_access_conflict = true;
158        }
159
160        self.key_access = Some(KeyAccessState { kind, access });
161
162        self
163    }
164
165    /// Set the access path to a single primary key lookup.
166    pub(crate) fn by_id(self, id: K) -> Self {
167        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
168    }
169
170    /// Set the access path to a primary key batch lookup.
171    pub(crate) fn by_ids<I>(self, ids: I) -> Self
172    where
173        I: IntoIterator<Item = K>,
174    {
175        self.set_key_access(
176            KeyAccessKind::Many,
177            KeyAccess::Many(ids.into_iter().collect()),
178        )
179    }
180
181    /// Set the access path to the singleton primary key.
182    pub(crate) fn only(self, id: K) -> Self {
183        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
184    }
185
186    /// Mark this intent as a delete query.
187    #[must_use]
188    pub(crate) const fn delete(mut self) -> Self {
189        if self.mode.is_load() {
190            self.mode = QueryMode::Delete(DeleteSpec::new());
191        }
192        self
193    }
194
195    /// Apply a limit to the current mode.
196    ///
197    /// Load limits bound result size; delete limits bound mutation size.
198    #[must_use]
199    pub(crate) const fn limit(mut self, limit: u32) -> Self {
200        match self.mode {
201            QueryMode::Load(mut spec) => {
202                spec.limit = Some(limit);
203                self.mode = QueryMode::Load(spec);
204            }
205            QueryMode::Delete(mut spec) => {
206                spec.limit = Some(limit);
207                self.mode = QueryMode::Delete(spec);
208            }
209        }
210        self
211    }
212
213    /// Apply an offset to a load intent.
214    #[must_use]
215    pub(crate) const fn offset(mut self, offset: u32) -> Self {
216        if let QueryMode::Load(mut spec) = self.mode {
217            spec.offset = offset;
218            self.mode = QueryMode::Load(spec);
219        }
220        self
221    }
222
223    /// Build a model-level logical plan using Value-based access keys.
224    fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
225        // Phase 1: schema surface and intent validation.
226        let schema_info = SchemaInfo::from_entity_model(self.model)?;
227        self.validate_intent()?;
228
229        // Phase 2: predicate normalization and access planning.
230        let normalized_predicate = self
231            .predicate
232            .as_ref()
233            .map(|predicate| {
234                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
235                let predicate = normalize_enum_literals(&schema_info, predicate)?;
236                Ok::<Predicate, ValidateError>(normalize(&predicate))
237            })
238            .transpose()?;
239        let access_plan_value = match &self.key_access {
240            Some(state) => access_plan_from_keys_value(&state.access),
241            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
242        };
243
244        // Phase 3: assemble the executor-ready plan.
245        let logical = LogicalPlan {
246            mode: self.mode,
247            predicate: normalized_predicate,
248            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
249            // This ensures explain/fingerprint/execution share one deterministic order shape.
250            order: canonicalize_order_spec(self.model, self.order.clone()),
251            distinct: self.distinct,
252            delete_limit: match self.mode {
253                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
254                QueryMode::Load(_) => None,
255            },
256            page: match self.mode {
257                QueryMode::Load(spec) => {
258                    if spec.limit.is_some() || spec.offset > 0 {
259                        Some(PageSpec {
260                            limit: spec.limit,
261                            offset: spec.offset,
262                        })
263                    } else {
264                        None
265                    }
266                }
267                QueryMode::Delete(_) => None,
268            },
269            consistency: self.consistency,
270        };
271        let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
272
273        validate_logical_plan_model(&schema_info, self.model, &plan)?;
274
275        Ok(plan)
276    }
277
278    // Validate pre-plan policy invariants and key-access rules before planning.
279    fn validate_intent(&self) -> Result<(), IntentError> {
280        if self.key_access_conflict {
281            return Err(IntentError::KeyAccessConflict);
282        }
283
284        policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
285            .map_err(IntentError::from)?;
286
287        if let Some(state) = &self.key_access {
288            match state.kind {
289                KeyAccessKind::Many if self.predicate.is_some() => {
290                    return Err(IntentError::ByIdsWithPredicate);
291                }
292                KeyAccessKind::Only if self.predicate.is_some() => {
293                    return Err(IntentError::OnlyWithPredicate);
294                }
295                _ => {
296                    // NOTE: Single/Many without predicates impose no additional constraints.
297                }
298            }
299        }
300
301        Ok(())
302    }
303}
304
305///
306/// Query
307///
308/// Typed, declarative query intent for a specific entity type.
309///
310/// This intent is:
311/// - schema-agnostic at construction
312/// - normalized and validated only during planning
313/// - free of access-path decisions
314///
315
316///
317/// PlannedQuery
318///
319/// Neutral query-owned planned contract produced by query planning.
320/// Stores logical + access shape without executor compilation state.
321///
322#[derive(Debug)]
323pub struct PlannedQuery<E: EntityKind> {
324    plan: AccessPlannedQuery<E::Key>,
325    _marker: PhantomData<E>,
326}
327
328impl<E: EntityKind> PlannedQuery<E> {
329    #[must_use]
330    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
331        Self {
332            plan,
333            _marker: PhantomData,
334        }
335    }
336
337    #[must_use]
338    pub fn explain(&self) -> ExplainPlan {
339        self.plan.explain_with_model(E::MODEL)
340    }
341
342    #[must_use]
343    pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
344        self.plan
345    }
346}
347
348#[derive(Debug)]
349pub struct Query<E: EntityKind> {
350    intent: QueryModel<'static, E::Key>,
351    _marker: PhantomData<E>,
352}
353
354impl<E: EntityKind> Query<E> {
355    /// Create a new intent with an explicit missing-row policy.
356    /// MissingOk favors idempotency and may mask index/data divergence on deletes.
357    /// Use Strict to surface missing rows during scan/delete execution.
358    #[must_use]
359    pub const fn new(consistency: ReadConsistency) -> Self {
360        Self {
361            intent: QueryModel::new(E::MODEL, consistency),
362            _marker: PhantomData,
363        }
364    }
365
366    /// Return the intent mode (load vs delete).
367    #[must_use]
368    pub const fn mode(&self) -> QueryMode {
369        self.intent.mode()
370    }
371
372    #[must_use]
373    pub(crate) fn has_explicit_order(&self) -> bool {
374        self.intent.has_explicit_order()
375    }
376
377    #[must_use]
378    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
379        self.intent.load_spec()
380    }
381
382    /// Add a predicate, implicitly AND-ing with any existing predicate.
383    #[must_use]
384    pub fn filter(mut self, predicate: Predicate) -> Self {
385        self.intent = self.intent.filter(predicate);
386        self
387    }
388
389    /// Apply a dynamic filter expression.
390    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
391        let Self { intent, _marker } = self;
392        let intent = intent.filter_expr(expr)?;
393
394        Ok(Self { intent, _marker })
395    }
396
397    /// Apply a dynamic sort expression.
398    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
399        let Self { intent, _marker } = self;
400        let intent = intent.sort_expr(expr)?;
401
402        Ok(Self { intent, _marker })
403    }
404
405    /// Append an ascending sort key.
406    #[must_use]
407    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
408        self.intent = self.intent.order_by(field);
409        self
410    }
411
412    /// Append a descending sort key.
413    #[must_use]
414    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
415        self.intent = self.intent.order_by_desc(field);
416        self
417    }
418
419    /// Enable DISTINCT semantics for this query.
420    #[must_use]
421    pub fn distinct(mut self) -> Self {
422        self.intent = self.intent.distinct();
423        self
424    }
425
426    /// Set the access path to a single primary key lookup.
427    pub(crate) fn by_id(self, id: E::Key) -> Self {
428        let Self { intent, _marker } = self;
429        Self {
430            intent: intent.by_id(id),
431            _marker,
432        }
433    }
434
435    /// Set the access path to a primary key batch lookup.
436    pub(crate) fn by_ids<I>(self, ids: I) -> Self
437    where
438        I: IntoIterator<Item = E::Key>,
439    {
440        let Self { intent, _marker } = self;
441        Self {
442            intent: intent.by_ids(ids),
443            _marker,
444        }
445    }
446
447    /// Mark this intent as a delete query.
448    #[must_use]
449    pub fn delete(mut self) -> Self {
450        self.intent = self.intent.delete();
451        self
452    }
453
454    /// Apply a limit to the current mode.
455    ///
456    /// Load limits bound result size; delete limits bound mutation size.
457    /// For load queries, any use of `limit` or `offset` requires an explicit
458    /// `order_by(...)` so pagination is deterministic.
459    #[must_use]
460    pub fn limit(mut self, limit: u32) -> Self {
461        self.intent = self.intent.limit(limit);
462        self
463    }
464
465    /// Apply an offset to a load intent.
466    ///
467    /// Any use of `offset` or `limit` requires an explicit `order_by(...)`.
468    #[must_use]
469    pub fn offset(mut self, offset: u32) -> Self {
470        self.intent = self.intent.offset(offset);
471        self
472    }
473
474    /// Explain this intent without executing it.
475    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
476        let plan = self.planned()?;
477
478        Ok(plan.explain())
479    }
480
481    /// Plan this intent into a neutral planned query contract.
482    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
483        let plan = self.build_plan()?;
484
485        Ok(PlannedQuery::new(plan))
486    }
487
488    // Build a logical plan for the current intent.
489    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
490        let plan_value = self.intent.build_plan_model()?;
491        let (logical, access) = plan_value.into_parts();
492        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
493        let plan = AccessPlannedQuery::from_parts(logical, access);
494
495        Ok(plan)
496    }
497}
498
499impl<E> Query<E>
500where
501    E: EntityKind + SingletonEntity,
502    E::Key: Default,
503{
504    /// Set the access path to the singleton primary key.
505    pub(crate) fn only(self) -> Self {
506        let Self { intent, _marker } = self;
507
508        Self {
509            intent: intent.only(E::Key::default()),
510            _marker,
511        }
512    }
513}
514
515///
516/// QueryError
517///
518
519#[derive(Debug, ThisError)]
520pub enum QueryError {
521    #[error("{0}")]
522    Validate(#[from] ValidateError),
523
524    #[error("{0}")]
525    Plan(Box<PlanError>),
526
527    #[error("{0}")]
528    Intent(#[from] IntentError),
529
530    #[error("{0}")]
531    Response(#[from] ResponseError),
532
533    #[error("{0}")]
534    Execute(#[from] InternalError),
535}
536
537impl From<PlannerError> for QueryError {
538    fn from(err: PlannerError) -> Self {
539        match err {
540            PlannerError::Plan(err) => Self::from(*err),
541            PlannerError::Internal(err) => Self::Execute(*err),
542        }
543    }
544}
545
546impl From<PlanError> for QueryError {
547    fn from(err: PlanError) -> Self {
548        Self::Plan(Box::new(err))
549    }
550}
551
552///
553/// IntentError
554///
555
556#[derive(Clone, Copy, Debug, ThisError)]
557pub enum IntentError {
558    #[error("{0}")]
559    PlanShape(#[from] policy::PlanPolicyError),
560
561    #[error("by_ids() cannot be combined with predicates")]
562    ByIdsWithPredicate,
563
564    #[error("only() cannot be combined with predicates")]
565    OnlyWithPredicate,
566
567    #[error("multiple key access methods were used on the same query")]
568    KeyAccessConflict,
569
570    #[error("cursor pagination requires an explicit ordering")]
571    CursorRequiresOrder,
572
573    #[error("cursor pagination requires an explicit limit")]
574    CursorRequiresLimit,
575
576    #[error("cursor tokens can only be used with .page().execute()")]
577    CursorRequiresPagedExecution,
578}
579
580impl From<policy::CursorPagingPolicyError> for IntentError {
581    fn from(err: policy::CursorPagingPolicyError) -> Self {
582        match err {
583            policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
584            policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
585        }
586    }
587}
588
589/// Helper to append an ordering field while preserving existing order spec.
590fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
591    match order {
592        Some(mut spec) => {
593            spec.fields.push((field.to_string(), direction));
594            spec
595        }
596        None => OrderSpec {
597            fields: vec![(field.to_string(), direction)],
598        },
599    }
600}
601
602// Normalize ORDER BY into a canonical, deterministic shape:
603// - preserve user field order
604// - remove explicit primary-key references from the user segment
605// - append exactly one primary-key field as the terminal tie-break
606fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
607    let mut order = order?;
608    if order.fields.is_empty() {
609        return Some(order);
610    }
611
612    let pk_field = model.primary_key.name;
613    let mut pk_direction = None;
614    order.fields.retain(|(field, direction)| {
615        if field == pk_field {
616            if pk_direction.is_none() {
617                pk_direction = Some(*direction);
618            }
619            false
620        } else {
621            true
622        }
623    });
624
625    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
626    order.fields.push((pk_field.to_string(), pk_direction));
627
628    Some(order)
629}