1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub(crate) use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 enum_filter::normalize_enum_literals,
14 expr::{FilterExpr, SortExpr, SortLowerError},
15 plan::{
16 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
17 OrderSpec, PageSpec, PlanError,
18 planner::{PlannerError, plan_access},
19 validate::validate_logical_plan_model,
20 },
21 policy,
22 predicate::{
23 Predicate, SchemaInfo, ValidateError, normalize,
24 validate::reject_unsupported_query_features,
25 },
26 },
27 response::ResponseError,
28 },
29 error::InternalError,
30 model::entity::EntityModel,
31 traits::{EntityKind, FieldValue, SingletonEntity},
32 value::Value,
33};
34use std::marker::PhantomData;
35use thiserror::Error as ThisError;
36
37#[derive(Clone, Copy, Debug, Eq, PartialEq)]
45pub enum QueryMode {
46 Load(LoadSpec),
47 Delete(DeleteSpec),
48}
49
50impl QueryMode {
51 #[must_use]
53 pub const fn is_load(&self) -> bool {
54 match self {
55 Self::Load(_) => true,
56 Self::Delete(_) => false,
57 }
58 }
59
60 #[must_use]
62 pub const fn is_delete(&self) -> bool {
63 match self {
64 Self::Delete(_) => true,
65 Self::Load(_) => false,
66 }
67 }
68}
69
/// Settings carried by a load-mode query.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct LoadSpec {
    /// Requested limit; `None` when no limit was set.
    pub limit: Option<u32>,
    /// Requested offset; `0` when no offset was set.
    pub offset: u32,
}

impl LoadSpec {
    /// Creates a spec with no limit and a zero offset.
    ///
    /// Produces the same value as `LoadSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            offset: 0,
            limit: None,
        }
    }
}
92
/// Settings carried by a delete-mode query.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteSpec {
    /// Requested limit; `None` when no limit was set.
    pub limit: Option<u32>,
}

