1#![allow(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 expr::{FilterExpr, SortExpr, SortLowerError},
14 plan::{
15 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16 OrderSpec, PageSpec, PlanError,
17 planner::{PlannerError, plan_access},
18 validate::validate_logical_plan_model,
19 },
20 predicate::{Predicate, SchemaInfo, ValidateError, normalize},
21 },
22 response::ResponseError,
23 },
24 error::InternalError,
25 traits::{EntityKind, FieldValue, SingletonEntity},
26 types::Ref,
27 value::Value,
28};
29use std::marker::PhantomData;
30use thiserror::Error as ThisError;
31
32#[derive(Clone, Copy, Debug, Eq, PartialEq)]
40pub enum QueryMode {
41 Load(LoadSpec),
42 Delete(DeleteSpec),
43}
44
45impl QueryMode {
46 #[must_use]
48 pub const fn is_load(&self) -> bool {
49 match self {
50 Self::Load(_) => true,
51 Self::Delete(_) => false,
52 }
53 }
54
55 #[must_use]
57 pub const fn is_delete(&self) -> bool {
58 match self {
59 Self::Delete(_) => true,
60 Self::Load(_) => false,
61 }
62 }
63}
64
/// Settings that only apply to load queries.
///
/// Both fields are copied into the plan's page specification when a limit is
/// set or the offset is non-zero.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct LoadSpec {
    /// Optional limit for the load; `None` means no limit was requested.
    pub limit: Option<u32>,
    /// Offset for the load; `0` means no offset was requested.
    pub offset: u32,
}

impl LoadSpec {
    /// Creates a spec with no limit and a zero offset.
    ///
    /// Produces the same value as `LoadSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            offset: 0,
            limit: None,
        }
    }
}
87
/// Settings that only apply to delete queries.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteSpec {
    /// Optional limit for the delete; `None` means no limit was requested.
    pub limit: Option<u32>,
}

