Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5// Key-only access intent and helpers (split out for readability).
6mod key_access;
7pub(crate) use key_access::*;
8
9use crate::{
10    db::{
11        executor::ExecutablePlan,
12        query::{
13            ReadConsistency,
14            explain::ExplainPlan,
15            expr::{FilterExpr, SortExpr, SortLowerError},
16            plan::{
17                AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
18                PageSpec, PlanError,
19                planner::{PlannerError, plan_access},
20                validate::validate_logical_plan_model,
21            },
22            policy,
23            predicate::{
24                Predicate, SchemaInfo, ValidateError, normalize, normalize_enum_literals,
25                validate::reject_unsupported_query_features,
26            },
27        },
28        response::ResponseError,
29    },
30    error::InternalError,
31    model::entity::EntityModel,
32    traits::{EntityKind, FieldValue, SingletonEntity},
33    value::Value,
34};
35use std::marker::PhantomData;
36use thiserror::Error as ThisError;
37
38///
39/// QueryMode
40/// Discriminates load vs delete intent at planning time.
41/// Encodes mode-specific fields so invalid states are unrepresentable.
42/// Mode checks are explicit and stable at execution time.
43///
44
45#[derive(Clone, Copy, Debug, Eq, PartialEq)]
46pub enum QueryMode {
47    Load(LoadSpec),
48    Delete(DeleteSpec),
49}
50
51impl QueryMode {
52    /// True if this mode represents a load intent.
53    #[must_use]
54    pub const fn is_load(&self) -> bool {
55        match self {
56            Self::Load(_) => true,
57            Self::Delete(_) => false,
58        }
59    }
60
61    /// True if this mode represents a delete intent.
62    #[must_use]
63    pub const fn is_delete(&self) -> bool {
64        match self {
65            Self::Delete(_) => true,
66            Self::Load(_) => false,
67        }
68    }
69}
70
71///
72/// LoadSpec
73/// Mode-specific fields for load intents.
74/// Encodes pagination without leaking into delete intents.
75///
76
77#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
78pub struct LoadSpec {
79    pub limit: Option<u32>,
80    pub offset: u32,
81}
82
83impl LoadSpec {
84    /// Create an empty load spec.
85    #[must_use]
86    pub const fn new() -> Self {
87        Self {
88            limit: None,
89            offset: 0,
90        }
91    }
92}
93
94///
95/// DeleteSpec
96/// Mode-specific fields for delete intents.
97/// Encodes delete limits without leaking into load intents.
98///
99
100#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
101pub struct DeleteSpec {
102    pub limit: Option<u32>,
103}
104
105impl DeleteSpec {
106    /// Create an empty delete spec.
107    #[must_use]
108    pub const fn new() -> Self {
109        Self { limit: None }
110    }
111}
112
113///
114/// QueryModel
115///
116/// Model-level query intent and planning context.
117/// Consumes an `EntityModel` derived from typed entity definitions.
118///
119
120#[derive(Debug)]
121pub(crate) struct QueryModel<'m, K> {
122    model: &'m EntityModel,
123    mode: QueryMode,
124    predicate: Option<Predicate>,
125    key_access: Option<KeyAccessState<K>>,
126    key_access_conflict: bool,
127    order: Option<OrderSpec>,
128    distinct: bool,
129    consistency: ReadConsistency,
130}
131
132impl<'m, K: FieldValue> QueryModel<'m, K> {
133    #[must_use]
134    pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
135        Self {
136            model,
137            mode: QueryMode::Load(LoadSpec::new()),
138            predicate: None,
139            key_access: None,
140            key_access_conflict: false,
141            order: None,
142            distinct: false,
143            consistency,
144        }
145    }
146
147    /// Return the intent mode (load vs delete).
148    #[must_use]
149    pub(crate) const fn mode(&self) -> QueryMode {
150        self.mode
151    }
152
153    #[must_use]
154    fn has_explicit_order(&self) -> bool {
155        policy::has_explicit_order(self.order.as_ref())
156    }
157
158    #[must_use]
159    const fn load_spec(&self) -> Option<LoadSpec> {
160        match self.mode {
161            QueryMode::Load(spec) => Some(spec),
162            QueryMode::Delete(_) => None,
163        }
164    }
165
166    /// Add a predicate, implicitly AND-ing with any existing predicate.
167    #[must_use]
168    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
169        self.predicate = match self.predicate.take() {
170            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
171            None => Some(predicate),
172        };
173        self
174    }
175
176    /// Apply a dynamic filter expression using the model schema.
177    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
178        let schema = SchemaInfo::from_entity_model(self.model)?;
179        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
180
181        Ok(self.filter(predicate))
182    }
183
184    /// Apply a dynamic sort expression using the model schema.
185    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
186        let schema = SchemaInfo::from_entity_model(self.model)?;
187        let order = match expr.lower_with(&schema) {
188            Ok(order) => order,
189            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
190            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
191        };
192
193        policy::validate_order_shape(Some(&order))
194            .map_err(IntentError::from)
195            .map_err(QueryError::from)?;
196
197        Ok(self.order_spec(order))
198    }
199
200    /// Append an ascending sort key.
201    #[must_use]
202    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
203        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
204        self
205    }
206
207    /// Append a descending sort key.
208    #[must_use]
209    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
210        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
211        self
212    }
213
214    /// Set a fully-specified order spec (validated before reaching this boundary).
215    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
216        self.order = Some(order);
217        self
218    }
219
220    /// Enable DISTINCT semantics for this query intent.
221    #[must_use]
222    pub(crate) const fn distinct(mut self) -> Self {
223        self.distinct = true;
224        self
225    }
226
227    /// Track key-only access paths and detect conflicting key intents.
228    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
229        if let Some(existing) = &self.key_access
230            && existing.kind != kind
231        {
232            self.key_access_conflict = true;
233        }
234
235        self.key_access = Some(KeyAccessState { kind, access });
236
237        self
238    }
239
240    /// Set the access path to a single primary key lookup.
241    pub(crate) fn by_id(self, id: K) -> Self {
242        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
243    }
244
245    /// Set the access path to a primary key batch lookup.
246    pub(crate) fn by_ids<I>(self, ids: I) -> Self
247    where
248        I: IntoIterator<Item = K>,
249    {
250        self.set_key_access(
251            KeyAccessKind::Many,
252            KeyAccess::Many(ids.into_iter().collect()),
253        )
254    }
255
256    /// Set the access path to the singleton primary key.
257    pub(crate) fn only(self, id: K) -> Self {
258        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
259    }
260
261    /// Mark this intent as a delete query.
262    #[must_use]
263    pub(crate) const fn delete(mut self) -> Self {
264        if self.mode.is_load() {
265            self.mode = QueryMode::Delete(DeleteSpec::new());
266        }
267        self
268    }
269
270    /// Apply a limit to the current mode.
271    ///
272    /// Load limits bound result size; delete limits bound mutation size.
273    #[must_use]
274    pub(crate) const fn limit(mut self, limit: u32) -> Self {
275        match self.mode {
276            QueryMode::Load(mut spec) => {
277                spec.limit = Some(limit);
278                self.mode = QueryMode::Load(spec);
279            }
280            QueryMode::Delete(mut spec) => {
281                spec.limit = Some(limit);
282                self.mode = QueryMode::Delete(spec);
283            }
284        }
285        self
286    }
287
288    /// Apply an offset to a load intent.
289    #[must_use]
290    pub(crate) const fn offset(mut self, offset: u32) -> Self {
291        if let QueryMode::Load(mut spec) = self.mode {
292            spec.offset = offset;
293            self.mode = QueryMode::Load(spec);
294        }
295        self
296    }
297
298    /// Build a model-level logical plan using Value-based access keys.
299    fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
300        // Phase 1: schema surface and intent validation.
301        let schema_info = SchemaInfo::from_entity_model(self.model)?;
302        self.validate_intent()?;
303
304        // Phase 2: predicate normalization and access planning.
305        let normalized_predicate = self
306            .predicate
307            .as_ref()
308            .map(|predicate| {
309                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
310                let predicate = normalize_enum_literals(&schema_info, predicate)?;
311                Ok::<Predicate, ValidateError>(normalize(&predicate))
312            })
313            .transpose()?;
314        let access_plan_value = match &self.key_access {
315            Some(state) => access_plan_from_keys_value(&state.access),
316            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
317        };
318
319        // Phase 3: assemble the executor-ready plan.
320        let logical = LogicalPlan {
321            mode: self.mode,
322            predicate: normalized_predicate,
323            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
324            // This ensures explain/fingerprint/execution share one deterministic order shape.
325            order: canonicalize_order_spec(self.model, self.order.clone()),
326            distinct: self.distinct,
327            delete_limit: match self.mode {
328                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
329                QueryMode::Load(_) => None,
330            },
331            page: match self.mode {
332                QueryMode::Load(spec) => {
333                    if spec.limit.is_some() || spec.offset > 0 {
334                        Some(PageSpec {
335                            limit: spec.limit,
336                            offset: spec.offset,
337                        })
338                    } else {
339                        None
340                    }
341                }
342                QueryMode::Delete(_) => None,
343            },
344            consistency: self.consistency,
345        };
346        let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
347
348        validate_logical_plan_model(&schema_info, self.model, &plan)?;
349
350        Ok(plan)
351    }
352
353    // Validate pre-plan policy invariants and key-access rules before planning.
354    fn validate_intent(&self) -> Result<(), IntentError> {
355        if self.key_access_conflict {
356            return Err(IntentError::KeyAccessConflict);
357        }
358
359        policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
360            .map_err(IntentError::from)?;
361
362        if let Some(state) = &self.key_access {
363            match state.kind {
364                KeyAccessKind::Many if self.predicate.is_some() => {
365                    return Err(IntentError::ByIdsWithPredicate);
366                }
367                KeyAccessKind::Only if self.predicate.is_some() => {
368                    return Err(IntentError::OnlyWithPredicate);
369                }
370                _ => {
371                    // NOTE: Single/Many without predicates impose no additional constraints.
372                }
373            }
374        }
375
376        Ok(())
377    }
378}
379
380///
381/// Query
382///
383/// Typed, declarative query intent for a specific entity type.
384///
385/// This intent is:
386/// - schema-agnostic at construction
387/// - normalized and validated only during planning
388/// - free of access-path decisions
389///
390
391#[derive(Debug)]
392pub struct Query<E: EntityKind> {
393    intent: QueryModel<'static, E::Key>,
394    _marker: PhantomData<E>,
395}
396
397impl<E: EntityKind> Query<E> {
398    /// Create a new intent with an explicit missing-row policy.
399    /// MissingOk favors idempotency and may mask index/data divergence on deletes.
400    /// Use Strict to surface missing rows during scan/delete execution.
401    #[must_use]
402    pub const fn new(consistency: ReadConsistency) -> Self {
403        Self {
404            intent: QueryModel::new(E::MODEL, consistency),
405            _marker: PhantomData,
406        }
407    }
408
409    /// Return the intent mode (load vs delete).
410    #[must_use]
411    pub const fn mode(&self) -> QueryMode {
412        self.intent.mode()
413    }
414
415    #[must_use]
416    pub(crate) fn has_explicit_order(&self) -> bool {
417        self.intent.has_explicit_order()
418    }
419
420    #[must_use]
421    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
422        self.intent.load_spec()
423    }
424
425    /// Add a predicate, implicitly AND-ing with any existing predicate.
426    #[must_use]
427    pub fn filter(mut self, predicate: Predicate) -> Self {
428        self.intent = self.intent.filter(predicate);
429        self
430    }
431
432    /// Apply a dynamic filter expression.
433    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
434        let Self { intent, _marker } = self;
435        let intent = intent.filter_expr(expr)?;
436
437        Ok(Self { intent, _marker })
438    }
439
440    /// Apply a dynamic sort expression.
441    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
442        let Self { intent, _marker } = self;
443        let intent = intent.sort_expr(expr)?;
444
445        Ok(Self { intent, _marker })
446    }
447
448    /// Append an ascending sort key.
449    #[must_use]
450    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
451        self.intent = self.intent.order_by(field);
452        self
453    }
454
455    /// Append a descending sort key.
456    #[must_use]
457    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
458        self.intent = self.intent.order_by_desc(field);
459        self
460    }
461
462    /// Enable DISTINCT semantics for this query.
463    #[must_use]
464    pub fn distinct(mut self) -> Self {
465        self.intent = self.intent.distinct();
466        self
467    }
468
469    /// Set the access path to a single primary key lookup.
470    pub(crate) fn by_id(self, id: E::Key) -> Self {
471        let Self { intent, _marker } = self;
472        Self {
473            intent: intent.by_id(id),
474            _marker,
475        }
476    }
477
478    /// Set the access path to a primary key batch lookup.
479    pub(crate) fn by_ids<I>(self, ids: I) -> Self
480    where
481        I: IntoIterator<Item = E::Key>,
482    {
483        let Self { intent, _marker } = self;
484        Self {
485            intent: intent.by_ids(ids),
486            _marker,
487        }
488    }
489
490    /// Mark this intent as a delete query.
491    #[must_use]
492    pub fn delete(mut self) -> Self {
493        self.intent = self.intent.delete();
494        self
495    }
496
497    /// Apply a limit to the current mode.
498    ///
499    /// Load limits bound result size; delete limits bound mutation size.
500    /// For load queries, any use of `limit` or `offset` requires an explicit
501    /// `order_by(...)` so pagination is deterministic.
502    #[must_use]
503    pub fn limit(mut self, limit: u32) -> Self {
504        self.intent = self.intent.limit(limit);
505        self
506    }
507
508    /// Apply an offset to a load intent.
509    ///
510    /// Any use of `offset` or `limit` requires an explicit `order_by(...)`.
511    #[must_use]
512    pub fn offset(mut self, offset: u32) -> Self {
513        self.intent = self.intent.offset(offset);
514        self
515    }
516
517    /// Explain this intent without executing it.
518    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
519        let plan = self.build_plan()?;
520
521        Ok(plan.explain_with_model(E::MODEL))
522    }
523
524    /// Plan this intent into an executor-ready plan.
525    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
526        let plan = self.build_plan()?;
527
528        Ok(ExecutablePlan::new(plan))
529    }
530
531    // Build a logical plan for the current intent.
532    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
533        let plan_value = self.intent.build_plan_model()?;
534        let (logical, access) = plan_value.into_parts();
535        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
536        let plan = AccessPlannedQuery::from_parts(logical, access);
537
538        Ok(plan)
539    }
540}
541
542impl<E> Query<E>
543where
544    E: EntityKind + SingletonEntity,
545    E::Key: Default,
546{
547    /// Set the access path to the singleton primary key.
548    pub(crate) fn only(self) -> Self {
549        let Self { intent, _marker } = self;
550
551        Self {
552            intent: intent.only(E::Key::default()),
553            _marker,
554        }
555    }
556}
557
558///
559/// QueryError
560///
561
562#[derive(Debug, ThisError)]
563pub enum QueryError {
564    #[error("{0}")]
565    Validate(#[from] ValidateError),
566
567    #[error("{0}")]
568    Plan(Box<PlanError>),
569
570    #[error("{0}")]
571    Intent(#[from] IntentError),
572
573    #[error("{0}")]
574    Response(#[from] ResponseError),
575
576    #[error("{0}")]
577    Execute(#[from] InternalError),
578}
579
580impl From<PlannerError> for QueryError {
581    fn from(err: PlannerError) -> Self {
582        match err {
583            PlannerError::Plan(err) => Self::from(*err),
584            PlannerError::Internal(err) => Self::Execute(*err),
585        }
586    }
587}
588
589impl From<PlanError> for QueryError {
590    fn from(err: PlanError) -> Self {
591        Self::Plan(Box::new(err))
592    }
593}
594
595///
596/// IntentError
597///
598
599#[derive(Clone, Copy, Debug, ThisError)]
600pub enum IntentError {
601    #[error("{0}")]
602    PlanShape(#[from] policy::PlanPolicyError),
603
604    #[error("by_ids() cannot be combined with predicates")]
605    ByIdsWithPredicate,
606
607    #[error("only() cannot be combined with predicates")]
608    OnlyWithPredicate,
609
610    #[error("multiple key access methods were used on the same query")]
611    KeyAccessConflict,
612
613    #[error("cursor pagination requires an explicit ordering")]
614    CursorRequiresOrder,
615
616    #[error("cursor pagination requires an explicit limit")]
617    CursorRequiresLimit,
618
619    #[error("cursor tokens can only be used with .page().execute()")]
620    CursorRequiresPagedExecution,
621}
622
623impl From<policy::CursorPagingPolicyError> for IntentError {
624    fn from(err: policy::CursorPagingPolicyError) -> Self {
625        match err {
626            policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
627            policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
628        }
629    }
630}
631
632/// Helper to append an ordering field while preserving existing order spec.
633fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
634    match order {
635        Some(mut spec) => {
636            spec.fields.push((field.to_string(), direction));
637            spec
638        }
639        None => OrderSpec {
640            fields: vec![(field.to_string(), direction)],
641        },
642    }
643}
644
645// Normalize ORDER BY into a canonical, deterministic shape:
646// - preserve user field order
647// - remove explicit primary-key references from the user segment
648// - append exactly one primary-key field as the terminal tie-break
649fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
650    let mut order = order?;
651    if order.fields.is_empty() {
652        return Some(order);
653    }
654
655    let pk_field = model.primary_key.name;
656    let mut pk_direction = None;
657    order.fields.retain(|(field, direction)| {
658        if field == pk_field {
659            if pk_direction.is_none() {
660                pk_direction = Some(*direction);
661            }
662            false
663        } else {
664            true
665        }
666    });
667
668    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
669    order.fields.push((pk_field.to_string(), pk_direction));
670
671    Some(order)
672}