1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub(crate) use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 expr::{FilterExpr, SortExpr, SortLowerError},
14 plan::{
15 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16 OrderSpec, PageSpec, PlanError,
17 planner::{PlannerError, plan_access},
18 validate::validate_logical_plan_model,
19 },
20 policy,
21 predicate::{
22 Predicate, SchemaInfo, ValidateError, normalize,
23 validate::reject_unsupported_query_features,
24 },
25 },
26 response::ResponseError,
27 },
28 error::InternalError,
29 model::entity::EntityModel,
30 traits::{EntityKind, FieldValue, SingletonEntity},
31 value::Value,
32};
33use std::marker::PhantomData;
34use thiserror::Error as ThisError;
35
36#[derive(Clone, Copy, Debug, Eq, PartialEq)]
44pub enum QueryMode {
45 Load(LoadSpec),
46 Delete(DeleteSpec),
47}
48
49impl QueryMode {
50 #[must_use]
52 pub const fn is_load(&self) -> bool {
53 match self {
54 Self::Load(_) => true,
55 Self::Delete(_) => false,
56 }
57 }
58
59 #[must_use]
61 pub const fn is_delete(&self) -> bool {
62 match self {
63 Self::Delete(_) => true,
64 Self::Load(_) => false,
65 }
66 }
67}
68
69#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
76pub struct LoadSpec {
77 pub limit: Option<u32>,
78 pub offset: u32,
79}
80
81impl LoadSpec {
82 #[must_use]
84 pub const fn new() -> Self {
85 Self {
86 limit: None,
87 offset: 0,
88 }
89 }
90}
91
92#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
99pub struct DeleteSpec {
100 pub limit: Option<u32>,
101}
102
103impl DeleteSpec {
104 #[must_use]
106 pub const fn new() -> Self {
107 Self { limit: None }
108 }
109}
110
111#[derive(Debug)]
119pub(crate) struct QueryModel<'m, K> {
120 model: &'m EntityModel,
121 mode: QueryMode,
122 predicate: Option<Predicate>,
123 key_access: Option<KeyAccessState<K>>,
124 key_access_conflict: bool,
125 order: Option<OrderSpec>,
126 consistency: ReadConsistency,
127}
128
129impl<'m, K: FieldValue> QueryModel<'m, K> {
130 #[must_use]
131 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
132 Self {
133 model,
134 mode: QueryMode::Load(LoadSpec::new()),
135 predicate: None,
136 key_access: None,
137 key_access_conflict: false,
138 order: None,
139 consistency,
140 }
141 }
142
143 #[must_use]
145 pub(crate) const fn mode(&self) -> QueryMode {
146 self.mode
147 }
148
149 #[must_use]
150 fn has_explicit_order(&self) -> bool {
151 policy::has_explicit_order(self.order.as_ref())
152 }
153
154 #[must_use]
155 const fn load_spec(&self) -> Option<LoadSpec> {
156 match self.mode {
157 QueryMode::Load(spec) => Some(spec),
158 QueryMode::Delete(_) => None,
159 }
160 }
161
162 #[must_use]
164 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
165 self.predicate = match self.predicate.take() {
166 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
167 None => Some(predicate),
168 };
169 self
170 }
171
172 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
174 let schema = SchemaInfo::from_entity_model(self.model)?;
175 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
176
177 Ok(self.filter(predicate))
178 }
179
180 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
182 let schema = SchemaInfo::from_entity_model(self.model)?;
183 let order = match expr.lower_with(&schema) {
184 Ok(order) => order,
185 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
186 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
187 };
188
189 policy::validate_order_shape(Some(&order))
190 .map_err(IntentError::from)
191 .map_err(QueryError::from)?;
192
193 Ok(self.order_spec(order))
194 }
195
196 #[must_use]
198 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
199 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
200 self
201 }
202
203 #[must_use]
205 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
206 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
207 self
208 }
209
210 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
212 self.order = Some(order);
213 self
214 }
215
216 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
218 if let Some(existing) = &self.key_access
219 && existing.kind != kind
220 {
221 self.key_access_conflict = true;
222 }
223
224 self.key_access = Some(KeyAccessState { kind, access });
225
226 self
227 }
228
229 pub(crate) fn by_id(self, id: K) -> Self {
231 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
232 }
233
234 pub(crate) fn by_ids<I>(self, ids: I) -> Self
236 where
237 I: IntoIterator<Item = K>,
238 {
239 self.set_key_access(
240 KeyAccessKind::Many,
241 KeyAccess::Many(ids.into_iter().collect()),
242 )
243 }
244
245 pub(crate) fn only(self, id: K) -> Self {
247 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
248 }
249
250 #[must_use]
252 pub(crate) const fn delete(mut self) -> Self {
253 if self.mode.is_load() {
254 self.mode = QueryMode::Delete(DeleteSpec::new());
255 }
256 self
257 }
258
259 #[must_use]
263 pub(crate) const fn limit(mut self, limit: u32) -> Self {
264 match self.mode {
265 QueryMode::Load(mut spec) => {
266 spec.limit = Some(limit);
267 self.mode = QueryMode::Load(spec);
268 }
269 QueryMode::Delete(mut spec) => {
270 spec.limit = Some(limit);
271 self.mode = QueryMode::Delete(spec);
272 }
273 }
274 self
275 }
276
277 #[must_use]
279 pub(crate) const fn offset(mut self, offset: u32) -> Self {
280 if let QueryMode::Load(mut spec) = self.mode {
281 spec.offset = offset;
282 self.mode = QueryMode::Load(spec);
283 }
284 self
285 }
286
287 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
289 let schema_info = SchemaInfo::from_entity_model(self.model)?;
291 self.validate_intent()?;
292
293 if let Some(predicate) = self.predicate.as_ref() {
294 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
295 }
296
297 let normalized_predicate = self.predicate.as_ref().map(normalize);
299 let access_plan_value = match &self.key_access {
300 Some(state) => access_plan_from_keys_value(&state.access),
301 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
302 };
303
304 let plan = LogicalPlan {
306 mode: self.mode,
307 access: access_plan_value,
308 predicate: normalized_predicate,
309 order: canonicalize_order_spec(self.model, self.order.clone()),
312 delete_limit: match self.mode {
313 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
314 QueryMode::Load(_) => None,
315 },
316 page: match self.mode {
317 QueryMode::Load(spec) => {
318 if spec.limit.is_some() || spec.offset > 0 {
319 Some(PageSpec {
320 limit: spec.limit,
321 offset: spec.offset,
322 })
323 } else {
324 None
325 }
326 }
327 QueryMode::Delete(_) => None,
328 },
329 consistency: self.consistency,
330 };
331
332 validate_logical_plan_model(&schema_info, self.model, &plan)?;
333
334 Ok(plan)
335 }
336
337 fn validate_intent(&self) -> Result<(), IntentError> {
339 if self.key_access_conflict {
340 return Err(IntentError::KeyAccessConflict);
341 }
342
343 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
344 .map_err(IntentError::from)?;
345
346 if let Some(state) = &self.key_access {
347 match state.kind {
348 KeyAccessKind::Many if self.predicate.is_some() => {
349 return Err(IntentError::ByIdsWithPredicate);
350 }
351 KeyAccessKind::Only if self.predicate.is_some() => {
352 return Err(IntentError::OnlyWithPredicate);
353 }
354 _ => {
355 }
357 }
358 }
359
360 Ok(())
361 }
362}
363
364#[derive(Debug)]
376pub struct Query<E: EntityKind> {
377 intent: QueryModel<'static, E::Key>,
378 _marker: PhantomData<E>,
379}
380
381impl<E: EntityKind> Query<E> {
382 #[must_use]
386 pub const fn new(consistency: ReadConsistency) -> Self {
387 Self {
388 intent: QueryModel::new(E::MODEL, consistency),
389 _marker: PhantomData,
390 }
391 }
392
393 #[must_use]
395 pub const fn mode(&self) -> QueryMode {
396 self.intent.mode()
397 }
398
399 #[must_use]
400 pub(crate) fn has_explicit_order(&self) -> bool {
401 self.intent.has_explicit_order()
402 }
403
404 #[must_use]
405 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
406 self.intent.load_spec()
407 }
408
409 #[must_use]
411 pub fn filter(mut self, predicate: Predicate) -> Self {
412 self.intent = self.intent.filter(predicate);
413 self
414 }
415
416 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
418 let Self { intent, _marker } = self;
419 let intent = intent.filter_expr(expr)?;
420
421 Ok(Self { intent, _marker })
422 }
423
424 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
426 let Self { intent, _marker } = self;
427 let intent = intent.sort_expr(expr)?;
428
429 Ok(Self { intent, _marker })
430 }
431
432 #[must_use]
434 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
435 self.intent = self.intent.order_by(field);
436 self
437 }
438
439 #[must_use]
441 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
442 self.intent = self.intent.order_by_desc(field);
443 self
444 }
445
446 pub(crate) fn by_id(self, id: E::Key) -> Self {
448 let Self { intent, _marker } = self;
449 Self {
450 intent: intent.by_id(id),
451 _marker,
452 }
453 }
454
455 pub(crate) fn by_ids<I>(self, ids: I) -> Self
457 where
458 I: IntoIterator<Item = E::Key>,
459 {
460 let Self { intent, _marker } = self;
461 Self {
462 intent: intent.by_ids(ids),
463 _marker,
464 }
465 }
466
467 #[must_use]
469 pub fn delete(mut self) -> Self {
470 self.intent = self.intent.delete();
471 self
472 }
473
474 #[must_use]
480 pub fn limit(mut self, limit: u32) -> Self {
481 self.intent = self.intent.limit(limit);
482 self
483 }
484
485 #[must_use]
489 pub fn offset(mut self, offset: u32) -> Self {
490 self.intent = self.intent.offset(offset);
491 self
492 }
493
494 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
496 let plan = self.build_plan()?;
497
498 Ok(plan.explain_with_model(E::MODEL))
499 }
500
501 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
503 let plan = self.build_plan()?;
504
505 Ok(ExecutablePlan::new(plan))
506 }
507
508 fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
510 let plan_value = self.intent.build_plan_model()?;
511 let LogicalPlan {
512 mode,
513 access,
514 predicate,
515 order,
516 delete_limit,
517 page,
518 consistency,
519 } = plan_value;
520
521 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
522 let plan = LogicalPlan {
523 mode,
524 access,
525 predicate,
526 order,
527 delete_limit,
528 page,
529 consistency,
530 };
531
532 Ok(plan)
533 }
534}
535
536impl<E> Query<E>
537where
538 E: EntityKind + SingletonEntity,
539 E::Key: Default,
540{
541 pub(crate) fn only(self) -> Self {
543 let Self { intent, _marker } = self;
544
545 Self {
546 intent: intent.only(E::Key::default()),
547 _marker,
548 }
549 }
550}
551
552#[derive(Debug, ThisError)]
557pub enum QueryError {
558 #[error("{0}")]
559 Validate(#[from] ValidateError),
560
561 #[error("{0}")]
562 Plan(Box<PlanError>),
563
564 #[error("{0}")]
565 Intent(#[from] IntentError),
566
567 #[error("{0}")]
568 Response(#[from] ResponseError),
569
570 #[error("{0}")]
571 Execute(#[from] InternalError),
572}
573
574impl From<PlannerError> for QueryError {
575 fn from(err: PlannerError) -> Self {
576 match err {
577 PlannerError::Plan(err) => Self::from(*err),
578 PlannerError::Internal(err) => Self::Execute(*err),
579 }
580 }
581}
582
583impl From<PlanError> for QueryError {
584 fn from(err: PlanError) -> Self {
585 Self::Plan(Box::new(err))
586 }
587}
588
589#[derive(Clone, Copy, Debug, ThisError)]
594pub enum IntentError {
595 #[error("{0}")]
596 PlanShape(#[from] policy::PlanPolicyError),
597
598 #[error("by_ids() cannot be combined with predicates")]
599 ByIdsWithPredicate,
600
601 #[error("only() cannot be combined with predicates")]
602 OnlyWithPredicate,
603
604 #[error("multiple key access methods were used on the same query")]
605 KeyAccessConflict,
606
607 #[error("cursor pagination requires an explicit ordering")]
608 CursorRequiresOrder,
609
610 #[error("cursor pagination requires an explicit limit")]
611 CursorRequiresLimit,
612
613 #[error("cursor pagination does not support offset; use the cursor token for continuation")]
614 CursorWithOffsetUnsupported,
615}
616
617impl From<policy::CursorPagingPolicyError> for IntentError {
618 fn from(err: policy::CursorPagingPolicyError) -> Self {
619 match err {
620 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
621 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
622 policy::CursorPagingPolicyError::CursorWithOffsetUnsupported => {
623 Self::CursorWithOffsetUnsupported
624 }
625 }
626 }
627}
628
629fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
631 match order {
632 Some(mut spec) => {
633 spec.fields.push((field.to_string(), direction));
634 spec
635 }
636 None => OrderSpec {
637 fields: vec![(field.to_string(), direction)],
638 },
639 }
640}
641
642fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
647 let mut order = order?;
648 if order.fields.is_empty() {
649 return Some(order);
650 }
651
652 let pk_field = model.primary_key.name;
653 let mut pk_direction = None;
654 order.fields.retain(|(field, direction)| {
655 if field == pk_field {
656 if pk_direction.is_none() {
657 pk_direction = Some(*direction);
658 }
659 false
660 } else {
661 true
662 }
663 });
664
665 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
666 order.fields.push((pk_field.to_string(), pk_direction));
667
668 Some(order)
669}