1#![allow(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 expr::{FilterExpr, SortExpr, SortLowerError},
14 plan::{
15 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16 OrderSpec, PageSpec, PlanError,
17 planner::{PlannerError, plan_access},
18 validate::validate_logical_plan_model,
19 },
20 policy,
21 predicate::{
22 Predicate, SchemaInfo, ValidateError, normalize,
23 validate::reject_unsupported_query_features,
24 },
25 },
26 response::ResponseError,
27 },
28 error::InternalError,
29 traits::{EntityKind, FieldValue, SingletonEntity},
30 value::Value,
31};
32use std::marker::PhantomData;
33use thiserror::Error as ThisError;
34
35#[derive(Clone, Copy, Debug, Eq, PartialEq)]
43pub enum QueryMode {
44 Load(LoadSpec),
45 Delete(DeleteSpec),
46}
47
48impl QueryMode {
49 #[must_use]
51 pub const fn is_load(&self) -> bool {
52 match self {
53 Self::Load(_) => true,
54 Self::Delete(_) => false,
55 }
56 }
57
58 #[must_use]
60 pub const fn is_delete(&self) -> bool {
61 match self {
62 Self::Delete(_) => true,
63 Self::Load(_) => false,
64 }
65 }
66}
67
/// Mode-specific settings for a load query.
///
/// The derived `Default` and [`LoadSpec::new`] produce the same value.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct LoadSpec {
    /// Requested limit; `None` when no limit has been set.
    pub limit: Option<u32>,
    /// Requested offset; `0` when no offset has been set.
    pub offset: u32,
}

impl LoadSpec {
    /// Creates a spec with no limit and an offset of zero.
    #[must_use]
    pub const fn new() -> Self {
        let limit = None;
        let offset = 0;

        Self { limit, offset }
    }
}
90
/// Mode-specific settings for a delete query.
///
/// The derived `Default` and [`DeleteSpec::new`] produce the same value.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteSpec {
    /// Requested delete limit; `None` when no limit has been set.
    pub limit: Option<u32>,
}