impl DeleteSpec {
    /// Creates a spec with no limit.
    ///
    /// Produces the same value as `DeleteSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self { limit: None }
    }
}
111
112#[derive(Debug)]
120pub(crate) struct QueryModel<'m, K> {
121 model: &'m EntityModel,
122 mode: QueryMode,
123 predicate: Option<Predicate>,
124 key_access: Option<KeyAccessState<K>>,
125 key_access_conflict: bool,
126 order: Option<OrderSpec>,
127 consistency: ReadConsistency,
128}
129
130impl<'m, K: FieldValue> QueryModel<'m, K> {
131 #[must_use]
132 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
133 Self {
134 model,
135 mode: QueryMode::Load(LoadSpec::new()),
136 predicate: None,
137 key_access: None,
138 key_access_conflict: false,
139 order: None,
140 consistency,
141 }
142 }
143
144 #[must_use]
146 pub(crate) const fn mode(&self) -> QueryMode {
147 self.mode
148 }
149
150 #[must_use]
151 fn has_explicit_order(&self) -> bool {
152 policy::has_explicit_order(self.order.as_ref())
153 }
154
155 #[must_use]
156 const fn load_spec(&self) -> Option<LoadSpec> {
157 match self.mode {
158 QueryMode::Load(spec) => Some(spec),
159 QueryMode::Delete(_) => None,
160 }
161 }
162
163 #[must_use]
165 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
166 self.predicate = match self.predicate.take() {
167 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
168 None => Some(predicate),
169 };
170 self
171 }
172
173 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
175 let schema = SchemaInfo::from_entity_model(self.model)?;
176 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
177
178 Ok(self.filter(predicate))
179 }
180
181 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
183 let schema = SchemaInfo::from_entity_model(self.model)?;
184 let order = match expr.lower_with(&schema) {
185 Ok(order) => order,
186 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
187 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
188 };
189
190 policy::validate_order_shape(Some(&order))
191 .map_err(IntentError::from)
192 .map_err(QueryError::from)?;
193
194 Ok(self.order_spec(order))
195 }
196
197 #[must_use]
199 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
200 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
201 self
202 }
203
204 #[must_use]
206 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
207 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
208 self
209 }
210
211 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
213 self.order = Some(order);
214 self
215 }
216
217 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
219 if let Some(existing) = &self.key_access
220 && existing.kind != kind
221 {
222 self.key_access_conflict = true;
223 }
224
225 self.key_access = Some(KeyAccessState { kind, access });
226
227 self
228 }
229
230 pub(crate) fn by_id(self, id: K) -> Self {
232 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
233 }
234
235 pub(crate) fn by_ids<I>(self, ids: I) -> Self
237 where
238 I: IntoIterator<Item = K>,
239 {
240 self.set_key_access(
241 KeyAccessKind::Many,
242 KeyAccess::Many(ids.into_iter().collect()),
243 )
244 }
245
246 pub(crate) fn only(self, id: K) -> Self {
248 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
249 }
250
251 #[must_use]
253 pub(crate) const fn delete(mut self) -> Self {
254 if self.mode.is_load() {
255 self.mode = QueryMode::Delete(DeleteSpec::new());
256 }
257 self
258 }
259
260 #[must_use]
264 pub(crate) const fn limit(mut self, limit: u32) -> Self {
265 match self.mode {
266 QueryMode::Load(mut spec) => {
267 spec.limit = Some(limit);
268 self.mode = QueryMode::Load(spec);
269 }
270 QueryMode::Delete(mut spec) => {
271 spec.limit = Some(limit);
272 self.mode = QueryMode::Delete(spec);
273 }
274 }
275 self
276 }
277
278 #[must_use]
280 pub(crate) const fn offset(mut self, offset: u32) -> Self {
281 if let QueryMode::Load(mut spec) = self.mode {
282 spec.offset = offset;
283 self.mode = QueryMode::Load(spec);
284 }
285 self
286 }
287
288 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
290 let schema_info = SchemaInfo::from_entity_model(self.model)?;
292 self.validate_intent()?;
293
294 let normalized_predicate = self
296 .predicate
297 .as_ref()
298 .map(|predicate| {
299 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
300 let predicate = normalize_enum_literals(&schema_info, predicate)?;
301 Ok::<Predicate, ValidateError>(normalize(&predicate))
302 })
303 .transpose()?;
304 let access_plan_value = match &self.key_access {
305 Some(state) => access_plan_from_keys_value(&state.access),
306 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
307 };
308
309 let plan = LogicalPlan {
311 mode: self.mode,
312 access: access_plan_value,
313 predicate: normalized_predicate,
314 order: canonicalize_order_spec(self.model, self.order.clone()),
317 delete_limit: match self.mode {
318 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
319 QueryMode::Load(_) => None,
320 },
321 page: match self.mode {
322 QueryMode::Load(spec) => {
323 if spec.limit.is_some() || spec.offset > 0 {
324 Some(PageSpec {
325 limit: spec.limit,
326 offset: spec.offset,
327 })
328 } else {
329 None
330 }
331 }
332 QueryMode::Delete(_) => None,
333 },
334 consistency: self.consistency,
335 };
336
337 validate_logical_plan_model(&schema_info, self.model, &plan)?;
338
339 Ok(plan)
340 }
341
342 fn validate_intent(&self) -> Result<(), IntentError> {
344 if self.key_access_conflict {
345 return Err(IntentError::KeyAccessConflict);
346 }
347
348 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
349 .map_err(IntentError::from)?;
350
351 if let Some(state) = &self.key_access {
352 match state.kind {
353 KeyAccessKind::Many if self.predicate.is_some() => {
354 return Err(IntentError::ByIdsWithPredicate);
355 }
356 KeyAccessKind::Only if self.predicate.is_some() => {
357 return Err(IntentError::OnlyWithPredicate);
358 }
359 _ => {
360 }
362 }
363 }
364
365 Ok(())
366 }
367}
368
369#[derive(Debug)]
381pub struct Query<E: EntityKind> {
382 intent: QueryModel<'static, E::Key>,
383 _marker: PhantomData<E>,
384}
385
386impl<E: EntityKind> Query<E> {
387 #[must_use]
391 pub const fn new(consistency: ReadConsistency) -> Self {
392 Self {
393 intent: QueryModel::new(E::MODEL, consistency),
394 _marker: PhantomData,
395 }
396 }
397
398 #[must_use]
400 pub const fn mode(&self) -> QueryMode {
401 self.intent.mode()
402 }
403
404 #[must_use]
405 pub(crate) fn has_explicit_order(&self) -> bool {
406 self.intent.has_explicit_order()
407 }
408
409 #[must_use]
410 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
411 self.intent.load_spec()
412 }
413
414 #[must_use]
416 pub fn filter(mut self, predicate: Predicate) -> Self {
417 self.intent = self.intent.filter(predicate);
418 self
419 }
420
421 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
423 let Self { intent, _marker } = self;
424 let intent = intent.filter_expr(expr)?;
425
426 Ok(Self { intent, _marker })
427 }
428
429 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
431 let Self { intent, _marker } = self;
432 let intent = intent.sort_expr(expr)?;
433
434 Ok(Self { intent, _marker })
435 }
436
437 #[must_use]
439 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
440 self.intent = self.intent.order_by(field);
441 self
442 }
443
444 #[must_use]
446 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
447 self.intent = self.intent.order_by_desc(field);
448 self
449 }
450
451 pub(crate) fn by_id(self, id: E::Key) -> Self {
453 let Self { intent, _marker } = self;
454 Self {
455 intent: intent.by_id(id),
456 _marker,
457 }
458 }
459
460 pub(crate) fn by_ids<I>(self, ids: I) -> Self
462 where
463 I: IntoIterator<Item = E::Key>,
464 {
465 let Self { intent, _marker } = self;
466 Self {
467 intent: intent.by_ids(ids),
468 _marker,
469 }
470 }
471
472 #[must_use]
474 pub fn delete(mut self) -> Self {
475 self.intent = self.intent.delete();
476 self
477 }
478
479 #[must_use]
485 pub fn limit(mut self, limit: u32) -> Self {
486 self.intent = self.intent.limit(limit);
487 self
488 }
489
490 #[must_use]
494 pub fn offset(mut self, offset: u32) -> Self {
495 self.intent = self.intent.offset(offset);
496 self
497 }
498
499 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
501 let plan = self.build_plan()?;
502
503 Ok(plan.explain_with_model(E::MODEL))
504 }
505
506 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
508 let plan = self.build_plan()?;
509
510 Ok(ExecutablePlan::new(plan))
511 }
512
513 fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
515 let plan_value = self.intent.build_plan_model()?;
516 let LogicalPlan {
517 mode,
518 access,
519 predicate,
520 order,
521 delete_limit,
522 page,
523 consistency,
524 } = plan_value;
525
526 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
527 let plan = LogicalPlan {
528 mode,
529 access,
530 predicate,
531 order,
532 delete_limit,
533 page,
534 consistency,
535 };
536
537 Ok(plan)
538 }
539}
540
541impl<E> Query<E>
542where
543 E: EntityKind + SingletonEntity,
544 E::Key: Default,
545{
546 pub(crate) fn only(self) -> Self {
548 let Self { intent, _marker } = self;
549
550 Self {
551 intent: intent.only(E::Key::default()),
552 _marker,
553 }
554 }
555}
556
557#[derive(Debug, ThisError)]
562pub enum QueryError {
563 #[error("{0}")]
564 Validate(#[from] ValidateError),
565
566 #[error("{0}")]
567 Plan(Box<PlanError>),
568
569 #[error("{0}")]
570 Intent(#[from] IntentError),
571
572 #[error("{0}")]
573 Response(#[from] ResponseError),
574
575 #[error("{0}")]
576 Execute(#[from] InternalError),
577}
578
579impl From<PlannerError> for QueryError {
580 fn from(err: PlannerError) -> Self {
581 match err {
582 PlannerError::Plan(err) => Self::from(*err),
583 PlannerError::Internal(err) => Self::Execute(*err),
584 }
585 }
586}
587
588impl From<PlanError> for QueryError {
589 fn from(err: PlanError) -> Self {
590 Self::Plan(Box::new(err))
591 }
592}
593
594#[derive(Clone, Copy, Debug, ThisError)]
599pub enum IntentError {
600 #[error("{0}")]
601 PlanShape(#[from] policy::PlanPolicyError),
602
603 #[error("by_ids() cannot be combined with predicates")]
604 ByIdsWithPredicate,
605
606 #[error("only() cannot be combined with predicates")]
607 OnlyWithPredicate,
608
609 #[error("multiple key access methods were used on the same query")]
610 KeyAccessConflict,
611
612 #[error("cursor pagination requires an explicit ordering")]
613 CursorRequiresOrder,
614
615 #[error("cursor pagination requires an explicit limit")]
616 CursorRequiresLimit,
617
618 #[error("cursor pagination does not support offset; use the cursor token for continuation")]
619 CursorWithOffsetUnsupported,
620}
621
622impl From<policy::CursorPagingPolicyError> for IntentError {
623 fn from(err: policy::CursorPagingPolicyError) -> Self {
624 match err {
625 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
626 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
627 policy::CursorPagingPolicyError::CursorWithOffsetUnsupported => {
628 Self::CursorWithOffsetUnsupported
629 }
630 }
631 }
632}
633
634fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
636 match order {
637 Some(mut spec) => {
638 spec.fields.push((field.to_string(), direction));
639 spec
640 }
641 None => OrderSpec {
642 fields: vec![(field.to_string(), direction)],
643 },
644 }
645}
646
647fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
652 let mut order = order?;
653 if order.fields.is_empty() {
654 return Some(order);
655 }
656
657 let pk_field = model.primary_key.name;
658 let mut pk_direction = None;
659 order.fields.retain(|(field, direction)| {
660 if field == pk_field {
661 if pk_direction.is_none() {
662 pk_direction = Some(*direction);
663 }
664 false
665 } else {
666 true
667 }
668 });
669
670 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
671 order.fields.push((pk_field.to_string(), pk_direction));
672
673 Some(order)
674}