Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5// Key-only access intent and helpers (split out for readability).
6mod key_access;
7pub(crate) use key_access::*;
/// Alias for the access-layer delete specification carried by `QueryMode::Delete`.
8pub type DeleteSpec = crate::db::access::DeleteSpec;
/// Alias for the access-layer load specification carried by `QueryMode::Load`.
9pub type LoadSpec = crate::db::access::LoadSpec;
/// Alias for the access-layer query mode (`Load` or `Delete`).
10pub type QueryMode = crate::db::access::QueryMode;
11
12use crate::{
13    db::{
14        contracts::ReadConsistency,
15        executor::ExecutablePlan,
16        policy,
17        query::{
18            explain::ExplainPlan,
19            expr::{FilterExpr, SortExpr, SortLowerError},
20            plan::{
21                AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
22                PageSpec, PlanError,
23                planner::{PlannerError, plan_access},
24                validate::validate_logical_plan_model,
25            },
26            predicate::{
27                Predicate, SchemaInfo, ValidateError, normalize, normalize_enum_literals,
28                validate::reject_unsupported_query_features,
29            },
30        },
31        response::ResponseError,
32    },
33    error::InternalError,
34    model::entity::EntityModel,
35    traits::{EntityKind, FieldValue, SingletonEntity},
36    value::Value,
37};
38use std::marker::PhantomData;
39use thiserror::Error as ThisError;
40
41///
42/// QueryModel
43///
44/// Model-level query intent and planning context.
45/// Consumes an `EntityModel` derived from typed entity definitions.
46///
47
48#[derive(Debug)]
49pub(crate) struct QueryModel<'m, K> {
50    model: &'m EntityModel,
51    mode: QueryMode,
52    predicate: Option<Predicate>,
53    key_access: Option<KeyAccessState<K>>,
54    key_access_conflict: bool,
55    order: Option<OrderSpec>,
56    distinct: bool,
57    consistency: ReadConsistency,
58}
59
60impl<'m, K: FieldValue> QueryModel<'m, K> {
61    #[must_use]
62    pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
63        Self {
64            model,
65            mode: QueryMode::Load(LoadSpec::new()),
66            predicate: None,
67            key_access: None,
68            key_access_conflict: false,
69            order: None,
70            distinct: false,
71            consistency,
72        }
73    }
74
75    /// Return the intent mode (load vs delete).
76    #[must_use]
77    pub(crate) const fn mode(&self) -> QueryMode {
78        self.mode
79    }
80
81    #[must_use]
82    fn has_explicit_order(&self) -> bool {
83        policy::has_explicit_order(self.order.as_ref())
84    }
85
86    #[must_use]
87    const fn load_spec(&self) -> Option<LoadSpec> {
88        match self.mode {
89            QueryMode::Load(spec) => Some(spec),
90            QueryMode::Delete(_) => None,
91        }
92    }
93
94    /// Add a predicate, implicitly AND-ing with any existing predicate.
95    #[must_use]
96    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
97        self.predicate = match self.predicate.take() {
98            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
99            None => Some(predicate),
100        };
101        self
102    }
103
104    /// Apply a dynamic filter expression using the model schema.
105    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
106        let schema = SchemaInfo::from_entity_model(self.model)?;
107        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
108
109        Ok(self.filter(predicate))
110    }
111
112    /// Apply a dynamic sort expression using the model schema.
113    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
114        let schema = SchemaInfo::from_entity_model(self.model)?;
115        let order = match expr.lower_with(&schema) {
116            Ok(order) => order,
117            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
118            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
119        };
120
121        policy::validate_order_shape(Some(&order))
122            .map_err(IntentError::from)
123            .map_err(QueryError::from)?;
124
125        Ok(self.order_spec(order))
126    }
127
128    /// Append an ascending sort key.
129    #[must_use]
130    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
131        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
132        self
133    }
134
135    /// Append a descending sort key.
136    #[must_use]
137    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
138        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
139        self
140    }
141
142    /// Set a fully-specified order spec (validated before reaching this boundary).
143    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
144        self.order = Some(order);
145        self
146    }
147
148    /// Enable DISTINCT semantics for this query intent.
149    #[must_use]
150    pub(crate) const fn distinct(mut self) -> Self {
151        self.distinct = true;
152        self
153    }
154
155    /// Track key-only access paths and detect conflicting key intents.
156    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
157        if let Some(existing) = &self.key_access
158            && existing.kind != kind
159        {
160            self.key_access_conflict = true;
161        }
162
163        self.key_access = Some(KeyAccessState { kind, access });
164
165        self
166    }
167
168    /// Set the access path to a single primary key lookup.
169    pub(crate) fn by_id(self, id: K) -> Self {
170        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
171    }
172
173    /// Set the access path to a primary key batch lookup.
174    pub(crate) fn by_ids<I>(self, ids: I) -> Self
175    where
176        I: IntoIterator<Item = K>,
177    {
178        self.set_key_access(
179            KeyAccessKind::Many,
180            KeyAccess::Many(ids.into_iter().collect()),
181        )
182    }
183
184    /// Set the access path to the singleton primary key.
185    pub(crate) fn only(self, id: K) -> Self {
186        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
187    }
188
189    /// Mark this intent as a delete query.
190    #[must_use]
191    pub(crate) const fn delete(mut self) -> Self {
192        if self.mode.is_load() {
193            self.mode = QueryMode::Delete(DeleteSpec::new());
194        }
195        self
196    }
197
198    /// Apply a limit to the current mode.
199    ///
200    /// Load limits bound result size; delete limits bound mutation size.
201    #[must_use]
202    pub(crate) const fn limit(mut self, limit: u32) -> Self {
203        match self.mode {
204            QueryMode::Load(mut spec) => {
205                spec.limit = Some(limit);
206                self.mode = QueryMode::Load(spec);
207            }
208            QueryMode::Delete(mut spec) => {
209                spec.limit = Some(limit);
210                self.mode = QueryMode::Delete(spec);
211            }
212        }
213        self
214    }
215
216    /// Apply an offset to a load intent.
217    #[must_use]
218    pub(crate) const fn offset(mut self, offset: u32) -> Self {
219        if let QueryMode::Load(mut spec) = self.mode {
220            spec.offset = offset;
221            self.mode = QueryMode::Load(spec);
222        }
223        self
224    }
225
226    /// Build a model-level logical plan using Value-based access keys.
227    fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
228        // Phase 1: schema surface and intent validation.
229        let schema_info = SchemaInfo::from_entity_model(self.model)?;
230        self.validate_intent()?;
231
232        // Phase 2: predicate normalization and access planning.
233        let normalized_predicate = self
234            .predicate
235            .as_ref()
236            .map(|predicate| {
237                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
238                let predicate = normalize_enum_literals(&schema_info, predicate)?;
239                Ok::<Predicate, ValidateError>(normalize(&predicate))
240            })
241            .transpose()?;
242        let access_plan_value = match &self.key_access {
243            Some(state) => access_plan_from_keys_value(&state.access),
244            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
245        };
246
247        // Phase 3: assemble the executor-ready plan.
248        let logical = LogicalPlan {
249            mode: self.mode,
250            predicate: normalized_predicate,
251            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
252            // This ensures explain/fingerprint/execution share one deterministic order shape.
253            order: canonicalize_order_spec(self.model, self.order.clone()),
254            distinct: self.distinct,
255            delete_limit: match self.mode {
256                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
257                QueryMode::Load(_) => None,
258            },
259            page: match self.mode {
260                QueryMode::Load(spec) => {
261                    if spec.limit.is_some() || spec.offset > 0 {
262                        Some(PageSpec {
263                            limit: spec.limit,
264                            offset: spec.offset,
265                        })
266                    } else {
267                        None
268                    }
269                }
270                QueryMode::Delete(_) => None,
271            },
272            consistency: self.consistency,
273        };
274        let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
275
276        validate_logical_plan_model(&schema_info, self.model, &plan)?;
277
278        Ok(plan)
279    }
280
281    // Validate pre-plan policy invariants and key-access rules before planning.
282    fn validate_intent(&self) -> Result<(), IntentError> {
283        if self.key_access_conflict {
284            return Err(IntentError::KeyAccessConflict);
285        }
286
287        policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
288            .map_err(IntentError::from)?;
289
290        if let Some(state) = &self.key_access {
291            match state.kind {
292                KeyAccessKind::Many if self.predicate.is_some() => {
293                    return Err(IntentError::ByIdsWithPredicate);
294                }
295                KeyAccessKind::Only if self.predicate.is_some() => {
296                    return Err(IntentError::OnlyWithPredicate);
297                }
298                _ => {
299                    // NOTE: Single/Many without predicates impose no additional constraints.
300                }
301            }
302        }
303
304        Ok(())
305    }
306}
307
308///
309/// Query
310///
311/// Typed, declarative query intent for a specific entity type.
312///
313/// This intent is:
314/// - schema-agnostic at construction
315/// - normalized and validated only during planning
316/// - free of access-path decisions
317///
318
319///
320/// PlannedQuery
321///
322/// Neutral query-owned planned contract produced by query planning.
323/// Stores logical + access shape without executor compilation state.
324///
325#[derive(Debug)]
326pub struct PlannedQuery<E: EntityKind> {
327    plan: AccessPlannedQuery<E::Key>,
328    _marker: PhantomData<E>,
329}
330
331impl<E: EntityKind> PlannedQuery<E> {
332    #[must_use]
333    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
334        Self {
335            plan,
336            _marker: PhantomData,
337        }
338    }
339
340    #[must_use]
341    pub fn explain(&self) -> ExplainPlan {
342        self.plan.explain_with_model(E::MODEL)
343    }
344
345    #[must_use]
346    pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
347        self.plan
348    }
349}
350
351impl<E: EntityKind> From<PlannedQuery<E>> for ExecutablePlan<E> {
352    fn from(value: PlannedQuery<E>) -> Self {
353        Self::new(value.into_inner())
354    }
355}
356
357#[derive(Debug)]
358pub struct Query<E: EntityKind> {
359    intent: QueryModel<'static, E::Key>,
360    _marker: PhantomData<E>,
361}
362
363impl<E: EntityKind> Query<E> {
364    /// Create a new intent with an explicit missing-row policy.
365    /// MissingOk favors idempotency and may mask index/data divergence on deletes.
366    /// Use Strict to surface missing rows during scan/delete execution.
367    #[must_use]
368    pub const fn new(consistency: ReadConsistency) -> Self {
369        Self {
370            intent: QueryModel::new(E::MODEL, consistency),
371            _marker: PhantomData,
372        }
373    }
374
375    /// Return the intent mode (load vs delete).
376    #[must_use]
377    pub const fn mode(&self) -> QueryMode {
378        self.intent.mode()
379    }
380
381    #[must_use]
382    pub(crate) fn has_explicit_order(&self) -> bool {
383        self.intent.has_explicit_order()
384    }
385
386    #[must_use]
387    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
388        self.intent.load_spec()
389    }
390
391    /// Add a predicate, implicitly AND-ing with any existing predicate.
392    #[must_use]
393    pub fn filter(mut self, predicate: Predicate) -> Self {
394        self.intent = self.intent.filter(predicate);
395        self
396    }
397
398    /// Apply a dynamic filter expression.
399    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
400        let Self { intent, _marker } = self;
401        let intent = intent.filter_expr(expr)?;
402
403        Ok(Self { intent, _marker })
404    }
405
406    /// Apply a dynamic sort expression.
407    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
408        let Self { intent, _marker } = self;
409        let intent = intent.sort_expr(expr)?;
410
411        Ok(Self { intent, _marker })
412    }
413
414    /// Append an ascending sort key.
415    #[must_use]
416    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
417        self.intent = self.intent.order_by(field);
418        self
419    }
420
421    /// Append a descending sort key.
422    #[must_use]
423    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
424        self.intent = self.intent.order_by_desc(field);
425        self
426    }
427
428    /// Enable DISTINCT semantics for this query.
429    #[must_use]
430    pub fn distinct(mut self) -> Self {
431        self.intent = self.intent.distinct();
432        self
433    }
434
435    /// Set the access path to a single primary key lookup.
436    pub(crate) fn by_id(self, id: E::Key) -> Self {
437        let Self { intent, _marker } = self;
438        Self {
439            intent: intent.by_id(id),
440            _marker,
441        }
442    }
443
444    /// Set the access path to a primary key batch lookup.
445    pub(crate) fn by_ids<I>(self, ids: I) -> Self
446    where
447        I: IntoIterator<Item = E::Key>,
448    {
449        let Self { intent, _marker } = self;
450        Self {
451            intent: intent.by_ids(ids),
452            _marker,
453        }
454    }
455
456    /// Mark this intent as a delete query.
457    #[must_use]
458    pub fn delete(mut self) -> Self {
459        self.intent = self.intent.delete();
460        self
461    }
462
463    /// Apply a limit to the current mode.
464    ///
465    /// Load limits bound result size; delete limits bound mutation size.
466    /// For load queries, any use of `limit` or `offset` requires an explicit
467    /// `order_by(...)` so pagination is deterministic.
468    #[must_use]
469    pub fn limit(mut self, limit: u32) -> Self {
470        self.intent = self.intent.limit(limit);
471        self
472    }
473
474    /// Apply an offset to a load intent.
475    ///
476    /// Any use of `offset` or `limit` requires an explicit `order_by(...)`.
477    #[must_use]
478    pub fn offset(mut self, offset: u32) -> Self {
479        self.intent = self.intent.offset(offset);
480        self
481    }
482
483    /// Explain this intent without executing it.
484    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
485        let plan = self.planned()?;
486
487        Ok(plan.explain())
488    }
489
490    /// Plan this intent into a neutral planned query contract.
491    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
492        let plan = self.build_plan()?;
493
494        Ok(PlannedQuery::new(plan))
495    }
496
497    /// Compile this logical planned query into executor runtime state.
498    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
499        self.planned().map(ExecutablePlan::from)
500    }
501
502    // Build a logical plan for the current intent.
503    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
504        let plan_value = self.intent.build_plan_model()?;
505        let (logical, access) = plan_value.into_parts();
506        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
507        let plan = AccessPlannedQuery::from_parts(logical, access);
508
509        Ok(plan)
510    }
511}
512
513impl<E> Query<E>
514where
515    E: EntityKind + SingletonEntity,
516    E::Key: Default,
517{
518    /// Set the access path to the singleton primary key.
519    pub(crate) fn only(self) -> Self {
520        let Self { intent, _marker } = self;
521
522        Self {
523            intent: intent.only(E::Key::default()),
524            _marker,
525        }
526    }
527}
528
529///
530/// QueryError
531///
532
533#[derive(Debug, ThisError)]
534pub enum QueryError {
535    #[error("{0}")]
536    Validate(#[from] ValidateError),
537
538    #[error("{0}")]
539    Plan(Box<PlanError>),
540
541    #[error("{0}")]
542    Intent(#[from] IntentError),
543
544    #[error("{0}")]
545    Response(#[from] ResponseError),
546
547    #[error("{0}")]
548    Execute(#[from] InternalError),
549}
550
551impl From<PlannerError> for QueryError {
552    fn from(err: PlannerError) -> Self {
553        match err {
554            PlannerError::Plan(err) => Self::from(*err),
555            PlannerError::Internal(err) => Self::Execute(*err),
556        }
557    }
558}
559
560impl From<PlanError> for QueryError {
561    fn from(err: PlanError) -> Self {
562        Self::Plan(Box::new(err))
563    }
564}
565
566///
567/// IntentError
568///
569
570#[derive(Clone, Copy, Debug, ThisError)]
571pub enum IntentError {
572    #[error("{0}")]
573    PlanShape(#[from] policy::PlanPolicyError),
574
575    #[error("by_ids() cannot be combined with predicates")]
576    ByIdsWithPredicate,
577
578    #[error("only() cannot be combined with predicates")]
579    OnlyWithPredicate,
580
581    #[error("multiple key access methods were used on the same query")]
582    KeyAccessConflict,
583
584    #[error("cursor pagination requires an explicit ordering")]
585    CursorRequiresOrder,
586
587    #[error("cursor pagination requires an explicit limit")]
588    CursorRequiresLimit,
589
590    #[error("cursor tokens can only be used with .page().execute()")]
591    CursorRequiresPagedExecution,
592}
593
594impl From<policy::CursorPagingPolicyError> for IntentError {
595    fn from(err: policy::CursorPagingPolicyError) -> Self {
596        match err {
597            policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
598            policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
599        }
600    }
601}
602
603/// Helper to append an ordering field while preserving existing order spec.
604fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
605    match order {
606        Some(mut spec) => {
607            spec.fields.push((field.to_string(), direction));
608            spec
609        }
610        None => OrderSpec {
611            fields: vec![(field.to_string(), direction)],
612        },
613    }
614}
615
616// Normalize ORDER BY into a canonical, deterministic shape:
617// - preserve user field order
618// - remove explicit primary-key references from the user segment
619// - append exactly one primary-key field as the terminal tie-break
620fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
621    let mut order = order?;
622    if order.fields.is_empty() {
623        return Some(order);
624    }
625
626    let pk_field = model.primary_key.name;
627    let mut pk_direction = None;
628    order.fields.retain(|(field, direction)| {
629        if field == pk_field {
630            if pk_direction.is_none() {
631                pk_direction = Some(*direction);
632            }
633            false
634        } else {
635            true
636        }
637    });
638
639    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
640    order.fields.push((pk_field.to_string(), pk_direction));
641
642    Some(order)
643}