impl DeleteSpec {
    /// Creates a spec with no limit.
    ///
    /// Produces the same value as `DeleteSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        let limit = None;
        Self { limit }
    }
}
106
107#[derive(Debug)]
115pub(crate) struct QueryModel<'m, K> {
116 model: &'m crate::model::entity::EntityModel,
117 mode: QueryMode,
118 predicate: Option<Predicate>,
119 key_access: Option<KeyAccessState<K>>,
120 key_access_conflict: bool,
121 order: Option<OrderSpec>,
122 consistency: ReadConsistency,
123}
124
125impl<'m, K: FieldValue> QueryModel<'m, K> {
126 #[must_use]
127 pub const fn new(
128 model: &'m crate::model::entity::EntityModel,
129 consistency: ReadConsistency,
130 ) -> Self {
131 Self {
132 model,
133 mode: QueryMode::Load(LoadSpec::new()),
134 predicate: None,
135 key_access: None,
136 key_access_conflict: false,
137 order: None,
138 consistency,
139 }
140 }
141
142 #[must_use]
144 pub const fn mode(&self) -> QueryMode {
145 self.mode
146 }
147
148 #[must_use]
150 pub fn filter(mut self, predicate: Predicate) -> Self {
151 self.predicate = match self.predicate.take() {
152 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
153 None => Some(predicate),
154 };
155 self
156 }
157
158 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
160 let schema = SchemaInfo::from_entity_model(self.model)?;
161 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
162
163 Ok(self.filter(predicate))
164 }
165
166 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
168 let schema = SchemaInfo::from_entity_model(self.model)?;
169 let order = match expr.lower_with(&schema) {
170 Ok(order) => order,
171 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
172 Err(SortLowerError::Plan(err)) => return Err(QueryError::Plan(err)),
173 };
174
175 if order.fields.is_empty() {
176 return Err(QueryError::Intent(IntentError::EmptyOrderSpec));
177 }
178
179 Ok(self.order_spec(order))
180 }
181
182 #[must_use]
184 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
185 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
186 self
187 }
188
189 #[must_use]
191 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
192 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
193 self
194 }
195
196 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
198 self.order = Some(order);
199 self
200 }
201
202 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
204 if let Some(existing) = &self.key_access
205 && existing.kind != kind
206 {
207 self.key_access_conflict = true;
208 }
209
210 self.key_access = Some(KeyAccessState { kind, access });
211
212 self
213 }
214
215 pub(crate) fn by_key(self, key: K) -> Self {
217 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(key))
218 }
219
220 pub(crate) fn by_keys<I>(self, keys: I) -> Self
222 where
223 I: IntoIterator<Item = K>,
224 {
225 self.set_key_access(
226 KeyAccessKind::Many,
227 KeyAccess::Many(keys.into_iter().collect()),
228 )
229 }
230
231 pub(crate) fn only(self, id: K) -> Self {
233 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
234 }
235
236 #[must_use]
238 pub const fn delete(mut self) -> Self {
239 if self.mode.is_load() {
240 self.mode = QueryMode::Delete(DeleteSpec::new());
241 }
242 self
243 }
244
245 #[must_use]
249 pub const fn limit(mut self, limit: u32) -> Self {
250 match self.mode {
251 QueryMode::Load(mut spec) => {
252 spec.limit = Some(limit);
253 self.mode = QueryMode::Load(spec);
254 }
255 QueryMode::Delete(mut spec) => {
256 spec.limit = Some(limit);
257 self.mode = QueryMode::Delete(spec);
258 }
259 }
260 self
261 }
262
263 #[must_use]
265 pub const fn offset(mut self, offset: u32) -> Self {
266 if let QueryMode::Load(mut spec) = self.mode {
267 spec.offset = offset;
268 self.mode = QueryMode::Load(spec);
269 }
270 self
271 }
272
273 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
275 let schema_info = SchemaInfo::from_entity_model(self.model)?;
277 self.validate_intent()?;
278
279 let normalized_predicate = self.predicate.as_ref().map(normalize);
281 let access_plan_value = match &self.key_access {
282 Some(state) => access_plan_from_keys_value(&state.access),
283 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
284 };
285
286 let plan = LogicalPlan {
288 mode: self.mode,
289 access: access_plan_value,
290 predicate: normalized_predicate,
291 order: self.order.clone(),
292 delete_limit: match self.mode {
293 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
294 QueryMode::Load(_) => None,
295 },
296 page: match self.mode {
297 QueryMode::Load(spec) => {
298 if spec.limit.is_some() || spec.offset > 0 {
299 Some(PageSpec {
300 limit: spec.limit,
301 offset: spec.offset,
302 })
303 } else {
304 None
305 }
306 }
307 QueryMode::Delete(_) => None,
308 },
309 consistency: self.consistency,
310 };
311
312 validate_logical_plan_model(&schema_info, self.model, &plan)?;
313
314 Ok(plan)
315 }
316
317 const fn validate_intent(&self) -> Result<(), IntentError> {
319 if self.key_access_conflict {
320 return Err(IntentError::KeyAccessConflict);
321 }
322
323 if let Some(order) = &self.order
324 && order.fields.is_empty()
325 {
326 return Err(IntentError::EmptyOrderSpec);
327 }
328
329 if let Some(state) = &self.key_access {
330 match state.kind {
331 KeyAccessKind::Many if self.predicate.is_some() => {
332 return Err(IntentError::ManyWithPredicate);
333 }
334 KeyAccessKind::Only if self.predicate.is_some() => {
335 return Err(IntentError::OnlyWithPredicate);
336 }
337 _ => {}
338 }
339 }
340
341 match self.mode {
342 QueryMode::Load(_) => {}
343 QueryMode::Delete(spec) => {
344 if spec.limit.is_some() && self.order.is_none() {
345 return Err(IntentError::DeleteLimitRequiresOrder);
346 }
347 }
348 }
349
350 Ok(())
351 }
352}
353
354#[derive(Debug)]
366pub struct Query<E: EntityKind> {
367 intent: QueryModel<'static, E::Id>,
368 #[allow(clippy::struct_field_names)]
369 _marker: PhantomData<E>,
370}
371
372impl<E: EntityKind> Query<E> {
373 #[must_use]
377 pub const fn new(consistency: ReadConsistency) -> Self {
378 Self {
379 intent: QueryModel::new(E::MODEL, consistency),
380 _marker: PhantomData,
381 }
382 }
383
384 #[must_use]
386 pub const fn mode(&self) -> QueryMode {
387 self.intent.mode()
388 }
389
390 #[must_use]
392 pub fn filter(mut self, predicate: Predicate) -> Self {
393 self.intent = self.intent.filter(predicate);
394 self
395 }
396
397 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
399 let Self { intent, _marker } = self;
400 let intent = intent.filter_expr(expr)?;
401
402 Ok(Self { intent, _marker })
403 }
404
405 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
407 let Self { intent, _marker } = self;
408 let intent = intent.sort_expr(expr)?;
409
410 Ok(Self { intent, _marker })
411 }
412
413 #[must_use]
415 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
416 self.intent = self.intent.order_by(field);
417 self
418 }
419
420 #[must_use]
422 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
423 self.intent = self.intent.order_by_desc(field);
424 self
425 }
426
427 pub(crate) fn by_key(self, key: E::Id) -> Self {
429 let Self { intent, _marker } = self;
430 Self {
431 intent: intent.by_key(key),
432 _marker,
433 }
434 }
435
436 pub(crate) fn by_ref(self, reference: Ref<E>) -> Self {
438 self.by_key(reference.key())
439 }
440
441 pub(crate) fn by_keys<I>(self, keys: I) -> Self
443 where
444 I: IntoIterator<Item = E::Id>,
445 {
446 let Self { intent, _marker } = self;
447 Self {
448 intent: intent.by_keys(keys),
449 _marker,
450 }
451 }
452
453 #[must_use]
455 pub fn delete(mut self) -> Self {
456 self.intent = self.intent.delete();
457 self
458 }
459
460 #[must_use]
464 pub fn limit(mut self, limit: u32) -> Self {
465 self.intent = self.intent.limit(limit);
466 self
467 }
468
469 #[must_use]
471 pub fn offset(mut self, offset: u32) -> Self {
472 self.intent = self.intent.offset(offset);
473 self
474 }
475
476 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
478 let plan = self.build_plan()?;
479
480 Ok(plan.explain())
481 }
482
483 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
485 let plan = self.build_plan()?;
486
487 Ok(ExecutablePlan::new(plan))
488 }
489
490 fn build_plan(&self) -> Result<LogicalPlan<E::Id>, QueryError> {
492 let plan_value = self.intent.build_plan_model()?;
493 let LogicalPlan {
494 mode,
495 access,
496 predicate,
497 order,
498 delete_limit,
499 page,
500 consistency,
501 } = plan_value;
502
503 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
504 let plan = LogicalPlan {
505 mode,
506 access,
507 predicate,
508 order,
509 delete_limit,
510 page,
511 consistency,
512 };
513
514 Ok(plan)
515 }
516}
517
518impl<E> Query<E>
519where
520 E: EntityKind + SingletonEntity,
521 E::Id: Default,
522{
523 pub(crate) fn only(self) -> Self {
525 let Self { intent, _marker } = self;
526
527 Self {
528 intent: intent.only(E::Id::default()),
529 _marker,
530 }
531 }
532}
533
534#[derive(Debug, ThisError)]
539pub enum QueryError {
540 #[error("{0}")]
541 Validate(#[from] ValidateError),
542
543 #[error("{0}")]
544 Plan(#[from] PlanError),
545
546 #[error("{0}")]
547 Intent(#[from] IntentError),
548
549 #[error("{0}")]
550 Response(#[from] ResponseError),
551
552 #[error("{0}")]
553 Execute(#[from] InternalError),
554}
555
556impl From<PlannerError> for QueryError {
557 fn from(err: PlannerError) -> Self {
558 match err {
559 PlannerError::Plan(err) => Self::Plan(err),
560 PlannerError::Internal(err) => Self::Execute(err),
561 }
562 }
563}
564
565#[derive(Clone, Copy, Debug, ThisError)]
570pub enum IntentError {
571 #[error("delete limit requires an explicit ordering")]
572 DeleteLimitRequiresOrder,
573
574 #[error("order specification must include at least one field")]
575 EmptyOrderSpec,
576
577 #[error("many() cannot be combined with predicates")]
578 ManyWithPredicate,
579
580 #[error("only() cannot be combined with predicates")]
581 OnlyWithPredicate,
582
583 #[error("multiple key access methods were used on the same query")]
584 KeyAccessConflict,
585}
586
587fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
589 match order {
590 Some(mut spec) => {
591 spec.fields.push((field.to_string(), direction));
592 spec
593 }
594 None => OrderSpec {
595 fields: vec![(field.to_string(), direction)],
596 },
597 }
598}