// icydb_core/db/query/intent/mod.rs

1#![allow(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5// Key-only access intent and helpers (split out for readability).
6mod key_access;
7pub use key_access::*;
8
use crate::{
    db::{
        query::{
            ReadConsistency,
            expr::{FilterExpr, SortExpr, SortLowerError},
            plan::{
                DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
                OrderSpec, PageSpec, PlanError,
                planner::{PlannerError, plan_access},
                validate::validate_logical_plan_model,
            },
            policy,
            predicate::{
                Predicate, SchemaInfo, ValidateError, normalize,
                validate::reject_unsupported_query_features,
            },
        },
        response::ResponseError,
    },
    error::InternalError,
    traits::{EntityKind, FieldValue, SingletonEntity},
    value::Value,
};
use std::marker::PhantomData;
use thiserror::Error as ThisError;

35///
36/// QueryMode
37/// Discriminates load vs delete intent at planning time.
38/// Encodes mode-specific fields so invalid states are unrepresentable.
39/// Mode checks are explicit and stable at execution time.
40///
41
42#[derive(Clone, Copy, Debug, Eq, PartialEq)]
43pub enum QueryMode {
44    Load(LoadSpec),
45    Delete(DeleteSpec),
46}
47
48impl QueryMode {
49    /// True if this mode represents a load intent.
50    #[must_use]
51    pub const fn is_load(&self) -> bool {
52        match self {
53            Self::Load(_) => true,
54            Self::Delete(_) => false,
55        }
56    }
57
58    /// True if this mode represents a delete intent.
59    #[must_use]
60    pub const fn is_delete(&self) -> bool {
61        match self {
62            Self::Delete(_) => true,
63            Self::Load(_) => false,
64        }
65    }
66}
67
///
/// LoadSpec
///
/// Mode-specific fields for load intents.
/// Encodes pagination without leaking into delete intents.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct LoadSpec {
    /// Requested row limit; `None` when no limit was set.
    pub limit: Option<u32>,
    /// Requested row offset; `0` when no offset was set.
    pub offset: u32,
}

impl LoadSpec {
    /// Create an empty load spec (no limit, zero offset).
    ///
    /// Produces the same value as `LoadSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            limit: None,
            offset: 0,
        }
    }
}

///
/// DeleteSpec
///
/// Mode-specific fields for delete intents.
/// Encodes delete limits without leaking into load intents.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteSpec {
    /// Requested cap on the number of rows to delete; `None` when no limit was set.
    pub limit: Option<u32>,
}

impl DeleteSpec {
    /// Create an empty delete spec (no limit).
    ///
    /// Produces the same value as `DeleteSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self { limit: None }
    }
}

