1#![allow(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 expr::{FilterExpr, SortExpr, SortLowerError},
14 plan::{
15 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16 OrderSpec, PageSpec, PlanError,
17 planner::{PlannerError, plan_access},
18 validate::validate_logical_plan_model,
19 },
20 predicate::{Predicate, SchemaInfo, ValidateError, normalize},
21 },
22 response::ResponseError,
23 },
24 error::InternalError,
25 traits::{EntityKind, FieldValue, SingletonEntity},
26 value::Value,
27};
28use std::marker::PhantomData;
29use thiserror::Error as ThisError;
30
31#[derive(Clone, Copy, Debug, Eq, PartialEq)]
39pub enum QueryMode {
40 Load(LoadSpec),
41 Delete(DeleteSpec),
42}
43
44impl QueryMode {
45 #[must_use]
47 pub const fn is_load(&self) -> bool {
48 match self {
49 Self::Load(_) => true,
50 Self::Delete(_) => false,
51 }
52 }
53
54 #[must_use]
56 pub const fn is_delete(&self) -> bool {
57 match self {
58 Self::Delete(_) => true,
59 Self::Load(_) => false,
60 }
61 }
62}
63
64#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
71pub struct LoadSpec {
72 pub limit: Option<u32>,
73 pub offset: u32,
74}
75
76impl LoadSpec {
77 #[must_use]
79 pub const fn new() -> Self {
80 Self {
81 limit: None,
82 offset: 0,
83 }
84 }
85}
86
87#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
94pub struct DeleteSpec {
95 pub limit: Option<u32>,
96}
97
98impl DeleteSpec {
99 #[must_use]
101 pub const fn new() -> Self {
102 Self { limit: None }
103 }
104}
105
106#[derive(Debug)]
114pub(crate) struct QueryModel<'m, K> {
115 model: &'m crate::model::entity::EntityModel,
116 mode: QueryMode,
117 predicate: Option<Predicate>,
118 key_access: Option<KeyAccessState<K>>,
119 key_access_conflict: bool,
120 order: Option<OrderSpec>,
121 consistency: ReadConsistency,
122}
123
124impl<'m, K: FieldValue> QueryModel<'m, K> {
125 #[must_use]
126 pub const fn new(
127 model: &'m crate::model::entity::EntityModel,
128 consistency: ReadConsistency,
129 ) -> Self {
130 Self {
131 model,
132 mode: QueryMode::Load(LoadSpec::new()),
133 predicate: None,
134 key_access: None,
135 key_access_conflict: false,
136 order: None,
137 consistency,
138 }
139 }
140
141 #[must_use]
143 pub const fn mode(&self) -> QueryMode {
144 self.mode
145 }
146
147 #[must_use]
149 pub fn filter(mut self, predicate: Predicate) -> Self {
150 self.predicate = match self.predicate.take() {
151 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
152 None => Some(predicate),
153 };
154 self
155 }
156
157 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
159 let schema = SchemaInfo::from_entity_model(self.model)?;
160 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
161
162 Ok(self.filter(predicate))
163 }
164
165 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
167 let schema = SchemaInfo::from_entity_model(self.model)?;
168 let order = match expr.lower_with(&schema) {
169 Ok(order) => order,
170 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
171 Err(SortLowerError::Plan(err)) => return Err(QueryError::Plan(err)),
172 };
173
174 if order.fields.is_empty() {
175 return Err(QueryError::Intent(IntentError::EmptyOrderSpec));
176 }
177
178 Ok(self.order_spec(order))
179 }
180
181 #[must_use]
183 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
184 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
185 self
186 }
187
188 #[must_use]
190 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
191 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
192 self
193 }
194
195 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
197 self.order = Some(order);
198 self
199 }
200
201 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
203 if let Some(existing) = &self.key_access
204 && existing.kind != kind
205 {
206 self.key_access_conflict = true;
207 }
208
209 self.key_access = Some(KeyAccessState { kind, access });
210
211 self
212 }
213
214 pub(crate) fn by_key(self, key: K) -> Self {
216 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(key))
217 }
218
219 pub(crate) fn by_keys<I>(self, keys: I) -> Self
221 where
222 I: IntoIterator<Item = K>,
223 {
224 self.set_key_access(
225 KeyAccessKind::Many,
226 KeyAccess::Many(keys.into_iter().collect()),
227 )
228 }
229
230 pub(crate) fn only(self, id: K) -> Self {
232 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
233 }
234
235 #[must_use]
237 pub const fn delete(mut self) -> Self {
238 if self.mode.is_load() {
239 self.mode = QueryMode::Delete(DeleteSpec::new());
240 }
241 self
242 }
243
244 #[must_use]
248 pub const fn limit(mut self, limit: u32) -> Self {
249 match self.mode {
250 QueryMode::Load(mut spec) => {
251 spec.limit = Some(limit);
252 self.mode = QueryMode::Load(spec);
253 }
254 QueryMode::Delete(mut spec) => {
255 spec.limit = Some(limit);
256 self.mode = QueryMode::Delete(spec);
257 }
258 }
259 self
260 }
261
262 #[must_use]
264 pub const fn offset(mut self, offset: u32) -> Self {
265 if let QueryMode::Load(mut spec) = self.mode {
266 spec.offset = offset;
267 self.mode = QueryMode::Load(spec);
268 }
269 self
270 }
271
272 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
274 let schema_info = SchemaInfo::from_entity_model(self.model)?;
276 self.validate_intent()?;
277
278 let normalized_predicate = self.predicate.as_ref().map(normalize);
280 let access_plan_value = match &self.key_access {
281 Some(state) => access_plan_from_keys_value(&state.access),
282 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
283 };
284
285 let plan = LogicalPlan {
287 mode: self.mode,
288 access: access_plan_value,
289 predicate: normalized_predicate,
290 order: self.order.clone(),
291 delete_limit: match self.mode {
292 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
293 QueryMode::Load(_) => None,
294 },
295 page: match self.mode {
296 QueryMode::Load(spec) => {
297 if spec.limit.is_some() || spec.offset > 0 {
298 Some(PageSpec {
299 limit: spec.limit,
300 offset: spec.offset,
301 })
302 } else {
303 None
304 }
305 }
306 QueryMode::Delete(_) => None,
307 },
308 consistency: self.consistency,
309 };
310
311 validate_logical_plan_model(&schema_info, self.model, &plan)?;
312
313 Ok(plan)
314 }
315
316 const fn validate_intent(&self) -> Result<(), IntentError> {
318 if self.key_access_conflict {
319 return Err(IntentError::KeyAccessConflict);
320 }
321
322 if let Some(order) = &self.order
323 && order.fields.is_empty()
324 {
325 return Err(IntentError::EmptyOrderSpec);
326 }
327
328 if let Some(state) = &self.key_access {
329 match state.kind {
330 KeyAccessKind::Many if self.predicate.is_some() => {
331 return Err(IntentError::ManyWithPredicate);
332 }
333 KeyAccessKind::Only if self.predicate.is_some() => {
334 return Err(IntentError::OnlyWithPredicate);
335 }
336 _ => {}
337 }
338 }
339
340 match self.mode {
341 QueryMode::Load(_) => {}
342 QueryMode::Delete(spec) => {
343 if spec.limit.is_some() && self.order.is_none() {
344 return Err(IntentError::DeleteLimitRequiresOrder);
345 }
346 }
347 }
348
349 Ok(())
350 }
351}
352
353#[derive(Debug)]
365pub struct Query<E: EntityKind> {
366 intent: QueryModel<'static, E::Id>,
367 #[allow(clippy::struct_field_names)]
368 _marker: PhantomData<E>,
369}
370
371impl<E: EntityKind> Query<E> {
372 #[must_use]
376 pub const fn new(consistency: ReadConsistency) -> Self {
377 Self {
378 intent: QueryModel::new(E::MODEL, consistency),
379 _marker: PhantomData,
380 }
381 }
382
383 #[must_use]
385 pub const fn mode(&self) -> QueryMode {
386 self.intent.mode()
387 }
388
389 #[must_use]
391 pub fn filter(mut self, predicate: Predicate) -> Self {
392 self.intent = self.intent.filter(predicate);
393 self
394 }
395
396 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
398 let Self { intent, _marker } = self;
399 let intent = intent.filter_expr(expr)?;
400
401 Ok(Self { intent, _marker })
402 }
403
404 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
406 let Self { intent, _marker } = self;
407 let intent = intent.sort_expr(expr)?;
408
409 Ok(Self { intent, _marker })
410 }
411
412 #[must_use]
414 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
415 self.intent = self.intent.order_by(field);
416 self
417 }
418
419 #[must_use]
421 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
422 self.intent = self.intent.order_by_desc(field);
423 self
424 }
425
426 pub(crate) fn by_key(self, key: E::Id) -> Self {
428 let Self { intent, _marker } = self;
429 Self {
430 intent: intent.by_key(key),
431 _marker,
432 }
433 }
434
435 pub(crate) fn by_keys<I>(self, keys: I) -> Self
437 where
438 I: IntoIterator<Item = E::Id>,
439 {
440 let Self { intent, _marker } = self;
441 Self {
442 intent: intent.by_keys(keys),
443 _marker,
444 }
445 }
446
447 #[must_use]
449 pub fn delete(mut self) -> Self {
450 self.intent = self.intent.delete();
451 self
452 }
453
454 #[must_use]
458 pub fn limit(mut self, limit: u32) -> Self {
459 self.intent = self.intent.limit(limit);
460 self
461 }
462
463 #[must_use]
465 pub fn offset(mut self, offset: u32) -> Self {
466 self.intent = self.intent.offset(offset);
467 self
468 }
469
470 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
472 let plan = self.build_plan()?;
473
474 Ok(plan.explain())
475 }
476
477 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
479 let plan = self.build_plan()?;
480
481 Ok(ExecutablePlan::new(plan))
482 }
483
484 fn build_plan(&self) -> Result<LogicalPlan<E::Id>, QueryError> {
486 let plan_value = self.intent.build_plan_model()?;
487 let LogicalPlan {
488 mode,
489 access,
490 predicate,
491 order,
492 delete_limit,
493 page,
494 consistency,
495 } = plan_value;
496
497 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
498 let plan = LogicalPlan {
499 mode,
500 access,
501 predicate,
502 order,
503 delete_limit,
504 page,
505 consistency,
506 };
507
508 Ok(plan)
509 }
510}
511
512impl<E> Query<E>
513where
514 E: EntityKind + SingletonEntity,
515{
516 pub(crate) fn only(self, id: E::Id) -> Self {
518 let Self { intent, _marker } = self;
519
520 Self {
521 intent: intent.only(id),
522 _marker,
523 }
524 }
525}
526
527#[derive(Debug, ThisError)]
532pub enum QueryError {
533 #[error("{0}")]
534 Validate(#[from] ValidateError),
535
536 #[error("{0}")]
537 Plan(#[from] PlanError),
538
539 #[error("{0}")]
540 Intent(#[from] IntentError),
541
542 #[error("{0}")]
543 Response(#[from] ResponseError),
544
545 #[error("{0}")]
546 Execute(#[from] InternalError),
547}
548
549impl From<PlannerError> for QueryError {
550 fn from(err: PlannerError) -> Self {
551 match err {
552 PlannerError::Plan(err) => Self::Plan(err),
553 PlannerError::Internal(err) => Self::Execute(err),
554 }
555 }
556}
557
558#[derive(Clone, Copy, Debug, ThisError)]
563pub enum IntentError {
564 #[error("delete limit requires an explicit ordering")]
565 DeleteLimitRequiresOrder,
566
567 #[error("order specification must include at least one field")]
568 EmptyOrderSpec,
569
570 #[error("many() cannot be combined with predicates")]
571 ManyWithPredicate,
572
573 #[error("only() cannot be combined with predicates")]
574 OnlyWithPredicate,
575
576 #[error("multiple key access methods were used on the same query")]
577 KeyAccessConflict,
578}
579
580fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
582 match order {
583 Some(mut spec) => {
584 spec.fields.push((field.to_string(), direction));
585 spec
586 }
587 None => OrderSpec {
588 fields: vec![(field.to_string(), direction)],
589 },
590 }
591}