Skip to main content

icydb_core/db/query/intent/
mod.rs

// NOTE(review): presumably required because the `Query` builder methods in this file
// destructure and rebuild `Self { intent, _marker }`, which reads an underscore-prefixed
// binding — confirm before removing the expectation.
#![expect(clippy::used_underscore_binding)]
#[cfg(test)]
mod tests;

// Key-only access intent and helpers (split out for readability).
mod key_access;
// Glob re-export of the key-access items at crate visibility.
pub(crate) use key_access::*;
8
9use crate::{
10    db::{
11        query::{
12            ReadConsistency,
13            expr::{FilterExpr, SortExpr, SortLowerError},
14            plan::{
15                DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16                OrderSpec, PageSpec, PlanError,
17                planner::{PlannerError, plan_access},
18                validate::validate_logical_plan_model,
19            },
20            policy,
21            predicate::{
22                Predicate, PredicateFieldSlots, SchemaInfo, ValidateError, normalize,
23                normalize_enum_literals, validate::reject_unsupported_query_features,
24            },
25        },
26        response::ResponseError,
27    },
28    error::InternalError,
29    model::entity::EntityModel,
30    traits::{EntityKind, FieldValue, SingletonEntity},
31    value::Value,
32};
33use std::marker::PhantomData;
34use thiserror::Error as ThisError;
35
36///
37/// QueryMode
38/// Discriminates load vs delete intent at planning time.
39/// Encodes mode-specific fields so invalid states are unrepresentable.
40/// Mode checks are explicit and stable at execution time.
41///
42
43#[derive(Clone, Copy, Debug, Eq, PartialEq)]
44pub enum QueryMode {
45    Load(LoadSpec),
46    Delete(DeleteSpec),
47}
48
49impl QueryMode {
50    /// True if this mode represents a load intent.
51    #[must_use]
52    pub const fn is_load(&self) -> bool {
53        match self {
54            Self::Load(_) => true,
55            Self::Delete(_) => false,
56        }
57    }
58
59    /// True if this mode represents a delete intent.
60    #[must_use]
61    pub const fn is_delete(&self) -> bool {
62        match self {
63            Self::Delete(_) => true,
64            Self::Load(_) => false,
65        }
66    }
67}
68
///
/// LoadSpec
/// Pagination fields carried by load intents only.
/// Keeping them here prevents pagination from reaching delete intents.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct LoadSpec {
    /// Requested row limit; `None` when no limit has been set.
    pub limit: Option<u32>,
    /// Requested pagination offset; `0` when no offset has been set.
    pub offset: u32,
}

impl LoadSpec {
    /// Build a load spec with no limit and a zero offset.
    #[must_use]
    pub const fn new() -> Self {
        let (limit, offset) = (None, 0);

        Self { limit, offset }
    }
}
91
///
/// DeleteSpec
/// Fields carried by delete intents only.
/// Keeping the delete limit here prevents it from reaching load intents.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteSpec {
    /// Requested delete limit; `None` when no limit has been set.
    pub limit: Option<u32>,
}

impl DeleteSpec {
    /// Build a delete spec with no limit.
    #[must_use]
    pub const fn new() -> Self {
        let limit = None;

        Self { limit }
    }
}
110
///
/// QueryModel
///
/// Model-level query intent and planning context.
/// Consumes an `EntityModel` derived from typed entity definitions.
///
/// `K` is the key type used by key-only access (`by_id`, `by_ids`, `only`).
///