110///
111/// QueryModel
112///
113/// Model-level query intent and planning context.
114/// Consumes an `EntityModel` derived from typed entity definitions.
115///
116
117#[derive(Debug)]
118pub(crate) struct QueryModel<'m, K> {
119    model: &'m crate::model::entity::EntityModel,
120    mode: QueryMode,
121    predicate: Option<Predicate>,
122    key_access: Option<KeyAccessState<K>>,
123    key_access_conflict: bool,
124    order: Option<OrderSpec>,
125    consistency: ReadConsistency,
126}
127
128impl<'m, K: FieldValue> QueryModel<'m, K> {
129    #[must_use]
130    pub const fn new(
131        model: &'m crate::model::entity::EntityModel,
132        consistency: ReadConsistency,
133    ) -> Self {
134        Self {
135            model,
136            mode: QueryMode::Load(LoadSpec::new()),
137            predicate: None,
138            key_access: None,
139            key_access_conflict: false,
140            order: None,
141            consistency,
142        }
143    }
144
145    /// Return the intent mode (load vs delete).
146    #[must_use]
147    pub const fn mode(&self) -> QueryMode {
148        self.mode
149    }
150
151    #[must_use]
152    fn has_explicit_order(&self) -> bool {
153        policy::has_explicit_order(self.order.as_ref())
154    }
155
156    #[must_use]
157    const fn load_spec(&self) -> Option<LoadSpec> {
158        match self.mode {
159            QueryMode::Load(spec) => Some(spec),
160            QueryMode::Delete(_) => None,
161        }
162    }
163
164    /// Add a predicate, implicitly AND-ing with any existing predicate.
165    #[must_use]
166    pub fn filter(mut self, predicate: Predicate) -> Self {
167        self.predicate = match self.predicate.take() {
168            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
169            None => Some(predicate),
170        };
171        self
172    }
173
174    /// Apply a dynamic filter expression using the model schema.
175    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
176        let schema = SchemaInfo::from_entity_model(self.model)?;
177        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
178
179        Ok(self.filter(predicate))
180    }
181
182    /// Apply a dynamic sort expression using the model schema.
183    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
184        let schema = SchemaInfo::from_entity_model(self.model)?;
185        let order = match expr.lower_with(&schema) {
186            Ok(order) => order,
187            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
188            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
189        };
190
191        if order.fields.is_empty() {
192            return Err(QueryError::Intent(IntentError::EmptyOrderSpec));
193        }
194
195        Ok(self.order_spec(order))
196    }
197
198    /// Append an ascending sort key.
199    #[must_use]
200    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
201        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
202        self
203    }
204
205    /// Append a descending sort key.
206    #[must_use]
207    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
208        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
209        self
210    }
211
212    /// Set a fully-specified order spec (validated before reaching this boundary).
213    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
214        self.order = Some(order);
215        self
216    }
217
218    /// Track key-only access paths and detect conflicting key intents.
219    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
220        if let Some(existing) = &self.key_access
221            && existing.kind != kind
222        {
223            self.key_access_conflict = true;
224        }
225
226        self.key_access = Some(KeyAccessState { kind, access });
227
228        self
229    }
230
231    /// Set the access path to a single primary key lookup.
232    pub(crate) fn by_id(self, id: K) -> Self {
233        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
234    }
235
236    /// Set the access path to a primary key batch lookup.
237    pub(crate) fn by_ids<I>(self, ids: I) -> Self
238    where
239        I: IntoIterator<Item = K>,
240    {
241        self.set_key_access(
242            KeyAccessKind::Many,
243            KeyAccess::Many(ids.into_iter().collect()),
244        )
245    }
246
247    /// Set the access path to the singleton primary key.
248    pub(crate) fn only(self, id: K) -> Self {
249        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
250    }
251
252    /// Mark this intent as a delete query.
253    #[must_use]
254    pub const fn delete(mut self) -> Self {
255        if self.mode.is_load() {
256            self.mode = QueryMode::Delete(DeleteSpec::new());
257        }
258        self
259    }
260
261    /// Apply a limit to the current mode.
262    ///
263    /// Load limits bound result size; delete limits bound mutation size.
264    #[must_use]
265    pub const fn limit(mut self, limit: u32) -> Self {
266        match self.mode {
267            QueryMode::Load(mut spec) => {
268                spec.limit = Some(limit);
269                self.mode = QueryMode::Load(spec);
270            }
271            QueryMode::Delete(mut spec) => {
272                spec.limit = Some(limit);
273                self.mode = QueryMode::Delete(spec);
274            }
275        }
276        self
277    }
278
279    /// Apply an offset to a load intent.
280    #[must_use]
281    pub const fn offset(mut self, offset: u32) -> Self {
282        if let QueryMode::Load(mut spec) = self.mode {
283            spec.offset = offset;
284            self.mode = QueryMode::Load(spec);
285        }
286        self
287    }
288
289    /// Build a model-level logical plan using Value-based access keys.
290    fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
291        // Phase 1: schema surface and intent validation.
292        let schema_info = SchemaInfo::from_entity_model(self.model)?;
293        self.validate_intent()?;
294
295        if let Some(predicate) = self.predicate.as_ref() {
296            reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
297        }
298
299        // Phase 2: predicate normalization and access planning.
300        let normalized_predicate = self.predicate.as_ref().map(normalize);
301        let access_plan_value = match &self.key_access {
302            Some(state) => access_plan_from_keys_value(&state.access),
303            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
304        };
305
306        // Phase 3: assemble the executor-ready plan.
307        let plan = LogicalPlan {
308            mode: self.mode,
309            access: access_plan_value,
310            predicate: normalized_predicate,
311            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
312            // This ensures explain/fingerprint/execution share one deterministic order shape.
313            order: canonicalize_order_spec(self.model, self.order.clone()),
314            delete_limit: match self.mode {
315                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
316                QueryMode::Load(_) => None,
317            },
318            page: match self.mode {
319                QueryMode::Load(spec) => {
320                    if spec.limit.is_some() || spec.offset > 0 {
321                        Some(PageSpec {
322                            limit: spec.limit,
323                            offset: spec.offset,
324                        })
325                    } else {
326                        None
327                    }
328                }
329                QueryMode::Delete(_) => None,
330            },
331            consistency: self.consistency,
332        };
333
334        validate_logical_plan_model(&schema_info, self.model, &plan)?;
335
336        Ok(plan)
337    }
338
339    // Validate delete-specific intent rules before planning.
340    fn validate_intent(&self) -> Result<(), IntentError> {
341        if self.key_access_conflict {
342            return Err(IntentError::KeyAccessConflict);
343        }
344
345        if policy::has_empty_order(self.order.as_ref()) {
346            return Err(IntentError::EmptyOrderSpec);
347        }
348
349        if let Some(state) = &self.key_access {
350            match state.kind {
351                KeyAccessKind::Many if self.predicate.is_some() => {
352                    return Err(IntentError::ByIdsWithPredicate);
353                }
354                KeyAccessKind::Only if self.predicate.is_some() => {
355                    return Err(IntentError::OnlyWithPredicate);
356                }
357                _ => {
358                    // NOTE: Single/Many without predicates impose no additional constraints.
359                }
360            }
361        }
362
363        match self.mode {
364            QueryMode::Load(_) => {}
365            QueryMode::Delete(spec) => {
366                if spec.limit.is_some() && !policy::has_explicit_order(self.order.as_ref()) {
367                    return Err(IntentError::DeleteLimitRequiresOrder);
368                }
369            }
370        }
371
372        Ok(())
373    }
374}
375
376///
377/// Query
378///
379/// Typed, declarative query intent for a specific entity type.
380///
381/// This intent is:
382/// - schema-agnostic at construction
383/// - normalized and validated only during planning
384/// - free of access-path decisions
385///
386
387#[derive(Debug)]
388pub struct Query<E: EntityKind> {
389    intent: QueryModel<'static, E::Key>,
390    #[allow(clippy::struct_field_names)]
391    _marker: PhantomData<E>,
392}
393
394impl<E: EntityKind> Query<E> {
395    /// Create a new intent with an explicit missing-row policy.
396    /// MissingOk favors idempotency and may mask index/data divergence on deletes.
397    /// Use Strict to surface missing rows during scan/delete execution.
398    #[must_use]
399    pub const fn new(consistency: ReadConsistency) -> Self {
400        Self {
401            intent: QueryModel::new(E::MODEL, consistency),
402            _marker: PhantomData,
403        }
404    }
405
406    /// Return the intent mode (load vs delete).
407    #[must_use]
408    pub const fn mode(&self) -> QueryMode {
409        self.intent.mode()
410    }
411
412    #[must_use]
413    pub(crate) fn has_explicit_order(&self) -> bool {
414        self.intent.has_explicit_order()
415    }
416
417    #[must_use]
418    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
419        self.intent.load_spec()
420    }
421
422    /// Add a predicate, implicitly AND-ing with any existing predicate.
423    #[must_use]
424    pub fn filter(mut self, predicate: Predicate) -> Self {
425        self.intent = self.intent.filter(predicate);
426        self
427    }
428
429    /// Apply a dynamic filter expression.
430    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
431        let Self { intent, _marker } = self;
432        let intent = intent.filter_expr(expr)?;
433
434        Ok(Self { intent, _marker })
435    }
436
437    /// Apply a dynamic sort expression.
438    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
439        let Self { intent, _marker } = self;
440        let intent = intent.sort_expr(expr)?;
441
442        Ok(Self { intent, _marker })
443    }
444
445    /// Append an ascending sort key.
446    #[must_use]
447    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
448        self.intent = self.intent.order_by(field);
449        self
450    }
451
452    /// Append a descending sort key.
453    #[must_use]
454    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
455        self.intent = self.intent.order_by_desc(field);
456        self
457    }
458
459    /// Set the access path to a single primary key lookup.
460    pub(crate) fn by_id(self, id: E::Key) -> Self {
461        let Self { intent, _marker } = self;
462        Self {
463            intent: intent.by_id(id),
464            _marker,
465        }
466    }
467
468    /// Set the access path to a primary key batch lookup.
469    pub(crate) fn by_ids<I>(self, ids: I) -> Self
470    where
471        I: IntoIterator<Item = E::Key>,
472    {
473        let Self { intent, _marker } = self;
474        Self {
475            intent: intent.by_ids(ids),
476            _marker,
477        }
478    }
479
480    /// Mark this intent as a delete query.
481    #[must_use]
482    pub fn delete(mut self) -> Self {
483        self.intent = self.intent.delete();
484        self
485    }
486
487    /// Apply a limit to the current mode.
488    ///
489    /// Load limits bound result size; delete limits bound mutation size.
490    /// For load queries, any use of `limit` or `offset` requires an explicit
491    /// `order_by(...)` so pagination is deterministic.
492    #[must_use]
493    pub fn limit(mut self, limit: u32) -> Self {
494        self.intent = self.intent.limit(limit);
495        self
496    }
497
498    /// Apply an offset to a load intent.
499    ///
500    /// Any use of `offset` or `limit` requires an explicit `order_by(...)`.
501    #[must_use]
502    pub fn offset(mut self, offset: u32) -> Self {
503        self.intent = self.intent.offset(offset);
504        self
505    }
506
507    /// Explain this intent without executing it.
508    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
509        let plan = self.build_plan()?;
510
511        Ok(plan.explain())
512    }
513
514    /// Plan this intent into an executor-ready plan.
515    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
516        let plan = self.build_plan()?;
517
518        Ok(ExecutablePlan::new(plan))
519    }
520
521    // Build a logical plan for the current intent.
522    fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
523        let plan_value = self.intent.build_plan_model()?;
524        let LogicalPlan {
525            mode,
526            access,
527            predicate,
528            order,
529            delete_limit,
530            page,
531            consistency,
532        } = plan_value;
533
534        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
535        let plan = LogicalPlan {
536            mode,
537            access,
538            predicate,
539            order,
540            delete_limit,
541            page,
542            consistency,
543        };
544
545        Ok(plan)
546    }
547}
548
549impl<E> Query<E>
550where
551    E: EntityKind + SingletonEntity,
552    E::Key: Default,
553{
554    /// Set the access path to the singleton primary key.
555    pub(crate) fn only(self) -> Self {
556        let Self { intent, _marker } = self;
557
558        Self {
559            intent: intent.only(E::Key::default()),
560            _marker,
561        }
562    }
563}
564
565///
566/// QueryError
567///
568
569#[derive(Debug, ThisError)]
570pub enum QueryError {
571    #[error("{0}")]
572    Validate(#[from] ValidateError),
573
574    #[error("{0}")]
575    Plan(Box<PlanError>),
576
577    #[error("{0}")]
578    Intent(#[from] IntentError),
579
580    #[error("{0}")]
581    Response(#[from] ResponseError),
582
583    #[error("{0}")]
584    Execute(#[from] InternalError),
585}
586
587impl From<PlannerError> for QueryError {
588    fn from(err: PlannerError) -> Self {
589        match err {
590            PlannerError::Plan(err) => Self::from(*err),
591            PlannerError::Internal(err) => Self::Execute(*err),
592        }
593    }
594}
595
596impl From<PlanError> for QueryError {
597    fn from(err: PlanError) -> Self {
598        Self::Plan(Box::new(err))
599    }
600}
601
602///
603/// IntentError
604///
605
606#[derive(Clone, Copy, Debug, ThisError)]
607pub enum IntentError {
608    #[error("delete limit requires an explicit ordering")]
609    DeleteLimitRequiresOrder,
610
611    #[error("order specification must include at least one field")]
612    EmptyOrderSpec,
613
614    #[error("by_ids() cannot be combined with predicates")]
615    ByIdsWithPredicate,
616
617    #[error("only() cannot be combined with predicates")]
618    OnlyWithPredicate,
619
620    #[error("multiple key access methods were used on the same query")]
621    KeyAccessConflict,
622
623    #[error("cursor pagination requires an explicit ordering")]
624    CursorRequiresOrder,
625
626    #[error("cursor pagination requires an explicit limit")]
627    CursorRequiresLimit,
628
629    #[error("cursor pagination does not support offset; use the cursor token for continuation")]
630    CursorWithOffsetUnsupported,
631}
632
633impl From<policy::CursorPagingPolicyError> for IntentError {
634    fn from(err: policy::CursorPagingPolicyError) -> Self {
635        match err {
636            policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
637            policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
638            policy::CursorPagingPolicyError::CursorWithOffsetUnsupported => {
639                Self::CursorWithOffsetUnsupported
640            }
641        }
642    }
643}
644
645/// Helper to append an ordering field while preserving existing order spec.
646fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
647    match order {
648        Some(mut spec) => {
649            spec.fields.push((field.to_string(), direction));
650            spec
651        }
652        None => OrderSpec {
653            fields: vec![(field.to_string(), direction)],
654        },
655    }
656}
657
658// Normalize ORDER BY into a canonical, deterministic shape:
659// - preserve user field order
660// - remove explicit primary-key references from the user segment
661// - append exactly one primary-key field as the terminal tie-break
662fn canonicalize_order_spec(
663    model: &crate::model::entity::EntityModel,
664    order: Option<OrderSpec>,
665) -> Option<OrderSpec> {
666    let mut order = order?;
667    if order.fields.is_empty() {
668        return Some(order);
669    }
670
671    let pk_field = model.primary_key.name;
672    let mut pk_direction = None;
673    order.fields.retain(|(field, direction)| {
674        if field == pk_field {
675            if pk_direction.is_none() {
676                pk_direction = Some(*direction);
677            }
678            false
679        } else {
680            true
681        }
682    });
683
684    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
685    order.fields.push((pk_field.to_string(), pk_direction));
686
687    Some(order)
688}