impl DeleteSpec {
    /// Creates a spec with no delete limit.
    #[must_use]
    pub const fn new() -> Self {
        let limit = None;

        Self { limit }
    }
}
109
110#[derive(Debug)]
118pub(crate) struct QueryModel<'m, K> {
119 model: &'m crate::model::entity::EntityModel,
120 mode: QueryMode,
121 predicate: Option<Predicate>,
122 key_access: Option<KeyAccessState<K>>,
123 key_access_conflict: bool,
124 order: Option<OrderSpec>,
125 consistency: ReadConsistency,
126}
127
128impl<'m, K: FieldValue> QueryModel<'m, K> {
129 #[must_use]
130 pub const fn new(
131 model: &'m crate::model::entity::EntityModel,
132 consistency: ReadConsistency,
133 ) -> Self {
134 Self {
135 model,
136 mode: QueryMode::Load(LoadSpec::new()),
137 predicate: None,
138 key_access: None,
139 key_access_conflict: false,
140 order: None,
141 consistency,
142 }
143 }
144
145 #[must_use]
147 pub const fn mode(&self) -> QueryMode {
148 self.mode
149 }
150
151 #[must_use]
152 fn has_explicit_order(&self) -> bool {
153 policy::has_explicit_order(self.order.as_ref())
154 }
155
156 #[must_use]
157 const fn load_spec(&self) -> Option<LoadSpec> {
158 match self.mode {
159 QueryMode::Load(spec) => Some(spec),
160 QueryMode::Delete(_) => None,
161 }
162 }
163
164 #[must_use]
166 pub fn filter(mut self, predicate: Predicate) -> Self {
167 self.predicate = match self.predicate.take() {
168 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
169 None => Some(predicate),
170 };
171 self
172 }
173
174 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
176 let schema = SchemaInfo::from_entity_model(self.model)?;
177 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
178
179 Ok(self.filter(predicate))
180 }
181
182 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
184 let schema = SchemaInfo::from_entity_model(self.model)?;
185 let order = match expr.lower_with(&schema) {
186 Ok(order) => order,
187 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
188 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
189 };
190
191 if order.fields.is_empty() {
192 return Err(QueryError::Intent(IntentError::EmptyOrderSpec));
193 }
194
195 Ok(self.order_spec(order))
196 }
197
198 #[must_use]
200 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
201 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
202 self
203 }
204
205 #[must_use]
207 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
208 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
209 self
210 }
211
212 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
214 self.order = Some(order);
215 self
216 }
217
218 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
220 if let Some(existing) = &self.key_access
221 && existing.kind != kind
222 {
223 self.key_access_conflict = true;
224 }
225
226 self.key_access = Some(KeyAccessState { kind, access });
227
228 self
229 }
230
231 pub(crate) fn by_id(self, id: K) -> Self {
233 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
234 }
235
236 pub(crate) fn by_ids<I>(self, ids: I) -> Self
238 where
239 I: IntoIterator<Item = K>,
240 {
241 self.set_key_access(
242 KeyAccessKind::Many,
243 KeyAccess::Many(ids.into_iter().collect()),
244 )
245 }
246
247 pub(crate) fn only(self, id: K) -> Self {
249 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
250 }
251
252 #[must_use]
254 pub const fn delete(mut self) -> Self {
255 if self.mode.is_load() {
256 self.mode = QueryMode::Delete(DeleteSpec::new());
257 }
258 self
259 }
260
261 #[must_use]
265 pub const fn limit(mut self, limit: u32) -> Self {
266 match self.mode {
267 QueryMode::Load(mut spec) => {
268 spec.limit = Some(limit);
269 self.mode = QueryMode::Load(spec);
270 }
271 QueryMode::Delete(mut spec) => {
272 spec.limit = Some(limit);
273 self.mode = QueryMode::Delete(spec);
274 }
275 }
276 self
277 }
278
279 #[must_use]
281 pub const fn offset(mut self, offset: u32) -> Self {
282 if let QueryMode::Load(mut spec) = self.mode {
283 spec.offset = offset;
284 self.mode = QueryMode::Load(spec);
285 }
286 self
287 }
288
289 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
291 let schema_info = SchemaInfo::from_entity_model(self.model)?;
293 self.validate_intent()?;
294
295 if let Some(predicate) = self.predicate.as_ref() {
296 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
297 }
298
299 let normalized_predicate = self.predicate.as_ref().map(normalize);
301 let access_plan_value = match &self.key_access {
302 Some(state) => access_plan_from_keys_value(&state.access),
303 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
304 };
305
306 let plan = LogicalPlan {
308 mode: self.mode,
309 access: access_plan_value,
310 predicate: normalized_predicate,
311 order: canonicalize_order_spec(self.model, self.order.clone()),
314 delete_limit: match self.mode {
315 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
316 QueryMode::Load(_) => None,
317 },
318 page: match self.mode {
319 QueryMode::Load(spec) => {
320 if spec.limit.is_some() || spec.offset > 0 {
321 Some(PageSpec {
322 limit: spec.limit,
323 offset: spec.offset,
324 })
325 } else {
326 None
327 }
328 }
329 QueryMode::Delete(_) => None,
330 },
331 consistency: self.consistency,
332 };
333
334 validate_logical_plan_model(&schema_info, self.model, &plan)?;
335
336 Ok(plan)
337 }
338
339 fn validate_intent(&self) -> Result<(), IntentError> {
341 if self.key_access_conflict {
342 return Err(IntentError::KeyAccessConflict);
343 }
344
345 if policy::has_empty_order(self.order.as_ref()) {
346 return Err(IntentError::EmptyOrderSpec);
347 }
348
349 if let Some(state) = &self.key_access {
350 match state.kind {
351 KeyAccessKind::Many if self.predicate.is_some() => {
352 return Err(IntentError::ByIdsWithPredicate);
353 }
354 KeyAccessKind::Only if self.predicate.is_some() => {
355 return Err(IntentError::OnlyWithPredicate);
356 }
357 _ => {
358 }
360 }
361 }
362
363 match self.mode {
364 QueryMode::Load(_) => {}
365 QueryMode::Delete(spec) => {
366 if spec.limit.is_some() && !policy::has_explicit_order(self.order.as_ref()) {
367 return Err(IntentError::DeleteLimitRequiresOrder);
368 }
369 }
370 }
371
372 Ok(())
373 }
374}
375
376#[derive(Debug)]
388pub struct Query<E: EntityKind> {
389 intent: QueryModel<'static, E::Key>,
390 #[allow(clippy::struct_field_names)]
391 _marker: PhantomData<E>,
392}
393
394impl<E: EntityKind> Query<E> {
395 #[must_use]
399 pub const fn new(consistency: ReadConsistency) -> Self {
400 Self {
401 intent: QueryModel::new(E::MODEL, consistency),
402 _marker: PhantomData,
403 }
404 }
405
406 #[must_use]
408 pub const fn mode(&self) -> QueryMode {
409 self.intent.mode()
410 }
411
412 #[must_use]
413 pub(crate) fn has_explicit_order(&self) -> bool {
414 self.intent.has_explicit_order()
415 }
416
417 #[must_use]
418 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
419 self.intent.load_spec()
420 }
421
422 #[must_use]
424 pub fn filter(mut self, predicate: Predicate) -> Self {
425 self.intent = self.intent.filter(predicate);
426 self
427 }
428
429 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
431 let Self { intent, _marker } = self;
432 let intent = intent.filter_expr(expr)?;
433
434 Ok(Self { intent, _marker })
435 }
436
437 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
439 let Self { intent, _marker } = self;
440 let intent = intent.sort_expr(expr)?;
441
442 Ok(Self { intent, _marker })
443 }
444
445 #[must_use]
447 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
448 self.intent = self.intent.order_by(field);
449 self
450 }
451
452 #[must_use]
454 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
455 self.intent = self.intent.order_by_desc(field);
456 self
457 }
458
459 pub(crate) fn by_id(self, id: E::Key) -> Self {
461 let Self { intent, _marker } = self;
462 Self {
463 intent: intent.by_id(id),
464 _marker,
465 }
466 }
467
468 pub(crate) fn by_ids<I>(self, ids: I) -> Self
470 where
471 I: IntoIterator<Item = E::Key>,
472 {
473 let Self { intent, _marker } = self;
474 Self {
475 intent: intent.by_ids(ids),
476 _marker,
477 }
478 }
479
480 #[must_use]
482 pub fn delete(mut self) -> Self {
483 self.intent = self.intent.delete();
484 self
485 }
486
487 #[must_use]
493 pub fn limit(mut self, limit: u32) -> Self {
494 self.intent = self.intent.limit(limit);
495 self
496 }
497
498 #[must_use]
502 pub fn offset(mut self, offset: u32) -> Self {
503 self.intent = self.intent.offset(offset);
504 self
505 }
506
507 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
509 let plan = self.build_plan()?;
510
511 Ok(plan.explain())
512 }
513
514 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
516 let plan = self.build_plan()?;
517
518 Ok(ExecutablePlan::new(plan))
519 }
520
521 fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
523 let plan_value = self.intent.build_plan_model()?;
524 let LogicalPlan {
525 mode,
526 access,
527 predicate,
528 order,
529 delete_limit,
530 page,
531 consistency,
532 } = plan_value;
533
534 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
535 let plan = LogicalPlan {
536 mode,
537 access,
538 predicate,
539 order,
540 delete_limit,
541 page,
542 consistency,
543 };
544
545 Ok(plan)
546 }
547}
548
549impl<E> Query<E>
550where
551 E: EntityKind + SingletonEntity,
552 E::Key: Default,
553{
554 pub(crate) fn only(self) -> Self {
556 let Self { intent, _marker } = self;
557
558 Self {
559 intent: intent.only(E::Key::default()),
560 _marker,
561 }
562 }
563}
564
565#[derive(Debug, ThisError)]
570pub enum QueryError {
571 #[error("{0}")]
572 Validate(#[from] ValidateError),
573
574 #[error("{0}")]
575 Plan(Box<PlanError>),
576
577 #[error("{0}")]
578 Intent(#[from] IntentError),
579
580 #[error("{0}")]
581 Response(#[from] ResponseError),
582
583 #[error("{0}")]
584 Execute(#[from] InternalError),
585}
586
587impl From<PlannerError> for QueryError {
588 fn from(err: PlannerError) -> Self {
589 match err {
590 PlannerError::Plan(err) => Self::from(*err),
591 PlannerError::Internal(err) => Self::Execute(*err),
592 }
593 }
594}
595
596impl From<PlanError> for QueryError {
597 fn from(err: PlanError) -> Self {
598 Self::Plan(Box::new(err))
599 }
600}
601
/// Errors describing a query intent whose options cannot be combined.
///
/// The display text of each variant is the message in its `#[error]`
/// attribute.
#[derive(Clone, Copy, Debug, ThisError)]
pub enum IntentError {
    /// A delete query has a limit but no explicit ordering.
    #[error("delete limit requires an explicit ordering")]
    DeleteLimitRequiresOrder,

    /// An ordering was supplied but it contains no fields.
    #[error("order specification must include at least one field")]
    EmptyOrderSpec,

    /// `by_ids()` key access was combined with a predicate.
    #[error("by_ids() cannot be combined with predicates")]
    ByIdsWithPredicate,

    /// `only()` key access was combined with a predicate.
    #[error("only() cannot be combined with predicates")]
    OnlyWithPredicate,

    /// Key-access methods of different kinds were used on the same query.
    #[error("multiple key access methods were used on the same query")]
    KeyAccessConflict,

    /// Cursor pagination was requested without an explicit ordering.
    // NOTE(review): the cursor variants are not produced in this file —
    // presumably raised by the pagination layer; confirm against callers.
    #[error("cursor pagination requires an explicit ordering")]
    CursorRequiresOrder,

    /// Cursor pagination was requested without an explicit limit.
    #[error("cursor pagination requires an explicit limit")]
    CursorRequiresLimit,

    /// Cursor pagination was combined with an offset.
    #[error("cursor pagination does not support offset; use the cursor token for continuation")]
    CursorWithOffsetUnsupported,
}
632
633fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
635 match order {
636 Some(mut spec) => {
637 spec.fields.push((field.to_string(), direction));
638 spec
639 }
640 None => OrderSpec {
641 fields: vec![(field.to_string(), direction)],
642 },
643 }
644}
645
646fn canonicalize_order_spec(
651 model: &crate::model::entity::EntityModel,
652 order: Option<OrderSpec>,
653) -> Option<OrderSpec> {
654 let mut order = order?;
655 if order.fields.is_empty() {
656 return Some(order);
657 }
658
659 let pk_field = model.primary_key.name;
660 let mut pk_direction = None;
661 order.fields.retain(|(field, direction)| {
662 if field == pk_field {
663 if pk_direction.is_none() {
664 pk_direction = Some(*direction);
665 }
666 false
667 } else {
668 true
669 }
670 });
671
672 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
673 order.fields.push((pk_field.to_string(), pk_direction));
674
675 Some(order)
676}