#[derive(Debug)]
pub(crate) struct QueryModel<'m, K> {
    // Entity model used to derive schema info and to plan access paths.
    model: &'m EntityModel,
    // Load vs delete mode, including the mode-specific limit/offset fields.
    mode: QueryMode,
    // Accumulated filter; each `filter` call ANDs onto the existing predicate.
    predicate: Option<Predicate>,
    // Most recently requested key-only access, if any.
    key_access: Option<KeyAccessState<K>>,
    // Set once key access is requested with a different kind than the one already stored;
    // reported as `IntentError::KeyAccessConflict` during intent validation.
    key_access_conflict: bool,
    // Requested ordering; canonicalized with a primary-key tie-break when the plan is built.
    order: Option<OrderSpec>,
    // DISTINCT flag copied into the logical plan.
    distinct: bool,
    // Read-consistency setting copied into the logical plan.
    consistency: ReadConsistency,
}
129
130impl<'m, K: FieldValue> QueryModel<'m, K> {
131    #[must_use]
132    pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
133        Self {
134            model,
135            mode: QueryMode::Load(LoadSpec::new()),
136            predicate: None,
137            key_access: None,
138            key_access_conflict: false,
139            order: None,
140            distinct: false,
141            consistency,
142        }
143    }
144
145    /// Return the intent mode (load vs delete).
146    #[must_use]
147    pub(crate) const fn mode(&self) -> QueryMode {
148        self.mode
149    }
150
151    #[must_use]
152    fn has_explicit_order(&self) -> bool {
153        policy::has_explicit_order(self.order.as_ref())
154    }
155
156    #[must_use]
157    const fn load_spec(&self) -> Option<LoadSpec> {
158        match self.mode {
159            QueryMode::Load(spec) => Some(spec),
160            QueryMode::Delete(_) => None,
161        }
162    }
163
164    /// Add a predicate, implicitly AND-ing with any existing predicate.
165    #[must_use]
166    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
167        self.predicate = match self.predicate.take() {
168            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
169            None => Some(predicate),
170        };
171        self
172    }
173
174    /// Apply a dynamic filter expression using the model schema.
175    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
176        let schema = SchemaInfo::from_entity_model(self.model)?;
177        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
178
179        Ok(self.filter(predicate))
180    }
181
182    /// Apply a dynamic sort expression using the model schema.
183    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
184        let schema = SchemaInfo::from_entity_model(self.model)?;
185        let order = match expr.lower_with(&schema) {
186            Ok(order) => order,
187            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
188            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
189        };
190
191        policy::validate_order_shape(Some(&order))
192            .map_err(IntentError::from)
193            .map_err(QueryError::from)?;
194
195        Ok(self.order_spec(order))
196    }
197
198    /// Append an ascending sort key.
199    #[must_use]
200    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
201        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
202        self
203    }
204
205    /// Append a descending sort key.
206    #[must_use]
207    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
208        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
209        self
210    }
211
212    /// Set a fully-specified order spec (validated before reaching this boundary).
213    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
214        self.order = Some(order);
215        self
216    }
217
218    /// Enable DISTINCT semantics for this query intent.
219    #[must_use]
220    pub(crate) const fn distinct(mut self) -> Self {
221        self.distinct = true;
222        self
223    }
224
225    /// Track key-only access paths and detect conflicting key intents.
226    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
227        if let Some(existing) = &self.key_access
228            && existing.kind != kind
229        {
230            self.key_access_conflict = true;
231        }
232
233        self.key_access = Some(KeyAccessState { kind, access });
234
235        self
236    }
237
238    /// Set the access path to a single primary key lookup.
239    pub(crate) fn by_id(self, id: K) -> Self {
240        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
241    }
242
243    /// Set the access path to a primary key batch lookup.
244    pub(crate) fn by_ids<I>(self, ids: I) -> Self
245    where
246        I: IntoIterator<Item = K>,
247    {
248        self.set_key_access(
249            KeyAccessKind::Many,
250            KeyAccess::Many(ids.into_iter().collect()),
251        )
252    }
253
254    /// Set the access path to the singleton primary key.
255    pub(crate) fn only(self, id: K) -> Self {
256        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
257    }
258
259    /// Mark this intent as a delete query.
260    #[must_use]
261    pub(crate) const fn delete(mut self) -> Self {
262        if self.mode.is_load() {
263            self.mode = QueryMode::Delete(DeleteSpec::new());
264        }
265        self
266    }
267
268    /// Apply a limit to the current mode.
269    ///
270    /// Load limits bound result size; delete limits bound mutation size.
271    #[must_use]
272    pub(crate) const fn limit(mut self, limit: u32) -> Self {
273        match self.mode {
274            QueryMode::Load(mut spec) => {
275                spec.limit = Some(limit);
276                self.mode = QueryMode::Load(spec);
277            }
278            QueryMode::Delete(mut spec) => {
279                spec.limit = Some(limit);
280                self.mode = QueryMode::Delete(spec);
281            }
282        }
283        self
284    }
285
286    /// Apply an offset to a load intent.
287    #[must_use]
288    pub(crate) const fn offset(mut self, offset: u32) -> Self {
289        if let QueryMode::Load(mut spec) = self.mode {
290            spec.offset = offset;
291            self.mode = QueryMode::Load(spec);
292        }
293        self
294    }
295
296    /// Build a model-level logical plan using Value-based access keys.
297    fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
298        // Phase 1: schema surface and intent validation.
299        let schema_info = SchemaInfo::from_entity_model(self.model)?;
300        self.validate_intent()?;
301
302        // Phase 2: predicate normalization and access planning.
303        let normalized_predicate = self
304            .predicate
305            .as_ref()
306            .map(|predicate| {
307                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
308                let predicate = normalize_enum_literals(&schema_info, predicate)?;
309                Ok::<Predicate, ValidateError>(normalize(&predicate))
310            })
311            .transpose()?;
312        let access_plan_value = match &self.key_access {
313            Some(state) => access_plan_from_keys_value(&state.access),
314            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
315        };
316
317        // Phase 3: assemble the executor-ready plan.
318        let plan = LogicalPlan {
319            mode: self.mode,
320            access: access_plan_value,
321            predicate: normalized_predicate,
322            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
323            // This ensures explain/fingerprint/execution share one deterministic order shape.
324            order: canonicalize_order_spec(self.model, self.order.clone()),
325            distinct: self.distinct,
326            delete_limit: match self.mode {
327                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
328                QueryMode::Load(_) => None,
329            },
330            page: match self.mode {
331                QueryMode::Load(spec) => {
332                    if spec.limit.is_some() || spec.offset > 0 {
333                        Some(PageSpec {
334                            limit: spec.limit,
335                            offset: spec.offset,
336                        })
337                    } else {
338                        None
339                    }
340                }
341                QueryMode::Delete(_) => None,
342            },
343            consistency: self.consistency,
344        };
345
346        validate_logical_plan_model(&schema_info, self.model, &plan)?;
347
348        Ok(plan)
349    }
350
351    // Validate pre-plan policy invariants and key-access rules before planning.
352    fn validate_intent(&self) -> Result<(), IntentError> {
353        if self.key_access_conflict {
354            return Err(IntentError::KeyAccessConflict);
355        }
356
357        policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
358            .map_err(IntentError::from)?;
359
360        if let Some(state) = &self.key_access {
361            match state.kind {
362                KeyAccessKind::Many if self.predicate.is_some() => {
363                    return Err(IntentError::ByIdsWithPredicate);
364                }
365                KeyAccessKind::Only if self.predicate.is_some() => {
366                    return Err(IntentError::OnlyWithPredicate);
367                }
368                _ => {
369                    // NOTE: Single/Many without predicates impose no additional constraints.
370                }
371            }
372        }
373
374        Ok(())
375    }
376}
377
///
/// Query
///
/// Typed, declarative query intent for a specific entity type.
///
/// This intent is:
/// - schema-agnostic at construction
/// - normalized and validated only during planning
/// - free of access-path decisions
///
/// All builder methods delegate to the wrapped model-level intent.
///

