1#![allow(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 expr::{FilterExpr, SortExpr, SortLowerError},
14 plan::{
15 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16 OrderSpec, PageSpec, PlanError,
17 planner::{PlannerError, plan_access},
18 validate::validate_logical_plan_model,
19 },
20 predicate::{
21 Predicate, SchemaInfo, UnsupportedQueryFeature, ValidateError, normalize,
22 validate::reject_unsupported_query_features,
23 },
24 },
25 response::ResponseError,
26 },
27 error::InternalError,
28 traits::{EntityKind, FieldValue, SingletonEntity},
29 value::Value,
30};
31use std::marker::PhantomData;
32use thiserror::Error as ThisError;
33
34#[derive(Clone, Copy, Debug, Eq, PartialEq)]
42pub enum QueryMode {
43 Load(LoadSpec),
44 Delete(DeleteSpec),
45}
46
47impl QueryMode {
48 #[must_use]
50 pub const fn is_load(&self) -> bool {
51 match self {
52 Self::Load(_) => true,
53 Self::Delete(_) => false,
54 }
55 }
56
57 #[must_use]
59 pub const fn is_delete(&self) -> bool {
60 match self {
61 Self::Delete(_) => true,
62 Self::Load(_) => false,
63 }
64 }
65}
66
67#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
74pub struct LoadSpec {
75 pub limit: Option<u32>,
76 pub offset: u32,
77}
78
79impl LoadSpec {
80 #[must_use]
82 pub const fn new() -> Self {
83 Self {
84 limit: None,
85 offset: 0,
86 }
87 }
88}
89
90#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
97pub struct DeleteSpec {
98 pub limit: Option<u32>,
99}
100
101impl DeleteSpec {
102 #[must_use]
104 pub const fn new() -> Self {
105 Self { limit: None }
106 }
107}
108
109#[derive(Debug)]
117pub(crate) struct QueryModel<'m, K> {
118 model: &'m crate::model::entity::EntityModel,
119 mode: QueryMode,
120 predicate: Option<Predicate>,
121 key_access: Option<KeyAccessState<K>>,
122 key_access_conflict: bool,
123 order: Option<OrderSpec>,
124 consistency: ReadConsistency,
125}
126
127impl<'m, K: FieldValue> QueryModel<'m, K> {
128 #[must_use]
129 pub const fn new(
130 model: &'m crate::model::entity::EntityModel,
131 consistency: ReadConsistency,
132 ) -> Self {
133 Self {
134 model,
135 mode: QueryMode::Load(LoadSpec::new()),
136 predicate: None,
137 key_access: None,
138 key_access_conflict: false,
139 order: None,
140 consistency,
141 }
142 }
143
144 #[must_use]
146 pub const fn mode(&self) -> QueryMode {
147 self.mode
148 }
149
150 #[must_use]
152 pub fn filter(mut self, predicate: Predicate) -> Self {
153 self.predicate = match self.predicate.take() {
154 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
155 None => Some(predicate),
156 };
157 self
158 }
159
160 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
162 let schema = SchemaInfo::from_entity_model(self.model)?;
163 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
164
165 Ok(self.filter(predicate))
166 }
167
168 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
170 let schema = SchemaInfo::from_entity_model(self.model)?;
171 let order = match expr.lower_with(&schema) {
172 Ok(order) => order,
173 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
174 Err(SortLowerError::Plan(err)) => return Err(QueryError::Plan(err)),
175 };
176
177 if order.fields.is_empty() {
178 return Err(QueryError::Intent(IntentError::EmptyOrderSpec));
179 }
180
181 Ok(self.order_spec(order))
182 }
183
184 #[must_use]
186 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
187 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
188 self
189 }
190
191 #[must_use]
193 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
194 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
195 self
196 }
197
198 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
200 self.order = Some(order);
201 self
202 }
203
204 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
206 if let Some(existing) = &self.key_access
207 && existing.kind != kind
208 {
209 self.key_access_conflict = true;
210 }
211
212 self.key_access = Some(KeyAccessState { kind, access });
213
214 self
215 }
216
217 pub(crate) fn by_id(self, id: K) -> Self {
219 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
220 }
221
222 pub(crate) fn by_ids<I>(self, ids: I) -> Self
224 where
225 I: IntoIterator<Item = K>,
226 {
227 self.set_key_access(
228 KeyAccessKind::Many,
229 KeyAccess::Many(ids.into_iter().collect()),
230 )
231 }
232
233 pub(crate) fn only(self, id: K) -> Self {
235 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
236 }
237
238 #[must_use]
240 pub const fn delete(mut self) -> Self {
241 if self.mode.is_load() {
242 self.mode = QueryMode::Delete(DeleteSpec::new());
243 }
244 self
245 }
246
247 #[must_use]
251 pub const fn limit(mut self, limit: u32) -> Self {
252 match self.mode {
253 QueryMode::Load(mut spec) => {
254 spec.limit = Some(limit);
255 self.mode = QueryMode::Load(spec);
256 }
257 QueryMode::Delete(mut spec) => {
258 spec.limit = Some(limit);
259 self.mode = QueryMode::Delete(spec);
260 }
261 }
262 self
263 }
264
265 #[must_use]
267 pub const fn offset(mut self, offset: u32) -> Self {
268 if let QueryMode::Load(mut spec) = self.mode {
269 spec.offset = offset;
270 self.mode = QueryMode::Load(spec);
271 }
272 self
273 }
274
275 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
277 let schema_info = SchemaInfo::from_entity_model(self.model)?;
279 self.validate_intent()?;
280
281 if let Some(predicate) = self.predicate.as_ref() {
282 reject_unsupported_query_features(predicate)?;
283 }
284
285 let normalized_predicate = self.predicate.as_ref().map(normalize);
287 let access_plan_value = match &self.key_access {
288 Some(state) => access_plan_from_keys_value(&state.access),
289 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
290 };
291
292 let plan = LogicalPlan {
294 mode: self.mode,
295 access: access_plan_value,
296 predicate: normalized_predicate,
297 order: self.order.clone(),
298 delete_limit: match self.mode {
299 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
300 QueryMode::Load(_) => None,
301 },
302 page: match self.mode {
303 QueryMode::Load(spec) => {
304 if spec.limit.is_some() || spec.offset > 0 {
305 Some(PageSpec {
306 limit: spec.limit,
307 offset: spec.offset,
308 })
309 } else {
310 None
311 }
312 }
313 QueryMode::Delete(_) => None,
314 },
315 consistency: self.consistency,
316 };
317
318 validate_logical_plan_model(&schema_info, self.model, &plan)?;
319
320 Ok(plan)
321 }
322
323 const fn validate_intent(&self) -> Result<(), IntentError> {
325 if self.key_access_conflict {
326 return Err(IntentError::KeyAccessConflict);
327 }
328
329 if let Some(order) = &self.order
330 && order.fields.is_empty()
331 {
332 return Err(IntentError::EmptyOrderSpec);
333 }
334
335 if let Some(state) = &self.key_access {
336 match state.kind {
337 KeyAccessKind::Many if self.predicate.is_some() => {
338 return Err(IntentError::ByIdsWithPredicate);
339 }
340 KeyAccessKind::Only if self.predicate.is_some() => {
341 return Err(IntentError::OnlyWithPredicate);
342 }
343 _ => {
344 }
346 }
347 }
348
349 match self.mode {
350 QueryMode::Load(_) => {}
351 QueryMode::Delete(spec) => {
352 if spec.limit.is_some() && self.order.is_none() {
353 return Err(IntentError::DeleteLimitRequiresOrder);
354 }
355 }
356 }
357
358 Ok(())
359 }
360}
361
362#[derive(Debug)]
374pub struct Query<E: EntityKind> {
375 intent: QueryModel<'static, E::Key>,
376 #[allow(clippy::struct_field_names)]
377 _marker: PhantomData<E>,
378}
379
380impl<E: EntityKind> Query<E> {
381 #[must_use]
385 pub const fn new(consistency: ReadConsistency) -> Self {
386 Self {
387 intent: QueryModel::new(E::MODEL, consistency),
388 _marker: PhantomData,
389 }
390 }
391
392 #[must_use]
394 pub const fn mode(&self) -> QueryMode {
395 self.intent.mode()
396 }
397
398 #[must_use]
400 pub fn filter(mut self, predicate: Predicate) -> Self {
401 self.intent = self.intent.filter(predicate);
402 self
403 }
404
405 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
407 let Self { intent, _marker } = self;
408 let intent = intent.filter_expr(expr)?;
409
410 Ok(Self { intent, _marker })
411 }
412
413 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
415 let Self { intent, _marker } = self;
416 let intent = intent.sort_expr(expr)?;
417
418 Ok(Self { intent, _marker })
419 }
420
421 #[must_use]
423 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
424 self.intent = self.intent.order_by(field);
425 self
426 }
427
428 #[must_use]
430 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
431 self.intent = self.intent.order_by_desc(field);
432 self
433 }
434
435 pub(crate) fn by_id(self, id: E::Key) -> Self {
437 let Self { intent, _marker } = self;
438 Self {
439 intent: intent.by_id(id),
440 _marker,
441 }
442 }
443
444 pub(crate) fn by_ids<I>(self, ids: I) -> Self
446 where
447 I: IntoIterator<Item = E::Key>,
448 {
449 let Self { intent, _marker } = self;
450 Self {
451 intent: intent.by_ids(ids),
452 _marker,
453 }
454 }
455
456 #[must_use]
458 pub fn delete(mut self) -> Self {
459 self.intent = self.intent.delete();
460 self
461 }
462
463 #[must_use]
469 pub fn limit(mut self, limit: u32) -> Self {
470 self.intent = self.intent.limit(limit);
471 self
472 }
473
474 #[must_use]
478 pub fn offset(mut self, offset: u32) -> Self {
479 self.intent = self.intent.offset(offset);
480 self
481 }
482
483 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
485 let plan = self.build_plan()?;
486
487 Ok(plan.explain())
488 }
489
490 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
492 let plan = self.build_plan()?;
493
494 Ok(ExecutablePlan::new(plan))
495 }
496
497 fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
499 let plan_value = self.intent.build_plan_model()?;
500 let LogicalPlan {
501 mode,
502 access,
503 predicate,
504 order,
505 delete_limit,
506 page,
507 consistency,
508 } = plan_value;
509
510 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
511 let plan = LogicalPlan {
512 mode,
513 access,
514 predicate,
515 order,
516 delete_limit,
517 page,
518 consistency,
519 };
520
521 Ok(plan)
522 }
523}
524
525impl<E> Query<E>
526where
527 E: EntityKind + SingletonEntity,
528 E::Key: Default,
529{
530 pub(crate) fn only(self) -> Self {
532 let Self { intent, _marker } = self;
533
534 Self {
535 intent: intent.only(E::Key::default()),
536 _marker,
537 }
538 }
539}
540
541#[derive(Debug, ThisError)]
546pub enum QueryError {
547 #[error("{0}")]
548 UnsupportedQueryFeature(#[from] UnsupportedQueryFeature),
549
550 #[error("{0}")]
551 Validate(#[from] ValidateError),
552
553 #[error("{0}")]
554 Plan(#[from] PlanError),
555
556 #[error("{0}")]
557 Intent(#[from] IntentError),
558
559 #[error("{0}")]
560 Response(#[from] ResponseError),
561
562 #[error("{0}")]
563 Execute(#[from] InternalError),
564}
565
566impl From<PlannerError> for QueryError {
567 fn from(err: PlannerError) -> Self {
568 match err {
569 PlannerError::Plan(err) => Self::Plan(err),
570 PlannerError::Internal(err) => Self::Execute(err),
571 }
572 }
573}
574
575#[derive(Clone, Copy, Debug, ThisError)]
580pub enum IntentError {
581 #[error("delete limit requires an explicit ordering")]
582 DeleteLimitRequiresOrder,
583
584 #[error("order specification must include at least one field")]
585 EmptyOrderSpec,
586
587 #[error("by_ids() cannot be combined with predicates")]
588 ByIdsWithPredicate,
589
590 #[error("only() cannot be combined with predicates")]
591 OnlyWithPredicate,
592
593 #[error("multiple key access methods were used on the same query")]
594 KeyAccessConflict,
595}
596
597fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
599 match order {
600 Some(mut spec) => {
601 spec.fields.push((field.to_string(), direction));
602 spec
603 }
604 None => OrderSpec {
605 fields: vec![(field.to_string(), direction)],
606 },
607 }
608}