1#![allow(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 expr::{FilterExpr, SortExpr, SortLowerError},
14 plan::{
15 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16 OrderSpec, PageSpec, PlanError,
17 planner::{PlannerError, plan_access},
18 validate::validate_logical_plan_model,
19 },
20 predicate::{Predicate, SchemaInfo, ValidateError, normalize},
21 },
22 response::ResponseError,
23 },
24 error::InternalError,
25 traits::{EntityKind, FieldValue, SingletonEntity},
26 types::Ref,
27 value::Value,
28};
29use std::marker::PhantomData;
30use thiserror::Error as ThisError;
31
32#[derive(Clone, Copy, Debug, Eq, PartialEq)]
40pub enum QueryMode {
41 Load(LoadSpec),
42 Delete(DeleteSpec),
43}
44
45impl QueryMode {
46 #[must_use]
48 pub const fn is_load(&self) -> bool {
49 match self {
50 Self::Load(_) => true,
51 Self::Delete(_) => false,
52 }
53 }
54
55 #[must_use]
57 pub const fn is_delete(&self) -> bool {
58 match self {
59 Self::Delete(_) => true,
60 Self::Load(_) => false,
61 }
62 }
63}
64
/// Load-mode settings: an optional row limit and a row offset.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct LoadSpec {
    /// Maximum number of rows requested; `None` means no limit was set.
    pub limit: Option<u32>,
    /// Number of rows to skip; `0` means no offset was set.
    pub offset: u32,
}

impl LoadSpec {
    /// Creates a spec with no limit and a zero offset.
    ///
    /// Produces the same value as `LoadSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            offset: 0,
            limit: None,
        }
    }
}
87
/// Delete-mode settings: an optional limit on the delete.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteSpec {
    /// Limit applied to the delete; `None` means no limit was set.
    pub limit: Option<u32>,
}