#[derive(Debug)]
pub struct Query<E: EntityKind> {
    // Model-level intent keyed by `E::Key`; constructed from `E::MODEL`.
    intent: QueryModel<'static, E::Key>,
    // Ties the query to its entity type without storing an `E` value.
    _marker: PhantomData<E>,
}
394
395impl<E: EntityKind> Query<E> {
396    /// Create a new intent with an explicit missing-row policy.
397    /// MissingOk favors idempotency and may mask index/data divergence on deletes.
398    /// Use Strict to surface missing rows during scan/delete execution.
399    #[must_use]
400    pub const fn new(consistency: ReadConsistency) -> Self {
401        Self {
402            intent: QueryModel::new(E::MODEL, consistency),
403            _marker: PhantomData,
404        }
405    }
406
407    /// Return the intent mode (load vs delete).
408    #[must_use]
409    pub const fn mode(&self) -> QueryMode {
410        self.intent.mode()
411    }
412
413    #[must_use]
414    pub(crate) fn has_explicit_order(&self) -> bool {
415        self.intent.has_explicit_order()
416    }
417
418    #[must_use]
419    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
420        self.intent.load_spec()
421    }
422
423    /// Add a predicate, implicitly AND-ing with any existing predicate.
424    #[must_use]
425    pub fn filter(mut self, predicate: Predicate) -> Self {
426        self.intent = self.intent.filter(predicate);
427        self
428    }
429
430    /// Apply a dynamic filter expression.
431    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
432        let Self { intent, _marker } = self;
433        let intent = intent.filter_expr(expr)?;
434
435        Ok(Self { intent, _marker })
436    }
437
438    /// Apply a dynamic sort expression.
439    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
440        let Self { intent, _marker } = self;
441        let intent = intent.sort_expr(expr)?;
442
443        Ok(Self { intent, _marker })
444    }
445
446    /// Append an ascending sort key.
447    #[must_use]
448    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
449        self.intent = self.intent.order_by(field);
450        self
451    }
452
453    /// Append a descending sort key.
454    #[must_use]
455    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
456        self.intent = self.intent.order_by_desc(field);
457        self
458    }
459
460    /// Enable DISTINCT semantics for this query.
461    #[must_use]
462    pub fn distinct(mut self) -> Self {
463        self.intent = self.intent.distinct();
464        self
465    }
466
467    /// Set the access path to a single primary key lookup.
468    pub(crate) fn by_id(self, id: E::Key) -> Self {
469        let Self { intent, _marker } = self;
470        Self {
471            intent: intent.by_id(id),
472            _marker,
473        }
474    }
475
476    /// Set the access path to a primary key batch lookup.
477    pub(crate) fn by_ids<I>(self, ids: I) -> Self
478    where
479        I: IntoIterator<Item = E::Key>,
480    {
481        let Self { intent, _marker } = self;
482        Self {
483            intent: intent.by_ids(ids),
484            _marker,
485        }
486    }
487
488    /// Mark this intent as a delete query.
489    #[must_use]
490    pub fn delete(mut self) -> Self {
491        self.intent = self.intent.delete();
492        self
493    }
494
495    /// Apply a limit to the current mode.
496    ///
497    /// Load limits bound result size; delete limits bound mutation size.
498    /// For load queries, any use of `limit` or `offset` requires an explicit
499    /// `order_by(...)` so pagination is deterministic.
500    #[must_use]
501    pub fn limit(mut self, limit: u32) -> Self {
502        self.intent = self.intent.limit(limit);
503        self
504    }
505
506    /// Apply an offset to a load intent.
507    ///
508    /// Any use of `offset` or `limit` requires an explicit `order_by(...)`.
509    #[must_use]
510    pub fn offset(mut self, offset: u32) -> Self {
511        self.intent = self.intent.offset(offset);
512        self
513    }
514
515    /// Explain this intent without executing it.
516    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
517        let plan = self.build_plan()?;
518
519        Ok(plan.explain_with_model(E::MODEL))
520    }
521
522    /// Plan this intent into an executor-ready plan.
523    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
524        let plan = self.build_plan()?;
525        let predicate_slots = plan
526            .predicate
527            .as_ref()
528            .map(PredicateFieldSlots::resolve::<E>);
529
530        Ok(ExecutablePlan::new_with_compiled_predicate_slots(
531            plan,
532            predicate_slots,
533        ))
534    }
535
536    // Build a logical plan for the current intent.
537    fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
538        let plan_value = self.intent.build_plan_model()?;
539        let LogicalPlan {
540            mode,
541            access,
542            predicate,
543            order,
544            distinct,
545            delete_limit,
546            page,
547            consistency,
548        } = plan_value;
549
550        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
551        let plan = LogicalPlan {
552            mode,
553            access,
554            predicate,
555            order,
556            distinct,
557            delete_limit,
558            page,
559            consistency,
560        };
561
562        Ok(plan)
563    }
564}
565
566impl<E> Query<E>
567where
568    E: EntityKind + SingletonEntity,
569    E::Key: Default,
570{
571    /// Set the access path to the singleton primary key.
572    pub(crate) fn only(self) -> Self {
573        let Self { intent, _marker } = self;
574
575        Self {
576            intent: intent.only(E::Key::default()),
577            _marker,
578        }
579    }
580}
581
///
/// QueryError
///
/// Error type returned by the query intent builder and planner entry points
/// in this module. Each variant displays the message of the error it wraps.
///

