1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 expr::{FilterExpr, SortExpr, SortLowerError},
14 plan::{
15 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16 OrderSpec, PageSpec, PlanError,
17 planner::{PlannerError, plan_access},
18 validate::validate_logical_plan_model,
19 },
20 policy,
21 predicate::{
22 Predicate, SchemaInfo, ValidateError, normalize,
23 validate::reject_unsupported_query_features,
24 },
25 },
26 response::ResponseError,
27 },
28 error::InternalError,
29 traits::{EntityKind, FieldValue, SingletonEntity},
30 value::Value,
31};
32use std::marker::PhantomData;
33use thiserror::Error as ThisError;
34
35#[derive(Clone, Copy, Debug, Eq, PartialEq)]
43pub enum QueryMode {
44 Load(LoadSpec),
45 Delete(DeleteSpec),
46}
47
48impl QueryMode {
49 #[must_use]
51 pub const fn is_load(&self) -> bool {
52 match self {
53 Self::Load(_) => true,
54 Self::Delete(_) => false,
55 }
56 }
57
58 #[must_use]
60 pub const fn is_delete(&self) -> bool {
61 match self {
62 Self::Delete(_) => true,
63 Self::Load(_) => false,
64 }
65 }
66}
67
68#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
75pub struct LoadSpec {
76 pub limit: Option<u32>,
77 pub offset: u32,
78}
79
80impl LoadSpec {
81 #[must_use]
83 pub const fn new() -> Self {
84 Self {
85 limit: None,
86 offset: 0,
87 }
88 }
89}
90
91#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
98pub struct DeleteSpec {
99 pub limit: Option<u32>,
100}
101
102impl DeleteSpec {
103 #[must_use]
105 pub const fn new() -> Self {
106 Self { limit: None }
107 }
108}
109
110#[derive(Debug)]
118pub(crate) struct QueryModel<'m, K> {
119 model: &'m crate::model::entity::EntityModel,
120 mode: QueryMode,
121 predicate: Option<Predicate>,
122 key_access: Option<KeyAccessState<K>>,
123 key_access_conflict: bool,
124 order: Option<OrderSpec>,
125 consistency: ReadConsistency,
126}
127
128impl<'m, K: FieldValue> QueryModel<'m, K> {
129 #[must_use]
130 pub const fn new(
131 model: &'m crate::model::entity::EntityModel,
132 consistency: ReadConsistency,
133 ) -> Self {
134 Self {
135 model,
136 mode: QueryMode::Load(LoadSpec::new()),
137 predicate: None,
138 key_access: None,
139 key_access_conflict: false,
140 order: None,
141 consistency,
142 }
143 }
144
145 #[must_use]
147 pub const fn mode(&self) -> QueryMode {
148 self.mode
149 }
150
151 #[must_use]
152 fn has_explicit_order(&self) -> bool {
153 policy::has_explicit_order(self.order.as_ref())
154 }
155
156 #[must_use]
157 const fn load_spec(&self) -> Option<LoadSpec> {
158 match self.mode {
159 QueryMode::Load(spec) => Some(spec),
160 QueryMode::Delete(_) => None,
161 }
162 }
163
164 #[must_use]
166 pub fn filter(mut self, predicate: Predicate) -> Self {
167 self.predicate = match self.predicate.take() {
168 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
169 None => Some(predicate),
170 };
171 self
172 }
173
174 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
176 let schema = SchemaInfo::from_entity_model(self.model)?;
177 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
178
179 Ok(self.filter(predicate))
180 }
181
182 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
184 let schema = SchemaInfo::from_entity_model(self.model)?;
185 let order = match expr.lower_with(&schema) {
186 Ok(order) => order,
187 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
188 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
189 };
190
191 policy::validate_order_shape(Some(&order))
192 .map_err(IntentError::from)
193 .map_err(QueryError::from)?;
194
195 Ok(self.order_spec(order))
196 }
197
198 #[must_use]
200 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
201 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
202 self
203 }
204
205 #[must_use]
207 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
208 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
209 self
210 }
211
212 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
214 self.order = Some(order);
215 self
216 }
217
218 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
220 if let Some(existing) = &self.key_access
221 && existing.kind != kind
222 {
223 self.key_access_conflict = true;
224 }
225
226 self.key_access = Some(KeyAccessState { kind, access });
227
228 self
229 }
230
231 pub(crate) fn by_id(self, id: K) -> Self {
233 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
234 }
235
236 pub(crate) fn by_ids<I>(self, ids: I) -> Self
238 where
239 I: IntoIterator<Item = K>,
240 {
241 self.set_key_access(
242 KeyAccessKind::Many,
243 KeyAccess::Many(ids.into_iter().collect()),
244 )
245 }
246
247 pub(crate) fn only(self, id: K) -> Self {
249 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
250 }
251
252 #[must_use]
254 pub const fn delete(mut self) -> Self {
255 if self.mode.is_load() {
256 self.mode = QueryMode::Delete(DeleteSpec::new());
257 }
258 self
259 }
260
261 #[must_use]
265 pub const fn limit(mut self, limit: u32) -> Self {
266 match self.mode {
267 QueryMode::Load(mut spec) => {
268 spec.limit = Some(limit);
269 self.mode = QueryMode::Load(spec);
270 }
271 QueryMode::Delete(mut spec) => {
272 spec.limit = Some(limit);
273 self.mode = QueryMode::Delete(spec);
274 }
275 }
276 self
277 }
278
279 #[must_use]
281 pub const fn offset(mut self, offset: u32) -> Self {
282 if let QueryMode::Load(mut spec) = self.mode {
283 spec.offset = offset;
284 self.mode = QueryMode::Load(spec);
285 }
286 self
287 }
288
289 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
291 let schema_info = SchemaInfo::from_entity_model(self.model)?;
293 self.validate_intent()?;
294
295 if let Some(predicate) = self.predicate.as_ref() {
296 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
297 }
298
299 let normalized_predicate = self.predicate.as_ref().map(normalize);
301 let access_plan_value = match &self.key_access {
302 Some(state) => access_plan_from_keys_value(&state.access),
303 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
304 };
305
306 let plan = LogicalPlan {
308 mode: self.mode,
309 access: access_plan_value,
310 predicate: normalized_predicate,
311 order: canonicalize_order_spec(self.model, self.order.clone()),
314 delete_limit: match self.mode {
315 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
316 QueryMode::Load(_) => None,
317 },
318 page: match self.mode {
319 QueryMode::Load(spec) => {
320 if spec.limit.is_some() || spec.offset > 0 {
321 Some(PageSpec {
322 limit: spec.limit,
323 offset: spec.offset,
324 })
325 } else {
326 None
327 }
328 }
329 QueryMode::Delete(_) => None,
330 },
331 consistency: self.consistency,
332 };
333
334 validate_logical_plan_model(&schema_info, self.model, &plan)?;
335
336 Ok(plan)
337 }
338
339 fn validate_intent(&self) -> Result<(), IntentError> {
341 if self.key_access_conflict {
342 return Err(IntentError::KeyAccessConflict);
343 }
344
345 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
346 .map_err(IntentError::from)?;
347
348 if let Some(state) = &self.key_access {
349 match state.kind {
350 KeyAccessKind::Many if self.predicate.is_some() => {
351 return Err(IntentError::ByIdsWithPredicate);
352 }
353 KeyAccessKind::Only if self.predicate.is_some() => {
354 return Err(IntentError::OnlyWithPredicate);
355 }
356 _ => {
357 }
359 }
360 }
361
362 Ok(())
363 }
364}
365
366#[derive(Debug)]
378pub struct Query<E: EntityKind> {
379 intent: QueryModel<'static, E::Key>,
380 _marker: PhantomData<E>,
381}
382
383impl<E: EntityKind> Query<E> {
384 #[must_use]
388 pub const fn new(consistency: ReadConsistency) -> Self {
389 Self {
390 intent: QueryModel::new(E::MODEL, consistency),
391 _marker: PhantomData,
392 }
393 }
394
395 #[must_use]
397 pub const fn mode(&self) -> QueryMode {
398 self.intent.mode()
399 }
400
401 #[must_use]
402 pub(crate) fn has_explicit_order(&self) -> bool {
403 self.intent.has_explicit_order()
404 }
405
406 #[must_use]
407 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
408 self.intent.load_spec()
409 }
410
411 #[must_use]
413 pub fn filter(mut self, predicate: Predicate) -> Self {
414 self.intent = self.intent.filter(predicate);
415 self
416 }
417
418 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
420 let Self { intent, _marker } = self;
421 let intent = intent.filter_expr(expr)?;
422
423 Ok(Self { intent, _marker })
424 }
425
426 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
428 let Self { intent, _marker } = self;
429 let intent = intent.sort_expr(expr)?;
430
431 Ok(Self { intent, _marker })
432 }
433
434 #[must_use]
436 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
437 self.intent = self.intent.order_by(field);
438 self
439 }
440
441 #[must_use]
443 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
444 self.intent = self.intent.order_by_desc(field);
445 self
446 }
447
448 pub(crate) fn by_id(self, id: E::Key) -> Self {
450 let Self { intent, _marker } = self;
451 Self {
452 intent: intent.by_id(id),
453 _marker,
454 }
455 }
456
457 pub(crate) fn by_ids<I>(self, ids: I) -> Self
459 where
460 I: IntoIterator<Item = E::Key>,
461 {
462 let Self { intent, _marker } = self;
463 Self {
464 intent: intent.by_ids(ids),
465 _marker,
466 }
467 }
468
469 #[must_use]
471 pub fn delete(mut self) -> Self {
472 self.intent = self.intent.delete();
473 self
474 }
475
476 #[must_use]
482 pub fn limit(mut self, limit: u32) -> Self {
483 self.intent = self.intent.limit(limit);
484 self
485 }
486
487 #[must_use]
491 pub fn offset(mut self, offset: u32) -> Self {
492 self.intent = self.intent.offset(offset);
493 self
494 }
495
496 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
498 let plan = self.build_plan()?;
499
500 Ok(plan.explain_with_model(E::MODEL))
501 }
502
503 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
505 let plan = self.build_plan()?;
506
507 Ok(ExecutablePlan::new(plan))
508 }
509
510 fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
512 let plan_value = self.intent.build_plan_model()?;
513 let LogicalPlan {
514 mode,
515 access,
516 predicate,
517 order,
518 delete_limit,
519 page,
520 consistency,
521 } = plan_value;
522
523 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
524 let plan = LogicalPlan {
525 mode,
526 access,
527 predicate,
528 order,
529 delete_limit,
530 page,
531 consistency,
532 };
533
534 Ok(plan)
535 }
536}
537
538impl<E> Query<E>
539where
540 E: EntityKind + SingletonEntity,
541 E::Key: Default,
542{
543 pub(crate) fn only(self) -> Self {
545 let Self { intent, _marker } = self;
546
547 Self {
548 intent: intent.only(E::Key::default()),
549 _marker,
550 }
551 }
552}
553
554#[derive(Debug, ThisError)]
559pub enum QueryError {
560 #[error("{0}")]
561 Validate(#[from] ValidateError),
562
563 #[error("{0}")]
564 Plan(Box<PlanError>),
565
566 #[error("{0}")]
567 Intent(#[from] IntentError),
568
569 #[error("{0}")]
570 Response(#[from] ResponseError),
571
572 #[error("{0}")]
573 Execute(#[from] InternalError),
574}
575
576impl From<PlannerError> for QueryError {
577 fn from(err: PlannerError) -> Self {
578 match err {
579 PlannerError::Plan(err) => Self::from(*err),
580 PlannerError::Internal(err) => Self::Execute(*err),
581 }
582 }
583}
584
585impl From<PlanError> for QueryError {
586 fn from(err: PlanError) -> Self {
587 Self::Plan(Box::new(err))
588 }
589}
590
591#[derive(Clone, Copy, Debug, ThisError)]
596pub enum IntentError {
597 #[error("{0}")]
598 PlanShape(#[from] policy::PlanPolicyError),
599
600 #[error("by_ids() cannot be combined with predicates")]
601 ByIdsWithPredicate,
602
603 #[error("only() cannot be combined with predicates")]
604 OnlyWithPredicate,
605
606 #[error("multiple key access methods were used on the same query")]
607 KeyAccessConflict,
608
609 #[error("cursor pagination requires an explicit ordering")]
610 CursorRequiresOrder,
611
612 #[error("cursor pagination requires an explicit limit")]
613 CursorRequiresLimit,
614
615 #[error("cursor pagination does not support offset; use the cursor token for continuation")]
616 CursorWithOffsetUnsupported,
617}
618
619impl From<policy::CursorPagingPolicyError> for IntentError {
620 fn from(err: policy::CursorPagingPolicyError) -> Self {
621 match err {
622 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
623 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
624 policy::CursorPagingPolicyError::CursorWithOffsetUnsupported => {
625 Self::CursorWithOffsetUnsupported
626 }
627 }
628 }
629}
630
631fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
633 match order {
634 Some(mut spec) => {
635 spec.fields.push((field.to_string(), direction));
636 spec
637 }
638 None => OrderSpec {
639 fields: vec![(field.to_string(), direction)],
640 },
641 }
642}
643
644fn canonicalize_order_spec(
649 model: &crate::model::entity::EntityModel,
650 order: Option<OrderSpec>,
651) -> Option<OrderSpec> {
652 let mut order = order?;
653 if order.fields.is_empty() {
654 return Some(order);
655 }
656
657 let pk_field = model.primary_key.name;
658 let mut pk_direction = None;
659 order.fields.retain(|(field, direction)| {
660 if field == pk_field {
661 if pk_direction.is_none() {
662 pk_direction = Some(*direction);
663 }
664 false
665 } else {
666 true
667 }
668 });
669
670 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
671 order.fields.push((pk_field.to_string(), pk_direction));
672
673 Some(order)
674}