1#![allow(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 expr::{FilterExpr, SortExpr, SortLowerError},
14 plan::{
15 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16 OrderSpec, PageSpec, PlanError,
17 planner::{PlannerError, plan_access},
18 validate::validate_logical_plan_model,
19 },
20 predicate::{Predicate, SchemaInfo, ValidateError, normalize},
21 },
22 response::ResponseError,
23 },
24 error::InternalError,
25 traits::{EntityKind, FieldValue, SingletonEntity},
26 value::Value,
27};
28use std::marker::PhantomData;
29use thiserror::Error as ThisError;
30
31#[derive(Clone, Copy, Debug, Eq, PartialEq)]
39pub enum QueryMode {
40 Load(LoadSpec),
41 Delete(DeleteSpec),
42}
43
44impl QueryMode {
45 #[must_use]
47 pub const fn is_load(&self) -> bool {
48 match self {
49 Self::Load(_) => true,
50 Self::Delete(_) => false,
51 }
52 }
53
54 #[must_use]
56 pub const fn is_delete(&self) -> bool {
57 match self {
58 Self::Delete(_) => true,
59 Self::Load(_) => false,
60 }
61 }
62}
63
64#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
71pub struct LoadSpec {
72 pub limit: Option<u32>,
73 pub offset: u32,
74}
75
76impl LoadSpec {
77 #[must_use]
79 pub const fn new() -> Self {
80 Self {
81 limit: None,
82 offset: 0,
83 }
84 }
85}
86
87#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
94pub struct DeleteSpec {
95 pub limit: Option<u32>,
96}
97
98impl DeleteSpec {
99 #[must_use]
101 pub const fn new() -> Self {
102 Self { limit: None }
103 }
104}
105
106#[derive(Debug)]
114pub(crate) struct QueryModel<'m, K> {
115 model: &'m crate::model::entity::EntityModel,
116 mode: QueryMode,
117 predicate: Option<Predicate>,
118 key_access: Option<KeyAccessState<K>>,
119 key_access_conflict: bool,
120 order: Option<OrderSpec>,
121 consistency: ReadConsistency,
122}
123
124impl<'m, K: FieldValue> QueryModel<'m, K> {
125 #[must_use]
126 pub const fn new(
127 model: &'m crate::model::entity::EntityModel,
128 consistency: ReadConsistency,
129 ) -> Self {
130 Self {
131 model,
132 mode: QueryMode::Load(LoadSpec::new()),
133 predicate: None,
134 key_access: None,
135 key_access_conflict: false,
136 order: None,
137 consistency,
138 }
139 }
140
141 #[must_use]
143 pub const fn mode(&self) -> QueryMode {
144 self.mode
145 }
146
147 #[must_use]
149 pub fn filter(mut self, predicate: Predicate) -> Self {
150 self.predicate = match self.predicate.take() {
151 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
152 None => Some(predicate),
153 };
154 self
155 }
156
157 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
159 let schema = SchemaInfo::from_entity_model(self.model)?;
160 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
161
162 Ok(self.filter(predicate))
163 }
164
165 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
167 let schema = SchemaInfo::from_entity_model(self.model)?;
168 let order = match expr.lower_with(&schema) {
169 Ok(order) => order,
170 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
171 Err(SortLowerError::Plan(err)) => return Err(QueryError::Plan(err)),
172 };
173
174 if order.fields.is_empty() {
175 return Err(QueryError::Intent(IntentError::EmptyOrderSpec));
176 }
177
178 Ok(self.order_spec(order))
179 }
180
181 #[must_use]
183 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
184 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
185 self
186 }
187
188 #[must_use]
190 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
191 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
192 self
193 }
194
195 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
197 self.order = Some(order);
198 self
199 }
200
201 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
203 if let Some(existing) = &self.key_access
204 && existing.kind != kind
205 {
206 self.key_access_conflict = true;
207 }
208
209 self.key_access = Some(KeyAccessState { kind, access });
210
211 self
212 }
213
214 pub(crate) fn by_id(self, id: K) -> Self {
216 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
217 }
218
219 pub(crate) fn by_ids<I>(self, ids: I) -> Self
221 where
222 I: IntoIterator<Item = K>,
223 {
224 self.set_key_access(
225 KeyAccessKind::Many,
226 KeyAccess::Many(ids.into_iter().collect()),
227 )
228 }
229
230 pub(crate) fn only(self, id: K) -> Self {
232 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
233 }
234
235 #[must_use]
237 pub const fn delete(mut self) -> Self {
238 if self.mode.is_load() {
239 self.mode = QueryMode::Delete(DeleteSpec::new());
240 }
241 self
242 }
243
244 #[must_use]
248 pub const fn limit(mut self, limit: u32) -> Self {
249 match self.mode {
250 QueryMode::Load(mut spec) => {
251 spec.limit = Some(limit);
252 self.mode = QueryMode::Load(spec);
253 }
254 QueryMode::Delete(mut spec) => {
255 spec.limit = Some(limit);
256 self.mode = QueryMode::Delete(spec);
257 }
258 }
259 self
260 }
261
262 #[must_use]
264 pub const fn offset(mut self, offset: u32) -> Self {
265 if let QueryMode::Load(mut spec) = self.mode {
266 spec.offset = offset;
267 self.mode = QueryMode::Load(spec);
268 }
269 self
270 }
271
272 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
274 let schema_info = SchemaInfo::from_entity_model(self.model)?;
276 self.validate_intent()?;
277
278 let normalized_predicate = self.predicate.as_ref().map(normalize);
280 let access_plan_value = match &self.key_access {
281 Some(state) => access_plan_from_keys_value(&state.access),
282 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
283 };
284
285 let plan = LogicalPlan {
287 mode: self.mode,
288 access: access_plan_value,
289 predicate: normalized_predicate,
290 order: self.order.clone(),
291 delete_limit: match self.mode {
292 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
293 QueryMode::Load(_) => None,
294 },
295 page: match self.mode {
296 QueryMode::Load(spec) => {
297 if spec.limit.is_some() || spec.offset > 0 {
298 Some(PageSpec {
299 limit: spec.limit,
300 offset: spec.offset,
301 })
302 } else {
303 None
304 }
305 }
306 QueryMode::Delete(_) => None,
307 },
308 consistency: self.consistency,
309 };
310
311 validate_logical_plan_model(&schema_info, self.model, &plan)?;
312
313 Ok(plan)
314 }
315
316 const fn validate_intent(&self) -> Result<(), IntentError> {
318 if self.key_access_conflict {
319 return Err(IntentError::KeyAccessConflict);
320 }
321
322 if let Some(order) = &self.order
323 && order.fields.is_empty()
324 {
325 return Err(IntentError::EmptyOrderSpec);
326 }
327
328 if let Some(state) = &self.key_access {
329 match state.kind {
330 KeyAccessKind::Many if self.predicate.is_some() => {
331 return Err(IntentError::ByIdsWithPredicate);
332 }
333 KeyAccessKind::Only if self.predicate.is_some() => {
334 return Err(IntentError::OnlyWithPredicate);
335 }
336 _ => {
337 }
339 }
340 }
341
342 match self.mode {
343 QueryMode::Load(_) => {}
344 QueryMode::Delete(spec) => {
345 if spec.limit.is_some() && self.order.is_none() {
346 return Err(IntentError::DeleteLimitRequiresOrder);
347 }
348 }
349 }
350
351 Ok(())
352 }
353}
354
355#[derive(Debug)]
367pub struct Query<E: EntityKind> {
368 intent: QueryModel<'static, E::Key>,
369 #[allow(clippy::struct_field_names)]
370 _marker: PhantomData<E>,
371}
372
373impl<E: EntityKind> Query<E> {
374 #[must_use]
378 pub const fn new(consistency: ReadConsistency) -> Self {
379 Self {
380 intent: QueryModel::new(E::MODEL, consistency),
381 _marker: PhantomData,
382 }
383 }
384
385 #[must_use]
387 pub const fn mode(&self) -> QueryMode {
388 self.intent.mode()
389 }
390
391 #[must_use]
393 pub fn filter(mut self, predicate: Predicate) -> Self {
394 self.intent = self.intent.filter(predicate);
395 self
396 }
397
398 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
400 let Self { intent, _marker } = self;
401 let intent = intent.filter_expr(expr)?;
402
403 Ok(Self { intent, _marker })
404 }
405
406 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
408 let Self { intent, _marker } = self;
409 let intent = intent.sort_expr(expr)?;
410
411 Ok(Self { intent, _marker })
412 }
413
414 #[must_use]
416 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
417 self.intent = self.intent.order_by(field);
418 self
419 }
420
421 #[must_use]
423 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
424 self.intent = self.intent.order_by_desc(field);
425 self
426 }
427
428 pub(crate) fn by_id(self, id: E::Key) -> Self {
430 let Self { intent, _marker } = self;
431 Self {
432 intent: intent.by_id(id),
433 _marker,
434 }
435 }
436
437 pub(crate) fn by_ids<I>(self, ids: I) -> Self
439 where
440 I: IntoIterator<Item = E::Key>,
441 {
442 let Self { intent, _marker } = self;
443 Self {
444 intent: intent.by_ids(ids),
445 _marker,
446 }
447 }
448
449 #[must_use]
451 pub fn delete(mut self) -> Self {
452 self.intent = self.intent.delete();
453 self
454 }
455
456 #[must_use]
460 pub fn limit(mut self, limit: u32) -> Self {
461 self.intent = self.intent.limit(limit);
462 self
463 }
464
465 #[must_use]
467 pub fn offset(mut self, offset: u32) -> Self {
468 self.intent = self.intent.offset(offset);
469 self
470 }
471
472 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
474 let plan = self.build_plan()?;
475
476 Ok(plan.explain())
477 }
478
479 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
481 let plan = self.build_plan()?;
482
483 Ok(ExecutablePlan::new(plan))
484 }
485
486 fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
488 let plan_value = self.intent.build_plan_model()?;
489 let LogicalPlan {
490 mode,
491 access,
492 predicate,
493 order,
494 delete_limit,
495 page,
496 consistency,
497 } = plan_value;
498
499 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
500 let plan = LogicalPlan {
501 mode,
502 access,
503 predicate,
504 order,
505 delete_limit,
506 page,
507 consistency,
508 };
509
510 Ok(plan)
511 }
512}
513
514impl<E> Query<E>
515where
516 E: EntityKind + SingletonEntity,
517 E::Key: Default,
518{
519 pub(crate) fn only(self) -> Self {
521 let Self { intent, _marker } = self;
522
523 Self {
524 intent: intent.only(E::Key::default()),
525 _marker,
526 }
527 }
528}
529
530#[derive(Debug, ThisError)]
535pub enum QueryError {
536 #[error("{0}")]
537 Validate(#[from] ValidateError),
538
539 #[error("{0}")]
540 Plan(#[from] PlanError),
541
542 #[error("{0}")]
543 Intent(#[from] IntentError),
544
545 #[error("{0}")]
546 Response(#[from] ResponseError),
547
548 #[error("{0}")]
549 Execute(#[from] InternalError),
550}
551
552impl From<PlannerError> for QueryError {
553 fn from(err: PlannerError) -> Self {
554 match err {
555 PlannerError::Plan(err) => Self::Plan(err),
556 PlannerError::Internal(err) => Self::Execute(err),
557 }
558 }
559}
560
561#[derive(Clone, Copy, Debug, ThisError)]
566pub enum IntentError {
567 #[error("delete limit requires an explicit ordering")]
568 DeleteLimitRequiresOrder,
569
570 #[error("order specification must include at least one field")]
571 EmptyOrderSpec,
572
573 #[error("by_ids() cannot be combined with predicates")]
574 ByIdsWithPredicate,
575
576 #[error("only() cannot be combined with predicates")]
577 OnlyWithPredicate,
578
579 #[error("multiple key access methods were used on the same query")]
580 KeyAccessConflict,
581}
582
583fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
585 match order {
586 Some(mut spec) => {
587 spec.fields.push((field.to_string(), direction));
588 spec
589 }
590 None => OrderSpec {
591 fields: vec![(field.to_string(), direction)],
592 },
593 }
594}