1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 expr::{FilterExpr, SortExpr, SortLowerError},
14 plan::{
15 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16 OrderSpec, PageSpec, PlanError,
17 planner::{PlannerError, plan_access},
18 validate::validate_logical_plan_model,
19 },
20 policy,
21 predicate::{
22 Predicate, SchemaInfo, ValidateError, normalize,
23 validate::reject_unsupported_query_features,
24 },
25 },
26 response::ResponseError,
27 },
28 error::InternalError,
29 traits::{EntityKind, FieldValue, SingletonEntity},
30 value::Value,
31};
32use std::marker::PhantomData;
33use thiserror::Error as ThisError;
34
35#[derive(Clone, Copy, Debug, Eq, PartialEq)]
43pub enum QueryMode {
44 Load(LoadSpec),
45 Delete(DeleteSpec),
46}
47
48impl QueryMode {
49 #[must_use]
51 pub const fn is_load(&self) -> bool {
52 match self {
53 Self::Load(_) => true,
54 Self::Delete(_) => false,
55 }
56 }
57
58 #[must_use]
60 pub const fn is_delete(&self) -> bool {
61 match self {
62 Self::Delete(_) => true,
63 Self::Load(_) => false,
64 }
65 }
66}
67
68#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
75pub struct LoadSpec {
76 pub limit: Option<u32>,
77 pub offset: u32,
78}
79
80impl LoadSpec {
81 #[must_use]
83 pub const fn new() -> Self {
84 Self {
85 limit: None,
86 offset: 0,
87 }
88 }
89}
90
91#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
98pub struct DeleteSpec {
99 pub limit: Option<u32>,
100}
101
102impl DeleteSpec {
103 #[must_use]
105 pub const fn new() -> Self {
106 Self { limit: None }
107 }
108}
109
110#[derive(Debug)]
118pub(crate) struct QueryModel<'m, K> {
119 model: &'m crate::model::entity::EntityModel,
120 mode: QueryMode,
121 predicate: Option<Predicate>,
122 key_access: Option<KeyAccessState<K>>,
123 key_access_conflict: bool,
124 order: Option<OrderSpec>,
125 consistency: ReadConsistency,
126}
127
128impl<'m, K: FieldValue> QueryModel<'m, K> {
129 #[must_use]
130 pub const fn new(
131 model: &'m crate::model::entity::EntityModel,
132 consistency: ReadConsistency,
133 ) -> Self {
134 Self {
135 model,
136 mode: QueryMode::Load(LoadSpec::new()),
137 predicate: None,
138 key_access: None,
139 key_access_conflict: false,
140 order: None,
141 consistency,
142 }
143 }
144
145 #[must_use]
147 pub const fn mode(&self) -> QueryMode {
148 self.mode
149 }
150
151 #[must_use]
152 fn has_explicit_order(&self) -> bool {
153 policy::has_explicit_order(self.order.as_ref())
154 }
155
156 #[must_use]
157 const fn load_spec(&self) -> Option<LoadSpec> {
158 match self.mode {
159 QueryMode::Load(spec) => Some(spec),
160 QueryMode::Delete(_) => None,
161 }
162 }
163
164 #[must_use]
166 pub fn filter(mut self, predicate: Predicate) -> Self {
167 self.predicate = match self.predicate.take() {
168 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
169 None => Some(predicate),
170 };
171 self
172 }
173
174 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
176 let schema = SchemaInfo::from_entity_model(self.model)?;
177 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
178
179 Ok(self.filter(predicate))
180 }
181
182 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
184 let schema = SchemaInfo::from_entity_model(self.model)?;
185 let order = match expr.lower_with(&schema) {
186 Ok(order) => order,
187 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
188 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
189 };
190
191 if order.fields.is_empty() {
192 return Err(QueryError::Intent(IntentError::EmptyOrderSpec));
193 }
194
195 Ok(self.order_spec(order))
196 }
197
198 #[must_use]
200 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
201 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
202 self
203 }
204
205 #[must_use]
207 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
208 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
209 self
210 }
211
212 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
214 self.order = Some(order);
215 self
216 }
217
218 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
220 if let Some(existing) = &self.key_access
221 && existing.kind != kind
222 {
223 self.key_access_conflict = true;
224 }
225
226 self.key_access = Some(KeyAccessState { kind, access });
227
228 self
229 }
230
231 pub(crate) fn by_id(self, id: K) -> Self {
233 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
234 }
235
236 pub(crate) fn by_ids<I>(self, ids: I) -> Self
238 where
239 I: IntoIterator<Item = K>,
240 {
241 self.set_key_access(
242 KeyAccessKind::Many,
243 KeyAccess::Many(ids.into_iter().collect()),
244 )
245 }
246
247 pub(crate) fn only(self, id: K) -> Self {
249 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
250 }
251
252 #[must_use]
254 pub const fn delete(mut self) -> Self {
255 if self.mode.is_load() {
256 self.mode = QueryMode::Delete(DeleteSpec::new());
257 }
258 self
259 }
260
261 #[must_use]
265 pub const fn limit(mut self, limit: u32) -> Self {
266 match self.mode {
267 QueryMode::Load(mut spec) => {
268 spec.limit = Some(limit);
269 self.mode = QueryMode::Load(spec);
270 }
271 QueryMode::Delete(mut spec) => {
272 spec.limit = Some(limit);
273 self.mode = QueryMode::Delete(spec);
274 }
275 }
276 self
277 }
278
279 #[must_use]
281 pub const fn offset(mut self, offset: u32) -> Self {
282 if let QueryMode::Load(mut spec) = self.mode {
283 spec.offset = offset;
284 self.mode = QueryMode::Load(spec);
285 }
286 self
287 }
288
289 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
291 let schema_info = SchemaInfo::from_entity_model(self.model)?;
293 self.validate_intent()?;
294
295 if let Some(predicate) = self.predicate.as_ref() {
296 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
297 }
298
299 let normalized_predicate = self.predicate.as_ref().map(normalize);
301 let access_plan_value = match &self.key_access {
302 Some(state) => access_plan_from_keys_value(&state.access),
303 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
304 };
305
306 let plan = LogicalPlan {
308 mode: self.mode,
309 access: access_plan_value,
310 predicate: normalized_predicate,
311 order: canonicalize_order_spec(self.model, self.order.clone()),
314 delete_limit: match self.mode {
315 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
316 QueryMode::Load(_) => None,
317 },
318 page: match self.mode {
319 QueryMode::Load(spec) => {
320 if spec.limit.is_some() || spec.offset > 0 {
321 Some(PageSpec {
322 limit: spec.limit,
323 offset: spec.offset,
324 })
325 } else {
326 None
327 }
328 }
329 QueryMode::Delete(_) => None,
330 },
331 consistency: self.consistency,
332 };
333
334 validate_logical_plan_model(&schema_info, self.model, &plan)?;
335
336 Ok(plan)
337 }
338
339 fn validate_intent(&self) -> Result<(), IntentError> {
341 if self.key_access_conflict {
342 return Err(IntentError::KeyAccessConflict);
343 }
344
345 if policy::has_empty_order(self.order.as_ref()) {
346 return Err(IntentError::EmptyOrderSpec);
347 }
348
349 if let Some(state) = &self.key_access {
350 match state.kind {
351 KeyAccessKind::Many if self.predicate.is_some() => {
352 return Err(IntentError::ByIdsWithPredicate);
353 }
354 KeyAccessKind::Only if self.predicate.is_some() => {
355 return Err(IntentError::OnlyWithPredicate);
356 }
357 _ => {
358 }
360 }
361 }
362
363 match self.mode {
364 QueryMode::Load(_) => {}
365 QueryMode::Delete(spec) => {
366 if spec.limit.is_some() && !policy::has_explicit_order(self.order.as_ref()) {
367 return Err(IntentError::DeleteLimitRequiresOrder);
368 }
369 }
370 }
371
372 Ok(())
373 }
374}
375
376#[derive(Debug)]
388pub struct Query<E: EntityKind> {
389 intent: QueryModel<'static, E::Key>,
390 _marker: PhantomData<E>,
391}
392
393impl<E: EntityKind> Query<E> {
394 #[must_use]
398 pub const fn new(consistency: ReadConsistency) -> Self {
399 Self {
400 intent: QueryModel::new(E::MODEL, consistency),
401 _marker: PhantomData,
402 }
403 }
404
405 #[must_use]
407 pub const fn mode(&self) -> QueryMode {
408 self.intent.mode()
409 }
410
411 #[must_use]
412 pub(crate) fn has_explicit_order(&self) -> bool {
413 self.intent.has_explicit_order()
414 }
415
416 #[must_use]
417 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
418 self.intent.load_spec()
419 }
420
421 #[must_use]
423 pub fn filter(mut self, predicate: Predicate) -> Self {
424 self.intent = self.intent.filter(predicate);
425 self
426 }
427
428 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
430 let Self { intent, _marker } = self;
431 let intent = intent.filter_expr(expr)?;
432
433 Ok(Self { intent, _marker })
434 }
435
436 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
438 let Self { intent, _marker } = self;
439 let intent = intent.sort_expr(expr)?;
440
441 Ok(Self { intent, _marker })
442 }
443
444 #[must_use]
446 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
447 self.intent = self.intent.order_by(field);
448 self
449 }
450
451 #[must_use]
453 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
454 self.intent = self.intent.order_by_desc(field);
455 self
456 }
457
458 pub(crate) fn by_id(self, id: E::Key) -> Self {
460 let Self { intent, _marker } = self;
461 Self {
462 intent: intent.by_id(id),
463 _marker,
464 }
465 }
466
467 pub(crate) fn by_ids<I>(self, ids: I) -> Self
469 where
470 I: IntoIterator<Item = E::Key>,
471 {
472 let Self { intent, _marker } = self;
473 Self {
474 intent: intent.by_ids(ids),
475 _marker,
476 }
477 }
478
479 #[must_use]
481 pub fn delete(mut self) -> Self {
482 self.intent = self.intent.delete();
483 self
484 }
485
486 #[must_use]
492 pub fn limit(mut self, limit: u32) -> Self {
493 self.intent = self.intent.limit(limit);
494 self
495 }
496
497 #[must_use]
501 pub fn offset(mut self, offset: u32) -> Self {
502 self.intent = self.intent.offset(offset);
503 self
504 }
505
506 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
508 let plan = self.build_plan()?;
509
510 Ok(plan.explain_with_model(E::MODEL))
511 }
512
513 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
515 let plan = self.build_plan()?;
516
517 Ok(ExecutablePlan::new(plan))
518 }
519
520 fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
522 let plan_value = self.intent.build_plan_model()?;
523 let LogicalPlan {
524 mode,
525 access,
526 predicate,
527 order,
528 delete_limit,
529 page,
530 consistency,
531 } = plan_value;
532
533 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
534 let plan = LogicalPlan {
535 mode,
536 access,
537 predicate,
538 order,
539 delete_limit,
540 page,
541 consistency,
542 };
543
544 Ok(plan)
545 }
546}
547
548impl<E> Query<E>
549where
550 E: EntityKind + SingletonEntity,
551 E::Key: Default,
552{
553 pub(crate) fn only(self) -> Self {
555 let Self { intent, _marker } = self;
556
557 Self {
558 intent: intent.only(E::Key::default()),
559 _marker,
560 }
561 }
562}
563
564#[derive(Debug, ThisError)]
569pub enum QueryError {
570 #[error("{0}")]
571 Validate(#[from] ValidateError),
572
573 #[error("{0}")]
574 Plan(Box<PlanError>),
575
576 #[error("{0}")]
577 Intent(#[from] IntentError),
578
579 #[error("{0}")]
580 Response(#[from] ResponseError),
581
582 #[error("{0}")]
583 Execute(#[from] InternalError),
584}
585
586impl From<PlannerError> for QueryError {
587 fn from(err: PlannerError) -> Self {
588 match err {
589 PlannerError::Plan(err) => Self::from(*err),
590 PlannerError::Internal(err) => Self::Execute(*err),
591 }
592 }
593}
594
595impl From<PlanError> for QueryError {
596 fn from(err: PlanError) -> Self {
597 Self::Plan(Box::new(err))
598 }
599}
600
601#[derive(Clone, Copy, Debug, ThisError)]
606pub enum IntentError {
607 #[error("delete limit requires an explicit ordering")]
608 DeleteLimitRequiresOrder,
609
610 #[error("order specification must include at least one field")]
611 EmptyOrderSpec,
612
613 #[error("by_ids() cannot be combined with predicates")]
614 ByIdsWithPredicate,
615
616 #[error("only() cannot be combined with predicates")]
617 OnlyWithPredicate,
618
619 #[error("multiple key access methods were used on the same query")]
620 KeyAccessConflict,
621
622 #[error("cursor pagination requires an explicit ordering")]
623 CursorRequiresOrder,
624
625 #[error("cursor pagination requires an explicit limit")]
626 CursorRequiresLimit,
627
628 #[error("cursor pagination does not support offset; use the cursor token for continuation")]
629 CursorWithOffsetUnsupported,
630}
631
632impl From<policy::CursorPagingPolicyError> for IntentError {
633 fn from(err: policy::CursorPagingPolicyError) -> Self {
634 match err {
635 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
636 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
637 policy::CursorPagingPolicyError::CursorWithOffsetUnsupported => {
638 Self::CursorWithOffsetUnsupported
639 }
640 }
641 }
642}
643
644fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
646 match order {
647 Some(mut spec) => {
648 spec.fields.push((field.to_string(), direction));
649 spec
650 }
651 None => OrderSpec {
652 fields: vec![(field.to_string(), direction)],
653 },
654 }
655}
656
657fn canonicalize_order_spec(
662 model: &crate::model::entity::EntityModel,
663 order: Option<OrderSpec>,
664) -> Option<OrderSpec> {
665 let mut order = order?;
666 if order.fields.is_empty() {
667 return Some(order);
668 }
669
670 let pk_field = model.primary_key.name;
671 let mut pk_direction = None;
672 order.fields.retain(|(field, direction)| {
673 if field == pk_field {
674 if pk_direction.is_none() {
675 pk_direction = Some(*direction);
676 }
677 false
678 } else {
679 true
680 }
681 });
682
683 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
684 order.fields.push((pk_field.to_string(), pk_direction));
685
686 Some(order)
687}