1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub(crate) use key_access::*;
8
9use crate::{
10 db::{
11 executor::ExecutablePlan,
12 query::{
13 ReadConsistency,
14 explain::ExplainPlan,
15 expr::{FilterExpr, SortExpr, SortLowerError},
16 plan::{
17 AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
18 PageSpec, PlanError,
19 planner::{PlannerError, plan_access},
20 validate::validate_logical_plan_model,
21 },
22 policy,
23 predicate::{
24 Predicate, SchemaInfo, ValidateError, normalize, normalize_enum_literals,
25 validate::reject_unsupported_query_features,
26 },
27 },
28 response::ResponseError,
29 },
30 error::InternalError,
31 model::entity::EntityModel,
32 traits::{EntityKind, FieldValue, SingletonEntity},
33 value::Value,
34};
35use std::marker::PhantomData;
36use thiserror::Error as ThisError;
37
38#[derive(Clone, Copy, Debug, Eq, PartialEq)]
46pub enum QueryMode {
47 Load(LoadSpec),
48 Delete(DeleteSpec),
49}
50
51impl QueryMode {
52 #[must_use]
54 pub const fn is_load(&self) -> bool {
55 match self {
56 Self::Load(_) => true,
57 Self::Delete(_) => false,
58 }
59 }
60
61 #[must_use]
63 pub const fn is_delete(&self) -> bool {
64 match self {
65 Self::Delete(_) => true,
66 Self::Load(_) => false,
67 }
68 }
69}
70
71#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
78pub struct LoadSpec {
79 pub limit: Option<u32>,
80 pub offset: u32,
81}
82
83impl LoadSpec {
84 #[must_use]
86 pub const fn new() -> Self {
87 Self {
88 limit: None,
89 offset: 0,
90 }
91 }
92}
93
94#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
101pub struct DeleteSpec {
102 pub limit: Option<u32>,
103}
104
105impl DeleteSpec {
106 #[must_use]
108 pub const fn new() -> Self {
109 Self { limit: None }
110 }
111}
112
113#[derive(Debug)]
121pub(crate) struct QueryModel<'m, K> {
122 model: &'m EntityModel,
123 mode: QueryMode,
124 predicate: Option<Predicate>,
125 key_access: Option<KeyAccessState<K>>,
126 key_access_conflict: bool,
127 order: Option<OrderSpec>,
128 distinct: bool,
129 consistency: ReadConsistency,
130}
131
132impl<'m, K: FieldValue> QueryModel<'m, K> {
133 #[must_use]
134 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
135 Self {
136 model,
137 mode: QueryMode::Load(LoadSpec::new()),
138 predicate: None,
139 key_access: None,
140 key_access_conflict: false,
141 order: None,
142 distinct: false,
143 consistency,
144 }
145 }
146
147 #[must_use]
149 pub(crate) const fn mode(&self) -> QueryMode {
150 self.mode
151 }
152
153 #[must_use]
154 fn has_explicit_order(&self) -> bool {
155 policy::has_explicit_order(self.order.as_ref())
156 }
157
158 #[must_use]
159 const fn load_spec(&self) -> Option<LoadSpec> {
160 match self.mode {
161 QueryMode::Load(spec) => Some(spec),
162 QueryMode::Delete(_) => None,
163 }
164 }
165
166 #[must_use]
168 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
169 self.predicate = match self.predicate.take() {
170 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
171 None => Some(predicate),
172 };
173 self
174 }
175
176 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
178 let schema = SchemaInfo::from_entity_model(self.model)?;
179 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
180
181 Ok(self.filter(predicate))
182 }
183
184 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
186 let schema = SchemaInfo::from_entity_model(self.model)?;
187 let order = match expr.lower_with(&schema) {
188 Ok(order) => order,
189 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
190 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
191 };
192
193 policy::validate_order_shape(Some(&order))
194 .map_err(IntentError::from)
195 .map_err(QueryError::from)?;
196
197 Ok(self.order_spec(order))
198 }
199
200 #[must_use]
202 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
203 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
204 self
205 }
206
207 #[must_use]
209 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
210 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
211 self
212 }
213
214 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
216 self.order = Some(order);
217 self
218 }
219
220 #[must_use]
222 pub(crate) const fn distinct(mut self) -> Self {
223 self.distinct = true;
224 self
225 }
226
227 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
229 if let Some(existing) = &self.key_access
230 && existing.kind != kind
231 {
232 self.key_access_conflict = true;
233 }
234
235 self.key_access = Some(KeyAccessState { kind, access });
236
237 self
238 }
239
240 pub(crate) fn by_id(self, id: K) -> Self {
242 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
243 }
244
245 pub(crate) fn by_ids<I>(self, ids: I) -> Self
247 where
248 I: IntoIterator<Item = K>,
249 {
250 self.set_key_access(
251 KeyAccessKind::Many,
252 KeyAccess::Many(ids.into_iter().collect()),
253 )
254 }
255
256 pub(crate) fn only(self, id: K) -> Self {
258 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
259 }
260
261 #[must_use]
263 pub(crate) const fn delete(mut self) -> Self {
264 if self.mode.is_load() {
265 self.mode = QueryMode::Delete(DeleteSpec::new());
266 }
267 self
268 }
269
270 #[must_use]
274 pub(crate) const fn limit(mut self, limit: u32) -> Self {
275 match self.mode {
276 QueryMode::Load(mut spec) => {
277 spec.limit = Some(limit);
278 self.mode = QueryMode::Load(spec);
279 }
280 QueryMode::Delete(mut spec) => {
281 spec.limit = Some(limit);
282 self.mode = QueryMode::Delete(spec);
283 }
284 }
285 self
286 }
287
288 #[must_use]
290 pub(crate) const fn offset(mut self, offset: u32) -> Self {
291 if let QueryMode::Load(mut spec) = self.mode {
292 spec.offset = offset;
293 self.mode = QueryMode::Load(spec);
294 }
295 self
296 }
297
298 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
300 let schema_info = SchemaInfo::from_entity_model(self.model)?;
302 self.validate_intent()?;
303
304 let normalized_predicate = self
306 .predicate
307 .as_ref()
308 .map(|predicate| {
309 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
310 let predicate = normalize_enum_literals(&schema_info, predicate)?;
311 Ok::<Predicate, ValidateError>(normalize(&predicate))
312 })
313 .transpose()?;
314 let access_plan_value = match &self.key_access {
315 Some(state) => access_plan_from_keys_value(&state.access),
316 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
317 };
318
319 let logical = LogicalPlan {
321 mode: self.mode,
322 predicate: normalized_predicate,
323 order: canonicalize_order_spec(self.model, self.order.clone()),
326 distinct: self.distinct,
327 delete_limit: match self.mode {
328 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
329 QueryMode::Load(_) => None,
330 },
331 page: match self.mode {
332 QueryMode::Load(spec) => {
333 if spec.limit.is_some() || spec.offset > 0 {
334 Some(PageSpec {
335 limit: spec.limit,
336 offset: spec.offset,
337 })
338 } else {
339 None
340 }
341 }
342 QueryMode::Delete(_) => None,
343 },
344 consistency: self.consistency,
345 };
346 let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
347
348 validate_logical_plan_model(&schema_info, self.model, &plan)?;
349
350 Ok(plan)
351 }
352
353 fn validate_intent(&self) -> Result<(), IntentError> {
355 if self.key_access_conflict {
356 return Err(IntentError::KeyAccessConflict);
357 }
358
359 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
360 .map_err(IntentError::from)?;
361
362 if let Some(state) = &self.key_access {
363 match state.kind {
364 KeyAccessKind::Many if self.predicate.is_some() => {
365 return Err(IntentError::ByIdsWithPredicate);
366 }
367 KeyAccessKind::Only if self.predicate.is_some() => {
368 return Err(IntentError::OnlyWithPredicate);
369 }
370 _ => {
371 }
373 }
374 }
375
376 Ok(())
377 }
378}
379
380#[derive(Debug)]
392pub struct Query<E: EntityKind> {
393 intent: QueryModel<'static, E::Key>,
394 _marker: PhantomData<E>,
395}
396
397impl<E: EntityKind> Query<E> {
398 #[must_use]
402 pub const fn new(consistency: ReadConsistency) -> Self {
403 Self {
404 intent: QueryModel::new(E::MODEL, consistency),
405 _marker: PhantomData,
406 }
407 }
408
409 #[must_use]
411 pub const fn mode(&self) -> QueryMode {
412 self.intent.mode()
413 }
414
415 #[must_use]
416 pub(crate) fn has_explicit_order(&self) -> bool {
417 self.intent.has_explicit_order()
418 }
419
420 #[must_use]
421 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
422 self.intent.load_spec()
423 }
424
425 #[must_use]
427 pub fn filter(mut self, predicate: Predicate) -> Self {
428 self.intent = self.intent.filter(predicate);
429 self
430 }
431
432 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
434 let Self { intent, _marker } = self;
435 let intent = intent.filter_expr(expr)?;
436
437 Ok(Self { intent, _marker })
438 }
439
440 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
442 let Self { intent, _marker } = self;
443 let intent = intent.sort_expr(expr)?;
444
445 Ok(Self { intent, _marker })
446 }
447
448 #[must_use]
450 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
451 self.intent = self.intent.order_by(field);
452 self
453 }
454
455 #[must_use]
457 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
458 self.intent = self.intent.order_by_desc(field);
459 self
460 }
461
462 #[must_use]
464 pub fn distinct(mut self) -> Self {
465 self.intent = self.intent.distinct();
466 self
467 }
468
469 pub(crate) fn by_id(self, id: E::Key) -> Self {
471 let Self { intent, _marker } = self;
472 Self {
473 intent: intent.by_id(id),
474 _marker,
475 }
476 }
477
478 pub(crate) fn by_ids<I>(self, ids: I) -> Self
480 where
481 I: IntoIterator<Item = E::Key>,
482 {
483 let Self { intent, _marker } = self;
484 Self {
485 intent: intent.by_ids(ids),
486 _marker,
487 }
488 }
489
490 #[must_use]
492 pub fn delete(mut self) -> Self {
493 self.intent = self.intent.delete();
494 self
495 }
496
497 #[must_use]
503 pub fn limit(mut self, limit: u32) -> Self {
504 self.intent = self.intent.limit(limit);
505 self
506 }
507
508 #[must_use]
512 pub fn offset(mut self, offset: u32) -> Self {
513 self.intent = self.intent.offset(offset);
514 self
515 }
516
517 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
519 let plan = self.build_plan()?;
520
521 Ok(plan.explain_with_model(E::MODEL))
522 }
523
524 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
526 let plan = self.build_plan()?;
527
528 Ok(ExecutablePlan::new(plan))
529 }
530
531 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
533 let plan_value = self.intent.build_plan_model()?;
534 let (logical, access) = plan_value.into_parts();
535 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
536 let plan = AccessPlannedQuery::from_parts(logical, access);
537
538 Ok(plan)
539 }
540}
541
542impl<E> Query<E>
543where
544 E: EntityKind + SingletonEntity,
545 E::Key: Default,
546{
547 pub(crate) fn only(self) -> Self {
549 let Self { intent, _marker } = self;
550
551 Self {
552 intent: intent.only(E::Key::default()),
553 _marker,
554 }
555 }
556}
557
558#[derive(Debug, ThisError)]
563pub enum QueryError {
564 #[error("{0}")]
565 Validate(#[from] ValidateError),
566
567 #[error("{0}")]
568 Plan(Box<PlanError>),
569
570 #[error("{0}")]
571 Intent(#[from] IntentError),
572
573 #[error("{0}")]
574 Response(#[from] ResponseError),
575
576 #[error("{0}")]
577 Execute(#[from] InternalError),
578}
579
580impl From<PlannerError> for QueryError {
581 fn from(err: PlannerError) -> Self {
582 match err {
583 PlannerError::Plan(err) => Self::from(*err),
584 PlannerError::Internal(err) => Self::Execute(*err),
585 }
586 }
587}
588
589impl From<PlanError> for QueryError {
590 fn from(err: PlanError) -> Self {
591 Self::Plan(Box::new(err))
592 }
593}
594
595#[derive(Clone, Copy, Debug, ThisError)]
600pub enum IntentError {
601 #[error("{0}")]
602 PlanShape(#[from] policy::PlanPolicyError),
603
604 #[error("by_ids() cannot be combined with predicates")]
605 ByIdsWithPredicate,
606
607 #[error("only() cannot be combined with predicates")]
608 OnlyWithPredicate,
609
610 #[error("multiple key access methods were used on the same query")]
611 KeyAccessConflict,
612
613 #[error("cursor pagination requires an explicit ordering")]
614 CursorRequiresOrder,
615
616 #[error("cursor pagination requires an explicit limit")]
617 CursorRequiresLimit,
618
619 #[error("cursor tokens can only be used with .page().execute()")]
620 CursorRequiresPagedExecution,
621}
622
623impl From<policy::CursorPagingPolicyError> for IntentError {
624 fn from(err: policy::CursorPagingPolicyError) -> Self {
625 match err {
626 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
627 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
628 }
629 }
630}
631
632fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
634 match order {
635 Some(mut spec) => {
636 spec.fields.push((field.to_string(), direction));
637 spec
638 }
639 None => OrderSpec {
640 fields: vec![(field.to_string(), direction)],
641 },
642 }
643}
644
645fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
650 let mut order = order?;
651 if order.fields.is_empty() {
652 return Some(order);
653 }
654
655 let pk_field = model.primary_key.name;
656 let mut pk_direction = None;
657 order.fields.retain(|(field, direction)| {
658 if field == pk_field {
659 if pk_direction.is_none() {
660 pk_direction = Some(*direction);
661 }
662 false
663 } else {
664 true
665 }
666 });
667
668 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
669 order.fields.push((pk_field.to_string(), pk_direction));
670
671 Some(order)
672}