impl DeleteSpec {
    /// Creates a spec with no limit.
    ///
    /// Produces the same value as `DeleteSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        let limit = None;
        Self { limit }
    }
}
106
107#[derive(Debug)]
115pub(crate) struct QueryModel<'m, K> {
116 model: &'m crate::model::entity::EntityModel,
117 mode: QueryMode,
118 predicate: Option<Predicate>,
119 key_access: Option<KeyAccessState<K>>,
120 key_access_conflict: bool,
121 order: Option<OrderSpec>,
122 consistency: ReadConsistency,
123}
124
125impl<'m, K: FieldValue> QueryModel<'m, K> {
126 #[must_use]
127 pub const fn new(
128 model: &'m crate::model::entity::EntityModel,
129 consistency: ReadConsistency,
130 ) -> Self {
131 Self {
132 model,
133 mode: QueryMode::Load(LoadSpec::new()),
134 predicate: None,
135 key_access: None,
136 key_access_conflict: false,
137 order: None,
138 consistency,
139 }
140 }
141
142 #[must_use]
144 pub const fn mode(&self) -> QueryMode {
145 self.mode
146 }
147
148 #[must_use]
150 pub fn filter(mut self, predicate: Predicate) -> Self {
151 self.predicate = match self.predicate.take() {
152 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
153 None => Some(predicate),
154 };
155 self
156 }
157
158 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
160 let schema = SchemaInfo::from_entity_model(self.model)?;
161 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
162
163 Ok(self.filter(predicate))
164 }
165
166 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
168 let schema = SchemaInfo::from_entity_model(self.model)?;
169 let order = match expr.lower_with(&schema) {
170 Ok(order) => order,
171 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
172 Err(SortLowerError::Plan(err)) => return Err(QueryError::Plan(err)),
173 };
174
175 if order.fields.is_empty() {
176 return Err(QueryError::Intent(IntentError::EmptyOrderSpec));
177 }
178
179 Ok(self.order_spec(order))
180 }
181
182 #[must_use]
184 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
185 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
186 self
187 }
188
189 #[must_use]
191 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
192 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
193 self
194 }
195
196 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
198 self.order = Some(order);
199 self
200 }
201
202 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
204 if let Some(existing) = &self.key_access
205 && existing.kind != kind
206 {
207 self.key_access_conflict = true;
208 }
209
210 self.key_access = Some(KeyAccessState { kind, access });
211
212 self
213 }
214
215 pub(crate) fn by_key(self, key: K) -> Self {
217 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(key))
218 }
219
220 pub(crate) fn by_keys<I>(self, keys: I) -> Self
222 where
223 I: IntoIterator<Item = K>,
224 {
225 self.set_key_access(
226 KeyAccessKind::Many,
227 KeyAccess::Many(keys.into_iter().collect()),
228 )
229 }
230
231 pub(crate) fn only(self, id: K) -> Self {
233 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
234 }
235
236 #[must_use]
238 pub const fn delete(mut self) -> Self {
239 if self.mode.is_load() {
240 self.mode = QueryMode::Delete(DeleteSpec::new());
241 }
242 self
243 }
244
245 #[must_use]
249 pub const fn limit(mut self, limit: u32) -> Self {
250 match self.mode {
251 QueryMode::Load(mut spec) => {
252 spec.limit = Some(limit);
253 self.mode = QueryMode::Load(spec);
254 }
255 QueryMode::Delete(mut spec) => {
256 spec.limit = Some(limit);
257 self.mode = QueryMode::Delete(spec);
258 }
259 }
260 self
261 }
262
263 #[must_use]
265 pub const fn offset(mut self, offset: u32) -> Self {
266 if let QueryMode::Load(mut spec) = self.mode {
267 spec.offset = offset;
268 self.mode = QueryMode::Load(spec);
269 }
270 self
271 }
272
273 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
275 let schema_info = SchemaInfo::from_entity_model(self.model)?;
277 self.validate_intent()?;
278
279 let normalized_predicate = self.predicate.as_ref().map(normalize);
281 let access_plan_value = match &self.key_access {
282 Some(state) => access_plan_from_keys_value(&state.access),
283 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
284 };
285
286 let plan = LogicalPlan {
288 mode: self.mode,
289 access: access_plan_value,
290 predicate: normalized_predicate,
291 order: self.order.clone(),
292 delete_limit: match self.mode {
293 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
294 QueryMode::Load(_) => None,
295 },
296 page: match self.mode {
297 QueryMode::Load(spec) => {
298 if spec.limit.is_some() || spec.offset > 0 {
299 Some(PageSpec {
300 limit: spec.limit,
301 offset: spec.offset,
302 })
303 } else {
304 None
305 }
306 }
307 QueryMode::Delete(_) => None,
308 },
309 consistency: self.consistency,
310 };
311
312 validate_logical_plan_model(&schema_info, self.model, &plan)?;
313
314 Ok(plan)
315 }
316
317 const fn validate_intent(&self) -> Result<(), IntentError> {
319 if self.key_access_conflict {
320 return Err(IntentError::KeyAccessConflict);
321 }
322
323 if let Some(order) = &self.order
324 && order.fields.is_empty()
325 {
326 return Err(IntentError::EmptyOrderSpec);
327 }
328
329 if let Some(state) = &self.key_access {
330 match state.kind {
331 KeyAccessKind::Many if self.predicate.is_some() => {
332 return Err(IntentError::ManyWithPredicate);
333 }
334 KeyAccessKind::Only if self.predicate.is_some() => {
335 return Err(IntentError::OnlyWithPredicate);
336 }
337 _ => {
338 }
340 }
341 }
342
343 match self.mode {
344 QueryMode::Load(_) => {}
345 QueryMode::Delete(spec) => {
346 if spec.limit.is_some() && self.order.is_none() {
347 return Err(IntentError::DeleteLimitRequiresOrder);
348 }
349 }
350 }
351
352 Ok(())
353 }
354}
355
356#[derive(Debug)]
368pub struct Query<E: EntityKind> {
369 intent: QueryModel<'static, E::Id>,
370 #[allow(clippy::struct_field_names)]
371 _marker: PhantomData<E>,
372}
373
374impl<E: EntityKind> Query<E> {
375 #[must_use]
379 pub const fn new(consistency: ReadConsistency) -> Self {
380 Self {
381 intent: QueryModel::new(E::MODEL, consistency),
382 _marker: PhantomData,
383 }
384 }
385
386 #[must_use]
388 pub const fn mode(&self) -> QueryMode {
389 self.intent.mode()
390 }
391
392 #[must_use]
394 pub fn filter(mut self, predicate: Predicate) -> Self {
395 self.intent = self.intent.filter(predicate);
396 self
397 }
398
399 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
401 let Self { intent, _marker } = self;
402 let intent = intent.filter_expr(expr)?;
403
404 Ok(Self { intent, _marker })
405 }
406
407 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
409 let Self { intent, _marker } = self;
410 let intent = intent.sort_expr(expr)?;
411
412 Ok(Self { intent, _marker })
413 }
414
415 #[must_use]
417 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
418 self.intent = self.intent.order_by(field);
419 self
420 }
421
422 #[must_use]
424 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
425 self.intent = self.intent.order_by_desc(field);
426 self
427 }
428
429 pub(crate) fn by_key(self, key: E::Id) -> Self {
431 let Self { intent, _marker } = self;
432 Self {
433 intent: intent.by_key(key),
434 _marker,
435 }
436 }
437
438 pub(crate) fn by_ref(self, reference: Ref<E>) -> Self {
440 self.by_key(reference.key())
441 }
442
443 pub(crate) fn by_keys<I>(self, keys: I) -> Self
445 where
446 I: IntoIterator<Item = E::Id>,
447 {
448 let Self { intent, _marker } = self;
449 Self {
450 intent: intent.by_keys(keys),
451 _marker,
452 }
453 }
454
455 #[must_use]
457 pub fn delete(mut self) -> Self {
458 self.intent = self.intent.delete();
459 self
460 }
461
462 #[must_use]
466 pub fn limit(mut self, limit: u32) -> Self {
467 self.intent = self.intent.limit(limit);
468 self
469 }
470
471 #[must_use]
473 pub fn offset(mut self, offset: u32) -> Self {
474 self.intent = self.intent.offset(offset);
475 self
476 }
477
478 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
480 let plan = self.build_plan()?;
481
482 Ok(plan.explain())
483 }
484
485 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
487 let plan = self.build_plan()?;
488
489 Ok(ExecutablePlan::new(plan))
490 }
491
492 fn build_plan(&self) -> Result<LogicalPlan<E::Id>, QueryError> {
494 let plan_value = self.intent.build_plan_model()?;
495 let LogicalPlan {
496 mode,
497 access,
498 predicate,
499 order,
500 delete_limit,
501 page,
502 consistency,
503 } = plan_value;
504
505 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
506 let plan = LogicalPlan {
507 mode,
508 access,
509 predicate,
510 order,
511 delete_limit,
512 page,
513 consistency,
514 };
515
516 Ok(plan)
517 }
518}
519
520impl<E> Query<E>
521where
522 E: EntityKind + SingletonEntity,
523 E::Id: Default,
524{
525 pub(crate) fn only(self) -> Self {
527 let Self { intent, _marker } = self;
528
529 Self {
530 intent: intent.only(E::Id::default()),
531 _marker,
532 }
533 }
534}
535
536#[derive(Debug, ThisError)]
541pub enum QueryError {
542 #[error("{0}")]
543 Validate(#[from] ValidateError),
544
545 #[error("{0}")]
546 Plan(#[from] PlanError),
547
548 #[error("{0}")]
549 Intent(#[from] IntentError),
550
551 #[error("{0}")]
552 Response(#[from] ResponseError),
553
554 #[error("{0}")]
555 Execute(#[from] InternalError),
556}
557
558impl From<PlannerError> for QueryError {
559 fn from(err: PlannerError) -> Self {
560 match err {
561 PlannerError::Plan(err) => Self::Plan(err),
562 PlannerError::Internal(err) => Self::Execute(err),
563 }
564 }
565}
566
567#[derive(Clone, Copy, Debug, ThisError)]
572pub enum IntentError {
573 #[error("delete limit requires an explicit ordering")]
574 DeleteLimitRequiresOrder,
575
576 #[error("order specification must include at least one field")]
577 EmptyOrderSpec,
578
579 #[error("many() cannot be combined with predicates")]
580 ManyWithPredicate,
581
582 #[error("only() cannot be combined with predicates")]
583 OnlyWithPredicate,
584
585 #[error("multiple key access methods were used on the same query")]
586 KeyAccessConflict,
587}
588
589fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
591 match order {
592 Some(mut spec) => {
593 spec.fields.push((field.to_string(), direction));
594 spec
595 }
596 None => OrderSpec {
597 fields: vec![(field.to_string(), direction)],
598 },
599 }
600}