Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5// Key-only access intent and helpers (split out for readability).
6mod key_access;
7pub(crate) use key_access::*;
8
9use crate::{
10    db::{
11        query::{
12            ReadConsistency,
13            explain::ExplainPlan,
14            expr::{FilterExpr, SortExpr, SortLowerError},
15            plan::{
16                DeleteLimitSpec, ExecutablePlan, LogicalPlan, OrderDirection, OrderSpec, PageSpec,
17                PlanError,
18                planner::{PlannerError, plan_access},
19                validate::validate_logical_plan_model,
20            },
21            policy,
22            predicate::{
23                Predicate, PredicateFieldSlots, SchemaInfo, ValidateError, normalize,
24                normalize_enum_literals, validate::reject_unsupported_query_features,
25            },
26        },
27        response::ResponseError,
28    },
29    error::InternalError,
30    model::entity::EntityModel,
31    traits::{EntityKind, FieldValue, SingletonEntity},
32    value::Value,
33};
34use std::marker::PhantomData;
35use thiserror::Error as ThisError;
36
37///
38/// QueryMode
39/// Discriminates load vs delete intent at planning time.
40/// Encodes mode-specific fields so invalid states are unrepresentable.
41/// Mode checks are explicit and stable at execution time.
42///
43
44#[derive(Clone, Copy, Debug, Eq, PartialEq)]
45pub enum QueryMode {
46    Load(LoadSpec),
47    Delete(DeleteSpec),
48}
49
50impl QueryMode {
51    /// True if this mode represents a load intent.
52    #[must_use]
53    pub const fn is_load(&self) -> bool {
54        match self {
55            Self::Load(_) => true,
56            Self::Delete(_) => false,
57        }
58    }
59
60    /// True if this mode represents a delete intent.
61    #[must_use]
62    pub const fn is_delete(&self) -> bool {
63        match self {
64            Self::Delete(_) => true,
65            Self::Load(_) => false,
66        }
67    }
68}
69
70///
71/// LoadSpec
72/// Mode-specific fields for load intents.
73/// Encodes pagination without leaking into delete intents.
74///
75
76#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
77pub struct LoadSpec {
78    pub limit: Option<u32>,
79    pub offset: u32,
80}
81
82impl LoadSpec {
83    /// Create an empty load spec.
84    #[must_use]
85    pub const fn new() -> Self {
86        Self {
87            limit: None,
88            offset: 0,
89        }
90    }
91}
92
93///
94/// DeleteSpec
95/// Mode-specific fields for delete intents.
96/// Encodes delete limits without leaking into load intents.
97///
98
99#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
100pub struct DeleteSpec {
101    pub limit: Option<u32>,
102}
103
104impl DeleteSpec {
105    /// Create an empty delete spec.
106    #[must_use]
107    pub const fn new() -> Self {
108        Self { limit: None }
109    }
110}
111
112///
113/// QueryModel
114///
115/// Model-level query intent and planning context.
116/// Consumes an `EntityModel` derived from typed entity definitions.
117///
118
119#[derive(Debug)]
120pub(crate) struct QueryModel<'m, K> {
121    model: &'m EntityModel,
122    mode: QueryMode,
123    predicate: Option<Predicate>,
124    key_access: Option<KeyAccessState<K>>,
125    key_access_conflict: bool,
126    order: Option<OrderSpec>,
127    distinct: bool,
128    consistency: ReadConsistency,
129}
130
131impl<'m, K: FieldValue> QueryModel<'m, K> {
132    #[must_use]
133    pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
134        Self {
135            model,
136            mode: QueryMode::Load(LoadSpec::new()),
137            predicate: None,
138            key_access: None,
139            key_access_conflict: false,
140            order: None,
141            distinct: false,
142            consistency,
143        }
144    }
145
146    /// Return the intent mode (load vs delete).
147    #[must_use]
148    pub(crate) const fn mode(&self) -> QueryMode {
149        self.mode
150    }
151
152    #[must_use]
153    fn has_explicit_order(&self) -> bool {
154        policy::has_explicit_order(self.order.as_ref())
155    }
156
157    #[must_use]
158    const fn load_spec(&self) -> Option<LoadSpec> {
159        match self.mode {
160            QueryMode::Load(spec) => Some(spec),
161            QueryMode::Delete(_) => None,
162        }
163    }
164
165    /// Add a predicate, implicitly AND-ing with any existing predicate.
166    #[must_use]
167    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
168        self.predicate = match self.predicate.take() {
169            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
170            None => Some(predicate),
171        };
172        self
173    }
174
175    /// Apply a dynamic filter expression using the model schema.
176    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
177        let schema = SchemaInfo::from_entity_model(self.model)?;
178        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
179
180        Ok(self.filter(predicate))
181    }
182
183    /// Apply a dynamic sort expression using the model schema.
184    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
185        let schema = SchemaInfo::from_entity_model(self.model)?;
186        let order = match expr.lower_with(&schema) {
187            Ok(order) => order,
188            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
189            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
190        };
191
192        policy::validate_order_shape(Some(&order))
193            .map_err(IntentError::from)
194            .map_err(QueryError::from)?;
195
196        Ok(self.order_spec(order))
197    }
198
199    /// Append an ascending sort key.
200    #[must_use]
201    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
202        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
203        self
204    }
205
206    /// Append a descending sort key.
207    #[must_use]
208    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
209        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
210        self
211    }
212
213    /// Set a fully-specified order spec (validated before reaching this boundary).
214    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
215        self.order = Some(order);
216        self
217    }
218
219    /// Enable DISTINCT semantics for this query intent.
220    #[must_use]
221    pub(crate) const fn distinct(mut self) -> Self {
222        self.distinct = true;
223        self
224    }
225
226    /// Track key-only access paths and detect conflicting key intents.
227    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
228        if let Some(existing) = &self.key_access
229            && existing.kind != kind
230        {
231            self.key_access_conflict = true;
232        }
233
234        self.key_access = Some(KeyAccessState { kind, access });
235
236        self
237    }
238
239    /// Set the access path to a single primary key lookup.
240    pub(crate) fn by_id(self, id: K) -> Self {
241        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
242    }
243
244    /// Set the access path to a primary key batch lookup.
245    pub(crate) fn by_ids<I>(self, ids: I) -> Self
246    where
247        I: IntoIterator<Item = K>,
248    {
249        self.set_key_access(
250            KeyAccessKind::Many,
251            KeyAccess::Many(ids.into_iter().collect()),
252        )
253    }
254
255    /// Set the access path to the singleton primary key.
256    pub(crate) fn only(self, id: K) -> Self {
257        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
258    }
259
260    /// Mark this intent as a delete query.
261    #[must_use]
262    pub(crate) const fn delete(mut self) -> Self {
263        if self.mode.is_load() {
264            self.mode = QueryMode::Delete(DeleteSpec::new());
265        }
266        self
267    }
268
269    /// Apply a limit to the current mode.
270    ///
271    /// Load limits bound result size; delete limits bound mutation size.
272    #[must_use]
273    pub(crate) const fn limit(mut self, limit: u32) -> Self {
274        match self.mode {
275            QueryMode::Load(mut spec) => {
276                spec.limit = Some(limit);
277                self.mode = QueryMode::Load(spec);
278            }
279            QueryMode::Delete(mut spec) => {
280                spec.limit = Some(limit);
281                self.mode = QueryMode::Delete(spec);
282            }
283        }
284        self
285    }
286
287    /// Apply an offset to a load intent.
288    #[must_use]
289    pub(crate) const fn offset(mut self, offset: u32) -> Self {
290        if let QueryMode::Load(mut spec) = self.mode {
291            spec.offset = offset;
292            self.mode = QueryMode::Load(spec);
293        }
294        self
295    }
296
297    /// Build a model-level logical plan using Value-based access keys.
298    fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
299        // Phase 1: schema surface and intent validation.
300        let schema_info = SchemaInfo::from_entity_model(self.model)?;
301        self.validate_intent()?;
302
303        // Phase 2: predicate normalization and access planning.
304        let normalized_predicate = self
305            .predicate
306            .as_ref()
307            .map(|predicate| {
308                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
309                let predicate = normalize_enum_literals(&schema_info, predicate)?;
310                Ok::<Predicate, ValidateError>(normalize(&predicate))
311            })
312            .transpose()?;
313        let access_plan_value = match &self.key_access {
314            Some(state) => access_plan_from_keys_value(&state.access),
315            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
316        };
317
318        // Phase 3: assemble the executor-ready plan.
319        let plan = LogicalPlan {
320            mode: self.mode,
321            access: access_plan_value,
322            predicate: normalized_predicate,
323            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
324            // This ensures explain/fingerprint/execution share one deterministic order shape.
325            order: canonicalize_order_spec(self.model, self.order.clone()),
326            distinct: self.distinct,
327            delete_limit: match self.mode {
328                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
329                QueryMode::Load(_) => None,
330            },
331            page: match self.mode {
332                QueryMode::Load(spec) => {
333                    if spec.limit.is_some() || spec.offset > 0 {
334                        Some(PageSpec {
335                            limit: spec.limit,
336                            offset: spec.offset,
337                        })
338                    } else {
339                        None
340                    }
341                }
342                QueryMode::Delete(_) => None,
343            },
344            consistency: self.consistency,
345        };
346
347        validate_logical_plan_model(&schema_info, self.model, &plan)?;
348
349        Ok(plan)
350    }
351
352    // Validate pre-plan policy invariants and key-access rules before planning.
353    fn validate_intent(&self) -> Result<(), IntentError> {
354        if self.key_access_conflict {
355            return Err(IntentError::KeyAccessConflict);
356        }
357
358        policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
359            .map_err(IntentError::from)?;
360
361        if let Some(state) = &self.key_access {
362            match state.kind {
363                KeyAccessKind::Many if self.predicate.is_some() => {
364                    return Err(IntentError::ByIdsWithPredicate);
365                }
366                KeyAccessKind::Only if self.predicate.is_some() => {
367                    return Err(IntentError::OnlyWithPredicate);
368                }
369                _ => {
370                    // NOTE: Single/Many without predicates impose no additional constraints.
371                }
372            }
373        }
374
375        Ok(())
376    }
377}
378
379///
380/// Query
381///
382/// Typed, declarative query intent for a specific entity type.
383///
384/// This intent is:
385/// - schema-agnostic at construction
386/// - normalized and validated only during planning
387/// - free of access-path decisions
388///
389
390#[derive(Debug)]
391pub struct Query<E: EntityKind> {
392    intent: QueryModel<'static, E::Key>,
393    _marker: PhantomData<E>,
394}
395
396impl<E: EntityKind> Query<E> {
397    /// Create a new intent with an explicit missing-row policy.
398    /// MissingOk favors idempotency and may mask index/data divergence on deletes.
399    /// Use Strict to surface missing rows during scan/delete execution.
400    #[must_use]
401    pub const fn new(consistency: ReadConsistency) -> Self {
402        Self {
403            intent: QueryModel::new(E::MODEL, consistency),
404            _marker: PhantomData,
405        }
406    }
407
408    /// Return the intent mode (load vs delete).
409    #[must_use]
410    pub const fn mode(&self) -> QueryMode {
411        self.intent.mode()
412    }
413
414    #[must_use]
415    pub(crate) fn has_explicit_order(&self) -> bool {
416        self.intent.has_explicit_order()
417    }
418
419    #[must_use]
420    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
421        self.intent.load_spec()
422    }
423
424    /// Add a predicate, implicitly AND-ing with any existing predicate.
425    #[must_use]
426    pub fn filter(mut self, predicate: Predicate) -> Self {
427        self.intent = self.intent.filter(predicate);
428        self
429    }
430
431    /// Apply a dynamic filter expression.
432    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
433        let Self { intent, _marker } = self;
434        let intent = intent.filter_expr(expr)?;
435
436        Ok(Self { intent, _marker })
437    }
438
439    /// Apply a dynamic sort expression.
440    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
441        let Self { intent, _marker } = self;
442        let intent = intent.sort_expr(expr)?;
443
444        Ok(Self { intent, _marker })
445    }
446
447    /// Append an ascending sort key.
448    #[must_use]
449    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
450        self.intent = self.intent.order_by(field);
451        self
452    }
453
454    /// Append a descending sort key.
455    #[must_use]
456    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
457        self.intent = self.intent.order_by_desc(field);
458        self
459    }
460
461    /// Enable DISTINCT semantics for this query.
462    #[must_use]
463    pub fn distinct(mut self) -> Self {
464        self.intent = self.intent.distinct();
465        self
466    }
467
468    /// Set the access path to a single primary key lookup.
469    pub(crate) fn by_id(self, id: E::Key) -> Self {
470        let Self { intent, _marker } = self;
471        Self {
472            intent: intent.by_id(id),
473            _marker,
474        }
475    }
476
477    /// Set the access path to a primary key batch lookup.
478    pub(crate) fn by_ids<I>(self, ids: I) -> Self
479    where
480        I: IntoIterator<Item = E::Key>,
481    {
482        let Self { intent, _marker } = self;
483        Self {
484            intent: intent.by_ids(ids),
485            _marker,
486        }
487    }
488
489    /// Mark this intent as a delete query.
490    #[must_use]
491    pub fn delete(mut self) -> Self {
492        self.intent = self.intent.delete();
493        self
494    }
495
496    /// Apply a limit to the current mode.
497    ///
498    /// Load limits bound result size; delete limits bound mutation size.
499    /// For load queries, any use of `limit` or `offset` requires an explicit
500    /// `order_by(...)` so pagination is deterministic.
501    #[must_use]
502    pub fn limit(mut self, limit: u32) -> Self {
503        self.intent = self.intent.limit(limit);
504        self
505    }
506
507    /// Apply an offset to a load intent.
508    ///
509    /// Any use of `offset` or `limit` requires an explicit `order_by(...)`.
510    #[must_use]
511    pub fn offset(mut self, offset: u32) -> Self {
512        self.intent = self.intent.offset(offset);
513        self
514    }
515
516    /// Explain this intent without executing it.
517    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
518        let plan = self.build_plan()?;
519
520        Ok(plan.explain_with_model(E::MODEL))
521    }
522
523    /// Plan this intent into an executor-ready plan.
524    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
525        let plan = self.build_plan()?;
526        let predicate_slots = plan
527            .predicate
528            .as_ref()
529            .map(PredicateFieldSlots::resolve::<E>);
530
531        Ok(ExecutablePlan::new_with_compiled_predicate_slots(
532            plan,
533            predicate_slots,
534        ))
535    }
536
537    // Build a logical plan for the current intent.
538    fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
539        let plan_value = self.intent.build_plan_model()?;
540        let LogicalPlan {
541            mode,
542            access,
543            predicate,
544            order,
545            distinct,
546            delete_limit,
547            page,
548            consistency,
549        } = plan_value;
550
551        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
552        let plan = LogicalPlan {
553            mode,
554            access,
555            predicate,
556            order,
557            distinct,
558            delete_limit,
559            page,
560            consistency,
561        };
562
563        Ok(plan)
564    }
565}
566
567impl<E> Query<E>
568where
569    E: EntityKind + SingletonEntity,
570    E::Key: Default,
571{
572    /// Set the access path to the singleton primary key.
573    pub(crate) fn only(self) -> Self {
574        let Self { intent, _marker } = self;
575
576        Self {
577            intent: intent.only(E::Key::default()),
578            _marker,
579        }
580    }
581}
582
583///
584/// QueryError
585///
586
587#[derive(Debug, ThisError)]
588pub enum QueryError {
589    #[error("{0}")]
590    Validate(#[from] ValidateError),
591
592    #[error("{0}")]
593    Plan(Box<PlanError>),
594
595    #[error("{0}")]
596    Intent(#[from] IntentError),
597
598    #[error("{0}")]
599    Response(#[from] ResponseError),
600
601    #[error("{0}")]
602    Execute(#[from] InternalError),
603}
604
605impl From<PlannerError> for QueryError {
606    fn from(err: PlannerError) -> Self {
607        match err {
608            PlannerError::Plan(err) => Self::from(*err),
609            PlannerError::Internal(err) => Self::Execute(*err),
610        }
611    }
612}
613
614impl From<PlanError> for QueryError {
615    fn from(err: PlanError) -> Self {
616        Self::Plan(Box::new(err))
617    }
618}
619
620///
621/// IntentError
622///
623
624#[derive(Clone, Copy, Debug, ThisError)]
625pub enum IntentError {
626    #[error("{0}")]
627    PlanShape(#[from] policy::PlanPolicyError),
628
629    #[error("by_ids() cannot be combined with predicates")]
630    ByIdsWithPredicate,
631
632    #[error("only() cannot be combined with predicates")]
633    OnlyWithPredicate,
634
635    #[error("multiple key access methods were used on the same query")]
636    KeyAccessConflict,
637
638    #[error("cursor pagination requires an explicit ordering")]
639    CursorRequiresOrder,
640
641    #[error("cursor pagination requires an explicit limit")]
642    CursorRequiresLimit,
643
644    #[error("cursor tokens can only be used with .page().execute()")]
645    CursorRequiresPagedExecution,
646}
647
648impl From<policy::CursorPagingPolicyError> for IntentError {
649    fn from(err: policy::CursorPagingPolicyError) -> Self {
650        match err {
651            policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
652            policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
653        }
654    }
655}
656
657/// Helper to append an ordering field while preserving existing order spec.
658fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
659    match order {
660        Some(mut spec) => {
661            spec.fields.push((field.to_string(), direction));
662            spec
663        }
664        None => OrderSpec {
665            fields: vec![(field.to_string(), direction)],
666        },
667    }
668}
669
670// Normalize ORDER BY into a canonical, deterministic shape:
671// - preserve user field order
672// - remove explicit primary-key references from the user segment
673// - append exactly one primary-key field as the terminal tie-break
674fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
675    let mut order = order?;
676    if order.fields.is_empty() {
677        return Some(order);
678    }
679
680    let pk_field = model.primary_key.name;
681    let mut pk_direction = None;
682    order.fields.retain(|(field, direction)| {
683        if field == pk_field {
684            if pk_direction.is_none() {
685                pk_direction = Some(*direction);
686            }
687            false
688        } else {
689            true
690        }
691    });
692
693    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
694    order.fields.push((pk_field.to_string(), pk_direction));
695
696    Some(order)
697}