1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub(crate) use crate::db::intent::{DeleteSpec, LoadSpec, QueryMode};
8pub(crate) use key_access::*;
9
10use crate::{
11 db::{
12 consistency::ReadConsistency,
13 query::{
14 explain::ExplainPlan,
15 expr::{FilterExpr, SortExpr, SortLowerError},
16 plan::{
17 AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
18 PageSpec, PlanError,
19 planner::{PlannerError, plan_access},
20 validate::validate_logical_plan_model,
21 },
22 policy,
23 predicate::{
24 Predicate, SchemaInfo, ValidateError, normalize, normalize_enum_literals,
25 validate::reject_unsupported_query_features,
26 },
27 },
28 response::ResponseError,
29 },
30 error::InternalError,
31 model::entity::EntityModel,
32 traits::{EntityKind, FieldValue, SingletonEntity},
33 value::Value,
34};
35use std::marker::PhantomData;
36use thiserror::Error as ThisError;
37
38#[derive(Debug)]
46pub(crate) struct QueryModel<'m, K> {
47 model: &'m EntityModel,
48 mode: QueryMode,
49 predicate: Option<Predicate>,
50 key_access: Option<KeyAccessState<K>>,
51 key_access_conflict: bool,
52 order: Option<OrderSpec>,
53 distinct: bool,
54 consistency: ReadConsistency,
55}
56
57impl<'m, K: FieldValue> QueryModel<'m, K> {
58 #[must_use]
59 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
60 Self {
61 model,
62 mode: QueryMode::Load(LoadSpec::new()),
63 predicate: None,
64 key_access: None,
65 key_access_conflict: false,
66 order: None,
67 distinct: false,
68 consistency,
69 }
70 }
71
72 #[must_use]
74 pub(crate) const fn mode(&self) -> QueryMode {
75 self.mode
76 }
77
78 #[must_use]
79 fn has_explicit_order(&self) -> bool {
80 policy::has_explicit_order(self.order.as_ref())
81 }
82
83 #[must_use]
84 const fn load_spec(&self) -> Option<LoadSpec> {
85 match self.mode {
86 QueryMode::Load(spec) => Some(spec),
87 QueryMode::Delete(_) => None,
88 }
89 }
90
91 #[must_use]
93 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
94 self.predicate = match self.predicate.take() {
95 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
96 None => Some(predicate),
97 };
98 self
99 }
100
101 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
103 let schema = SchemaInfo::from_entity_model(self.model)?;
104 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
105
106 Ok(self.filter(predicate))
107 }
108
109 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
111 let schema = SchemaInfo::from_entity_model(self.model)?;
112 let order = match expr.lower_with(&schema) {
113 Ok(order) => order,
114 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
115 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
116 };
117
118 policy::validate_order_shape(Some(&order))
119 .map_err(IntentError::from)
120 .map_err(QueryError::from)?;
121
122 Ok(self.order_spec(order))
123 }
124
125 #[must_use]
127 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
128 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
129 self
130 }
131
132 #[must_use]
134 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
135 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
136 self
137 }
138
139 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
141 self.order = Some(order);
142 self
143 }
144
145 #[must_use]
147 pub(crate) const fn distinct(mut self) -> Self {
148 self.distinct = true;
149 self
150 }
151
152 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
154 if let Some(existing) = &self.key_access
155 && existing.kind != kind
156 {
157 self.key_access_conflict = true;
158 }
159
160 self.key_access = Some(KeyAccessState { kind, access });
161
162 self
163 }
164
165 pub(crate) fn by_id(self, id: K) -> Self {
167 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
168 }
169
170 pub(crate) fn by_ids<I>(self, ids: I) -> Self
172 where
173 I: IntoIterator<Item = K>,
174 {
175 self.set_key_access(
176 KeyAccessKind::Many,
177 KeyAccess::Many(ids.into_iter().collect()),
178 )
179 }
180
181 pub(crate) fn only(self, id: K) -> Self {
183 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
184 }
185
186 #[must_use]
188 pub(crate) const fn delete(mut self) -> Self {
189 if self.mode.is_load() {
190 self.mode = QueryMode::Delete(DeleteSpec::new());
191 }
192 self
193 }
194
195 #[must_use]
199 pub(crate) const fn limit(mut self, limit: u32) -> Self {
200 match self.mode {
201 QueryMode::Load(mut spec) => {
202 spec.limit = Some(limit);
203 self.mode = QueryMode::Load(spec);
204 }
205 QueryMode::Delete(mut spec) => {
206 spec.limit = Some(limit);
207 self.mode = QueryMode::Delete(spec);
208 }
209 }
210 self
211 }
212
213 #[must_use]
215 pub(crate) const fn offset(mut self, offset: u32) -> Self {
216 if let QueryMode::Load(mut spec) = self.mode {
217 spec.offset = offset;
218 self.mode = QueryMode::Load(spec);
219 }
220 self
221 }
222
223 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
225 let schema_info = SchemaInfo::from_entity_model(self.model)?;
227 self.validate_intent()?;
228
229 let normalized_predicate = self
231 .predicate
232 .as_ref()
233 .map(|predicate| {
234 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
235 let predicate = normalize_enum_literals(&schema_info, predicate)?;
236 Ok::<Predicate, ValidateError>(normalize(&predicate))
237 })
238 .transpose()?;
239 let access_plan_value = match &self.key_access {
240 Some(state) => access_plan_from_keys_value(&state.access),
241 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
242 };
243
244 let logical = LogicalPlan {
246 mode: self.mode,
247 predicate: normalized_predicate,
248 order: canonicalize_order_spec(self.model, self.order.clone()),
251 distinct: self.distinct,
252 delete_limit: match self.mode {
253 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
254 QueryMode::Load(_) => None,
255 },
256 page: match self.mode {
257 QueryMode::Load(spec) => {
258 if spec.limit.is_some() || spec.offset > 0 {
259 Some(PageSpec {
260 limit: spec.limit,
261 offset: spec.offset,
262 })
263 } else {
264 None
265 }
266 }
267 QueryMode::Delete(_) => None,
268 },
269 consistency: self.consistency,
270 };
271 let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
272
273 validate_logical_plan_model(&schema_info, self.model, &plan)?;
274
275 Ok(plan)
276 }
277
278 fn validate_intent(&self) -> Result<(), IntentError> {
280 if self.key_access_conflict {
281 return Err(IntentError::KeyAccessConflict);
282 }
283
284 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
285 .map_err(IntentError::from)?;
286
287 if let Some(state) = &self.key_access {
288 match state.kind {
289 KeyAccessKind::Many if self.predicate.is_some() => {
290 return Err(IntentError::ByIdsWithPredicate);
291 }
292 KeyAccessKind::Only if self.predicate.is_some() => {
293 return Err(IntentError::OnlyWithPredicate);
294 }
295 _ => {
296 }
298 }
299 }
300
301 Ok(())
302 }
303}
304
305#[derive(Debug)]
323pub struct PlannedQuery<E: EntityKind> {
324 plan: AccessPlannedQuery<E::Key>,
325 _marker: PhantomData<E>,
326}
327
328impl<E: EntityKind> PlannedQuery<E> {
329 #[must_use]
330 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
331 Self {
332 plan,
333 _marker: PhantomData,
334 }
335 }
336
337 #[must_use]
338 pub fn explain(&self) -> ExplainPlan {
339 self.plan.explain_with_model(E::MODEL)
340 }
341
342 #[must_use]
343 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
344 self.plan
345 }
346}
347
348#[derive(Debug)]
349pub struct Query<E: EntityKind> {
350 intent: QueryModel<'static, E::Key>,
351 _marker: PhantomData<E>,
352}
353
354impl<E: EntityKind> Query<E> {
355 #[must_use]
359 pub const fn new(consistency: ReadConsistency) -> Self {
360 Self {
361 intent: QueryModel::new(E::MODEL, consistency),
362 _marker: PhantomData,
363 }
364 }
365
366 #[must_use]
368 pub const fn mode(&self) -> QueryMode {
369 self.intent.mode()
370 }
371
372 #[must_use]
373 pub(crate) fn has_explicit_order(&self) -> bool {
374 self.intent.has_explicit_order()
375 }
376
377 #[must_use]
378 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
379 self.intent.load_spec()
380 }
381
382 #[must_use]
384 pub fn filter(mut self, predicate: Predicate) -> Self {
385 self.intent = self.intent.filter(predicate);
386 self
387 }
388
389 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
391 let Self { intent, _marker } = self;
392 let intent = intent.filter_expr(expr)?;
393
394 Ok(Self { intent, _marker })
395 }
396
397 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
399 let Self { intent, _marker } = self;
400 let intent = intent.sort_expr(expr)?;
401
402 Ok(Self { intent, _marker })
403 }
404
405 #[must_use]
407 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
408 self.intent = self.intent.order_by(field);
409 self
410 }
411
412 #[must_use]
414 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
415 self.intent = self.intent.order_by_desc(field);
416 self
417 }
418
419 #[must_use]
421 pub fn distinct(mut self) -> Self {
422 self.intent = self.intent.distinct();
423 self
424 }
425
426 pub(crate) fn by_id(self, id: E::Key) -> Self {
428 let Self { intent, _marker } = self;
429 Self {
430 intent: intent.by_id(id),
431 _marker,
432 }
433 }
434
435 pub(crate) fn by_ids<I>(self, ids: I) -> Self
437 where
438 I: IntoIterator<Item = E::Key>,
439 {
440 let Self { intent, _marker } = self;
441 Self {
442 intent: intent.by_ids(ids),
443 _marker,
444 }
445 }
446
447 #[must_use]
449 pub fn delete(mut self) -> Self {
450 self.intent = self.intent.delete();
451 self
452 }
453
454 #[must_use]
460 pub fn limit(mut self, limit: u32) -> Self {
461 self.intent = self.intent.limit(limit);
462 self
463 }
464
465 #[must_use]
469 pub fn offset(mut self, offset: u32) -> Self {
470 self.intent = self.intent.offset(offset);
471 self
472 }
473
474 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
476 let plan = self.planned()?;
477
478 Ok(plan.explain())
479 }
480
481 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
483 let plan = self.build_plan()?;
484
485 Ok(PlannedQuery::new(plan))
486 }
487
488 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
490 let plan_value = self.intent.build_plan_model()?;
491 let (logical, access) = plan_value.into_parts();
492 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
493 let plan = AccessPlannedQuery::from_parts(logical, access);
494
495 Ok(plan)
496 }
497}
498
499impl<E> Query<E>
500where
501 E: EntityKind + SingletonEntity,
502 E::Key: Default,
503{
504 pub(crate) fn only(self) -> Self {
506 let Self { intent, _marker } = self;
507
508 Self {
509 intent: intent.only(E::Key::default()),
510 _marker,
511 }
512 }
513}
514
515#[derive(Debug, ThisError)]
520pub enum QueryError {
521 #[error("{0}")]
522 Validate(#[from] ValidateError),
523
524 #[error("{0}")]
525 Plan(Box<PlanError>),
526
527 #[error("{0}")]
528 Intent(#[from] IntentError),
529
530 #[error("{0}")]
531 Response(#[from] ResponseError),
532
533 #[error("{0}")]
534 Execute(#[from] InternalError),
535}
536
537impl From<PlannerError> for QueryError {
538 fn from(err: PlannerError) -> Self {
539 match err {
540 PlannerError::Plan(err) => Self::from(*err),
541 PlannerError::Internal(err) => Self::Execute(*err),
542 }
543 }
544}
545
546impl From<PlanError> for QueryError {
547 fn from(err: PlanError) -> Self {
548 Self::Plan(Box::new(err))
549 }
550}
551
552#[derive(Clone, Copy, Debug, ThisError)]
557pub enum IntentError {
558 #[error("{0}")]
559 PlanShape(#[from] policy::PlanPolicyError),
560
561 #[error("by_ids() cannot be combined with predicates")]
562 ByIdsWithPredicate,
563
564 #[error("only() cannot be combined with predicates")]
565 OnlyWithPredicate,
566
567 #[error("multiple key access methods were used on the same query")]
568 KeyAccessConflict,
569
570 #[error("cursor pagination requires an explicit ordering")]
571 CursorRequiresOrder,
572
573 #[error("cursor pagination requires an explicit limit")]
574 CursorRequiresLimit,
575
576 #[error("cursor tokens can only be used with .page().execute()")]
577 CursorRequiresPagedExecution,
578}
579
580impl From<policy::CursorPagingPolicyError> for IntentError {
581 fn from(err: policy::CursorPagingPolicyError) -> Self {
582 match err {
583 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
584 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
585 }
586 }
587}
588
589fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
591 match order {
592 Some(mut spec) => {
593 spec.fields.push((field.to_string(), direction));
594 spec
595 }
596 None => OrderSpec {
597 fields: vec![(field.to_string(), direction)],
598 },
599 }
600}
601
602fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
607 let mut order = order?;
608 if order.fields.is_empty() {
609 return Some(order);
610 }
611
612 let pk_field = model.primary_key.name;
613 let mut pk_direction = None;
614 order.fields.retain(|(field, direction)| {
615 if field == pk_field {
616 if pk_direction.is_none() {
617 pk_direction = Some(*direction);
618 }
619 false
620 } else {
621 true
622 }
623 });
624
625 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
626 order.fields.push((pk_field.to_string(), pk_direction));
627
628 Some(order)
629}