#[derive(Debug, ThisError)]
pub enum QueryError {
    /// Validation failure, e.g. from schema derivation or expression lowering.
    #[error("{0}")]
    Validate(#[from] ValidateError),

    /// Planning failure; boxed, and converted through the manual `From<PlanError>` impl.
    #[error("{0}")]
    Plan(Box<PlanError>),

    /// Intent-level policy violation detected before planning.
    #[error("{0}")]
    Intent(#[from] IntentError),

    /// Wrapped `ResponseError`.
    #[error("{0}")]
    Response(#[from] ResponseError),

    /// Wrapped `InternalError`; also the target of `PlannerError::Internal`.
    #[error("{0}")]
    Execute(#[from] InternalError),
}
603
604impl From<PlannerError> for QueryError {
605    fn from(err: PlannerError) -> Self {
606        match err {
607            PlannerError::Plan(err) => Self::from(*err),
608            PlannerError::Internal(err) => Self::Execute(*err),
609        }
610    }
611}
612
613impl From<PlanError> for QueryError {
614    fn from(err: PlanError) -> Self {
615        Self::Plan(Box::new(err))
616    }
617}
618
///
/// IntentError
///
/// Policy violations detected on a query intent before it is planned.
///

#[derive(Clone, Copy, Debug, ThisError)]
pub enum IntentError {
    /// The policy layer rejected the plan or order shape of the intent.
    #[error("{0}")]
    PlanShape(#[from] policy::PlanPolicyError),

    /// A batch key lookup was combined with a filter predicate.
    #[error("by_ids() cannot be combined with predicates")]
    ByIdsWithPredicate,

    /// A singleton key lookup was combined with a filter predicate.
    #[error("only() cannot be combined with predicates")]
    OnlyWithPredicate,

    /// Key access was requested more than once with differing kinds.
    #[error("multiple key access methods were used on the same query")]
    KeyAccessConflict,

    /// Converted from `policy::CursorPagingPolicyError::CursorRequiresOrder`.
    #[error("cursor pagination requires an explicit ordering")]
    CursorRequiresOrder,

    /// Converted from `policy::CursorPagingPolicyError::CursorRequiresLimit`.
    #[error("cursor pagination requires an explicit limit")]
    CursorRequiresLimit,

    /// Not produced in this file; see the error message for the condition.
    #[error("cursor tokens can only be used with .page().execute()")]
    CursorRequiresPagedExecution,
}
646
647impl From<policy::CursorPagingPolicyError> for IntentError {
648    fn from(err: policy::CursorPagingPolicyError) -> Self {
649        match err {
650            policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
651            policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
652        }
653    }
654}
655
656/// Helper to append an ordering field while preserving existing order spec.
657fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
658    match order {
659        Some(mut spec) => {
660            spec.fields.push((field.to_string(), direction));
661            spec
662        }
663        None => OrderSpec {
664            fields: vec![(field.to_string(), direction)],
665        },
666    }
667}
668
669// Normalize ORDER BY into a canonical, deterministic shape:
670// - preserve user field order
671// - remove explicit primary-key references from the user segment
672// - append exactly one primary-key field as the terminal tie-break
673fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
674    let mut order = order?;
675    if order.fields.is_empty() {
676        return Some(order);
677    }
678
679    let pk_field = model.primary_key.name;
680    let mut pk_direction = None;
681    order.fields.retain(|(field, direction)| {
682        if field == pk_field {
683            if pk_direction.is_none() {
684                pk_direction = Some(*direction);
685            }
686            false
687        } else {
688            true
689        }
690    });
691
692    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
693    order.fields.push((pk_field.to_string(), pk_direction));
694
695    Some(order)
696}