1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub(crate) use key_access::*;
8pub type DeleteSpec = crate::db::access::DeleteSpec;
9pub type LoadSpec = crate::db::access::LoadSpec;
10pub type QueryMode = crate::db::access::QueryMode;
11
12use crate::{
13 db::{
14 contracts::ReadConsistency,
15 executor::ExecutablePlan,
16 policy,
17 query::{
18 explain::ExplainPlan,
19 expr::{FilterExpr, SortExpr, SortLowerError},
20 plan::{
21 AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
22 PageSpec, PlanError,
23 planner::{PlannerError, plan_access},
24 validate::validate_logical_plan_model,
25 },
26 predicate::{
27 Predicate, SchemaInfo, ValidateError, normalize, normalize_enum_literals,
28 validate::reject_unsupported_query_features,
29 },
30 },
31 response::ResponseError,
32 },
33 error::InternalError,
34 model::entity::EntityModel,
35 traits::{EntityKind, FieldValue, SingletonEntity},
36 value::Value,
37};
38use std::marker::PhantomData;
39use thiserror::Error as ThisError;
40
41#[derive(Debug)]
49pub(crate) struct QueryModel<'m, K> {
50 model: &'m EntityModel,
51 mode: QueryMode,
52 predicate: Option<Predicate>,
53 key_access: Option<KeyAccessState<K>>,
54 key_access_conflict: bool,
55 order: Option<OrderSpec>,
56 distinct: bool,
57 consistency: ReadConsistency,
58}
59
60impl<'m, K: FieldValue> QueryModel<'m, K> {
61 #[must_use]
62 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
63 Self {
64 model,
65 mode: QueryMode::Load(LoadSpec::new()),
66 predicate: None,
67 key_access: None,
68 key_access_conflict: false,
69 order: None,
70 distinct: false,
71 consistency,
72 }
73 }
74
75 #[must_use]
77 pub(crate) const fn mode(&self) -> QueryMode {
78 self.mode
79 }
80
81 #[must_use]
82 fn has_explicit_order(&self) -> bool {
83 policy::has_explicit_order(self.order.as_ref())
84 }
85
86 #[must_use]
87 const fn load_spec(&self) -> Option<LoadSpec> {
88 match self.mode {
89 QueryMode::Load(spec) => Some(spec),
90 QueryMode::Delete(_) => None,
91 }
92 }
93
94 #[must_use]
96 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
97 self.predicate = match self.predicate.take() {
98 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
99 None => Some(predicate),
100 };
101 self
102 }
103
104 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
106 let schema = SchemaInfo::from_entity_model(self.model)?;
107 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
108
109 Ok(self.filter(predicate))
110 }
111
112 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
114 let schema = SchemaInfo::from_entity_model(self.model)?;
115 let order = match expr.lower_with(&schema) {
116 Ok(order) => order,
117 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
118 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
119 };
120
121 policy::validate_order_shape(Some(&order))
122 .map_err(IntentError::from)
123 .map_err(QueryError::from)?;
124
125 Ok(self.order_spec(order))
126 }
127
128 #[must_use]
130 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
131 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
132 self
133 }
134
135 #[must_use]
137 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
138 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
139 self
140 }
141
142 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
144 self.order = Some(order);
145 self
146 }
147
148 #[must_use]
150 pub(crate) const fn distinct(mut self) -> Self {
151 self.distinct = true;
152 self
153 }
154
155 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
157 if let Some(existing) = &self.key_access
158 && existing.kind != kind
159 {
160 self.key_access_conflict = true;
161 }
162
163 self.key_access = Some(KeyAccessState { kind, access });
164
165 self
166 }
167
168 pub(crate) fn by_id(self, id: K) -> Self {
170 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
171 }
172
173 pub(crate) fn by_ids<I>(self, ids: I) -> Self
175 where
176 I: IntoIterator<Item = K>,
177 {
178 self.set_key_access(
179 KeyAccessKind::Many,
180 KeyAccess::Many(ids.into_iter().collect()),
181 )
182 }
183
184 pub(crate) fn only(self, id: K) -> Self {
186 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
187 }
188
189 #[must_use]
191 pub(crate) const fn delete(mut self) -> Self {
192 if self.mode.is_load() {
193 self.mode = QueryMode::Delete(DeleteSpec::new());
194 }
195 self
196 }
197
198 #[must_use]
202 pub(crate) const fn limit(mut self, limit: u32) -> Self {
203 match self.mode {
204 QueryMode::Load(mut spec) => {
205 spec.limit = Some(limit);
206 self.mode = QueryMode::Load(spec);
207 }
208 QueryMode::Delete(mut spec) => {
209 spec.limit = Some(limit);
210 self.mode = QueryMode::Delete(spec);
211 }
212 }
213 self
214 }
215
216 #[must_use]
218 pub(crate) const fn offset(mut self, offset: u32) -> Self {
219 if let QueryMode::Load(mut spec) = self.mode {
220 spec.offset = offset;
221 self.mode = QueryMode::Load(spec);
222 }
223 self
224 }
225
226 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
228 let schema_info = SchemaInfo::from_entity_model(self.model)?;
230 self.validate_intent()?;
231
232 let normalized_predicate = self
234 .predicate
235 .as_ref()
236 .map(|predicate| {
237 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
238 let predicate = normalize_enum_literals(&schema_info, predicate)?;
239 Ok::<Predicate, ValidateError>(normalize(&predicate))
240 })
241 .transpose()?;
242 let access_plan_value = match &self.key_access {
243 Some(state) => access_plan_from_keys_value(&state.access),
244 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
245 };
246
247 let logical = LogicalPlan {
249 mode: self.mode,
250 predicate: normalized_predicate,
251 order: canonicalize_order_spec(self.model, self.order.clone()),
254 distinct: self.distinct,
255 delete_limit: match self.mode {
256 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
257 QueryMode::Load(_) => None,
258 },
259 page: match self.mode {
260 QueryMode::Load(spec) => {
261 if spec.limit.is_some() || spec.offset > 0 {
262 Some(PageSpec {
263 limit: spec.limit,
264 offset: spec.offset,
265 })
266 } else {
267 None
268 }
269 }
270 QueryMode::Delete(_) => None,
271 },
272 consistency: self.consistency,
273 };
274 let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
275
276 validate_logical_plan_model(&schema_info, self.model, &plan)?;
277
278 Ok(plan)
279 }
280
281 fn validate_intent(&self) -> Result<(), IntentError> {
283 if self.key_access_conflict {
284 return Err(IntentError::KeyAccessConflict);
285 }
286
287 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
288 .map_err(IntentError::from)?;
289
290 if let Some(state) = &self.key_access {
291 match state.kind {
292 KeyAccessKind::Many if self.predicate.is_some() => {
293 return Err(IntentError::ByIdsWithPredicate);
294 }
295 KeyAccessKind::Only if self.predicate.is_some() => {
296 return Err(IntentError::OnlyWithPredicate);
297 }
298 _ => {
299 }
301 }
302 }
303
304 Ok(())
305 }
306}
307
308#[derive(Debug)]
326pub struct PlannedQuery<E: EntityKind> {
327 plan: AccessPlannedQuery<E::Key>,
328 _marker: PhantomData<E>,
329}
330
331impl<E: EntityKind> PlannedQuery<E> {
332 #[must_use]
333 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
334 Self {
335 plan,
336 _marker: PhantomData,
337 }
338 }
339
340 #[must_use]
341 pub fn explain(&self) -> ExplainPlan {
342 self.plan.explain_with_model(E::MODEL)
343 }
344
345 #[must_use]
346 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
347 self.plan
348 }
349}
350
351impl<E: EntityKind> From<PlannedQuery<E>> for ExecutablePlan<E> {
352 fn from(value: PlannedQuery<E>) -> Self {
353 Self::new(value.into_inner())
354 }
355}
356
357#[derive(Debug)]
358pub struct Query<E: EntityKind> {
359 intent: QueryModel<'static, E::Key>,
360 _marker: PhantomData<E>,
361}
362
363impl<E: EntityKind> Query<E> {
364 #[must_use]
368 pub const fn new(consistency: ReadConsistency) -> Self {
369 Self {
370 intent: QueryModel::new(E::MODEL, consistency),
371 _marker: PhantomData,
372 }
373 }
374
375 #[must_use]
377 pub const fn mode(&self) -> QueryMode {
378 self.intent.mode()
379 }
380
381 #[must_use]
382 pub(crate) fn has_explicit_order(&self) -> bool {
383 self.intent.has_explicit_order()
384 }
385
386 #[must_use]
387 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
388 self.intent.load_spec()
389 }
390
391 #[must_use]
393 pub fn filter(mut self, predicate: Predicate) -> Self {
394 self.intent = self.intent.filter(predicate);
395 self
396 }
397
398 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
400 let Self { intent, _marker } = self;
401 let intent = intent.filter_expr(expr)?;
402
403 Ok(Self { intent, _marker })
404 }
405
406 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
408 let Self { intent, _marker } = self;
409 let intent = intent.sort_expr(expr)?;
410
411 Ok(Self { intent, _marker })
412 }
413
414 #[must_use]
416 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
417 self.intent = self.intent.order_by(field);
418 self
419 }
420
421 #[must_use]
423 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
424 self.intent = self.intent.order_by_desc(field);
425 self
426 }
427
428 #[must_use]
430 pub fn distinct(mut self) -> Self {
431 self.intent = self.intent.distinct();
432 self
433 }
434
435 pub(crate) fn by_id(self, id: E::Key) -> Self {
437 let Self { intent, _marker } = self;
438 Self {
439 intent: intent.by_id(id),
440 _marker,
441 }
442 }
443
444 pub(crate) fn by_ids<I>(self, ids: I) -> Self
446 where
447 I: IntoIterator<Item = E::Key>,
448 {
449 let Self { intent, _marker } = self;
450 Self {
451 intent: intent.by_ids(ids),
452 _marker,
453 }
454 }
455
456 #[must_use]
458 pub fn delete(mut self) -> Self {
459 self.intent = self.intent.delete();
460 self
461 }
462
463 #[must_use]
469 pub fn limit(mut self, limit: u32) -> Self {
470 self.intent = self.intent.limit(limit);
471 self
472 }
473
474 #[must_use]
478 pub fn offset(mut self, offset: u32) -> Self {
479 self.intent = self.intent.offset(offset);
480 self
481 }
482
483 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
485 let plan = self.planned()?;
486
487 Ok(plan.explain())
488 }
489
490 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
492 let plan = self.build_plan()?;
493
494 Ok(PlannedQuery::new(plan))
495 }
496
497 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
499 self.planned().map(ExecutablePlan::from)
500 }
501
502 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
504 let plan_value = self.intent.build_plan_model()?;
505 let (logical, access) = plan_value.into_parts();
506 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
507 let plan = AccessPlannedQuery::from_parts(logical, access);
508
509 Ok(plan)
510 }
511}
512
513impl<E> Query<E>
514where
515 E: EntityKind + SingletonEntity,
516 E::Key: Default,
517{
518 pub(crate) fn only(self) -> Self {
520 let Self { intent, _marker } = self;
521
522 Self {
523 intent: intent.only(E::Key::default()),
524 _marker,
525 }
526 }
527}
528
529#[derive(Debug, ThisError)]
534pub enum QueryError {
535 #[error("{0}")]
536 Validate(#[from] ValidateError),
537
538 #[error("{0}")]
539 Plan(Box<PlanError>),
540
541 #[error("{0}")]
542 Intent(#[from] IntentError),
543
544 #[error("{0}")]
545 Response(#[from] ResponseError),
546
547 #[error("{0}")]
548 Execute(#[from] InternalError),
549}
550
551impl From<PlannerError> for QueryError {
552 fn from(err: PlannerError) -> Self {
553 match err {
554 PlannerError::Plan(err) => Self::from(*err),
555 PlannerError::Internal(err) => Self::Execute(*err),
556 }
557 }
558}
559
560impl From<PlanError> for QueryError {
561 fn from(err: PlanError) -> Self {
562 Self::Plan(Box::new(err))
563 }
564}
565
566#[derive(Clone, Copy, Debug, ThisError)]
571pub enum IntentError {
572 #[error("{0}")]
573 PlanShape(#[from] policy::PlanPolicyError),
574
575 #[error("by_ids() cannot be combined with predicates")]
576 ByIdsWithPredicate,
577
578 #[error("only() cannot be combined with predicates")]
579 OnlyWithPredicate,
580
581 #[error("multiple key access methods were used on the same query")]
582 KeyAccessConflict,
583
584 #[error("cursor pagination requires an explicit ordering")]
585 CursorRequiresOrder,
586
587 #[error("cursor pagination requires an explicit limit")]
588 CursorRequiresLimit,
589
590 #[error("cursor tokens can only be used with .page().execute()")]
591 CursorRequiresPagedExecution,
592}
593
594impl From<policy::CursorPagingPolicyError> for IntentError {
595 fn from(err: policy::CursorPagingPolicyError) -> Self {
596 match err {
597 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
598 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
599 }
600 }
601}
602
603fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
605 match order {
606 Some(mut spec) => {
607 spec.fields.push((field.to_string(), direction));
608 spec
609 }
610 None => OrderSpec {
611 fields: vec![(field.to_string(), direction)],
612 },
613 }
614}
615
616fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
621 let mut order = order?;
622 if order.fields.is_empty() {
623 return Some(order);
624 }
625
626 let pk_field = model.primary_key.name;
627 let mut pk_direction = None;
628 order.fields.retain(|(field, direction)| {
629 if field == pk_field {
630 if pk_direction.is_none() {
631 pk_direction = Some(*direction);
632 }
633 false
634 } else {
635 true
636 }
637 });
638
639 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
640 order.fields.push((pk_field.to_string(), pk_direction));
641
642 Some(order)
643}