1#![allow(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 expr::{FilterExpr, SortExpr, SortLowerError},
14 plan::{
15 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16 OrderSpec, PageSpec, PlanError,
17 planner::{PlannerError, plan_access},
18 validate::validate_logical_plan_model,
19 },
20 predicate::{
21 Predicate, SchemaInfo, UnsupportedQueryFeature, ValidateError, normalize,
22 validate::reject_unsupported_query_features,
23 },
24 },
25 response::ResponseError,
26 },
27 error::InternalError,
28 traits::{EntityKind, FieldValue, SingletonEntity},
29 value::Value,
30};
31use std::marker::PhantomData;
32use thiserror::Error as ThisError;
33
34#[derive(Clone, Copy, Debug, Eq, PartialEq)]
42pub enum QueryMode {
43 Load(LoadSpec),
44 Delete(DeleteSpec),
45}
46
47impl QueryMode {
48 #[must_use]
50 pub const fn is_load(&self) -> bool {
51 match self {
52 Self::Load(_) => true,
53 Self::Delete(_) => false,
54 }
55 }
56
57 #[must_use]
59 pub const fn is_delete(&self) -> bool {
60 match self {
61 Self::Delete(_) => true,
62 Self::Load(_) => false,
63 }
64 }
65}
66
67#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
74pub struct LoadSpec {
75 pub limit: Option<u32>,
76 pub offset: u32,
77}
78
79impl LoadSpec {
80 #[must_use]
82 pub const fn new() -> Self {
83 Self {
84 limit: None,
85 offset: 0,
86 }
87 }
88}
89
90#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
97pub struct DeleteSpec {
98 pub limit: Option<u32>,
99}
100
101impl DeleteSpec {
102 #[must_use]
104 pub const fn new() -> Self {
105 Self { limit: None }
106 }
107}
108
109#[derive(Debug)]
117pub(crate) struct QueryModel<'m, K> {
118 model: &'m crate::model::entity::EntityModel,
119 mode: QueryMode,
120 predicate: Option<Predicate>,
121 key_access: Option<KeyAccessState<K>>,
122 key_access_conflict: bool,
123 order: Option<OrderSpec>,
124 consistency: ReadConsistency,
125}
126
127impl<'m, K: FieldValue> QueryModel<'m, K> {
128 #[must_use]
129 pub const fn new(
130 model: &'m crate::model::entity::EntityModel,
131 consistency: ReadConsistency,
132 ) -> Self {
133 Self {
134 model,
135 mode: QueryMode::Load(LoadSpec::new()),
136 predicate: None,
137 key_access: None,
138 key_access_conflict: false,
139 order: None,
140 consistency,
141 }
142 }
143
144 #[must_use]
146 pub const fn mode(&self) -> QueryMode {
147 self.mode
148 }
149
150 #[must_use]
152 pub fn filter(mut self, predicate: Predicate) -> Self {
153 self.predicate = match self.predicate.take() {
154 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
155 None => Some(predicate),
156 };
157 self
158 }
159
160 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
162 let schema = SchemaInfo::from_entity_model(self.model)?;
163 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
164
165 Ok(self.filter(predicate))
166 }
167
168 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
170 let schema = SchemaInfo::from_entity_model(self.model)?;
171 let order = match expr.lower_with(&schema) {
172 Ok(order) => order,
173 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
174 Err(SortLowerError::Plan(err)) => return Err(QueryError::Plan(err)),
175 };
176
177 if order.fields.is_empty() {
178 return Err(QueryError::Intent(IntentError::EmptyOrderSpec));
179 }
180
181 Ok(self.order_spec(order))
182 }
183
184 #[must_use]
186 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
187 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
188 self
189 }
190
191 #[must_use]
193 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
194 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
195 self
196 }
197
198 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
200 self.order = Some(order);
201 self
202 }
203
204 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
206 if let Some(existing) = &self.key_access
207 && existing.kind != kind
208 {
209 self.key_access_conflict = true;
210 }
211
212 self.key_access = Some(KeyAccessState { kind, access });
213
214 self
215 }
216
217 pub(crate) fn by_id(self, id: K) -> Self {
219 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
220 }
221
222 pub(crate) fn by_ids<I>(self, ids: I) -> Self
224 where
225 I: IntoIterator<Item = K>,
226 {
227 self.set_key_access(
228 KeyAccessKind::Many,
229 KeyAccess::Many(ids.into_iter().collect()),
230 )
231 }
232
233 pub(crate) fn only(self, id: K) -> Self {
235 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
236 }
237
238 #[must_use]
240 pub const fn delete(mut self) -> Self {
241 if self.mode.is_load() {
242 self.mode = QueryMode::Delete(DeleteSpec::new());
243 }
244 self
245 }
246
247 #[must_use]
251 pub const fn limit(mut self, limit: u32) -> Self {
252 match self.mode {
253 QueryMode::Load(mut spec) => {
254 spec.limit = Some(limit);
255 self.mode = QueryMode::Load(spec);
256 }
257 QueryMode::Delete(mut spec) => {
258 spec.limit = Some(limit);
259 self.mode = QueryMode::Delete(spec);
260 }
261 }
262 self
263 }
264
265 #[must_use]
267 pub const fn offset(mut self, offset: u32) -> Self {
268 if let QueryMode::Load(mut spec) = self.mode {
269 spec.offset = offset;
270 self.mode = QueryMode::Load(spec);
271 }
272 self
273 }
274
275 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
277 let schema_info = SchemaInfo::from_entity_model(self.model)?;
279 self.validate_intent()?;
280
281 if let Some(predicate) = self.predicate.as_ref() {
282 reject_unsupported_query_features(predicate)?;
283 }
284
285 let normalized_predicate = self.predicate.as_ref().map(normalize);
287 let access_plan_value = match &self.key_access {
288 Some(state) => access_plan_from_keys_value(&state.access),
289 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
290 };
291
292 let plan = LogicalPlan {
294 mode: self.mode,
295 access: access_plan_value,
296 predicate: normalized_predicate,
297 order: self.order.clone(),
298 delete_limit: match self.mode {
299 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
300 QueryMode::Load(_) => None,
301 },
302 page: match self.mode {
303 QueryMode::Load(spec) => {
304 if spec.limit.is_some() || spec.offset > 0 {
305 Some(PageSpec {
306 limit: spec.limit,
307 offset: spec.offset,
308 })
309 } else {
310 None
311 }
312 }
313 QueryMode::Delete(_) => None,
314 },
315 consistency: self.consistency,
316 };
317
318 validate_logical_plan_model(&schema_info, self.model, &plan)?;
319
320 Ok(plan)
321 }
322
323 const fn validate_intent(&self) -> Result<(), IntentError> {
325 if self.key_access_conflict {
326 return Err(IntentError::KeyAccessConflict);
327 }
328
329 if let Some(order) = &self.order
330 && order.fields.is_empty()
331 {
332 return Err(IntentError::EmptyOrderSpec);
333 }
334
335 if let Some(state) = &self.key_access {
336 match state.kind {
337 KeyAccessKind::Many if self.predicate.is_some() => {
338 return Err(IntentError::ByIdsWithPredicate);
339 }
340 KeyAccessKind::Only if self.predicate.is_some() => {
341 return Err(IntentError::OnlyWithPredicate);
342 }
343 _ => {
344 }
346 }
347 }
348
349 match self.mode {
350 QueryMode::Load(_) => {}
351 QueryMode::Delete(spec) => {
352 if spec.limit.is_some() && self.order.is_none() {
353 return Err(IntentError::DeleteLimitRequiresOrder);
354 }
355 }
356 }
357
358 Ok(())
359 }
360}
361
362#[derive(Debug)]
374pub struct Query<E: EntityKind> {
375 intent: QueryModel<'static, E::Key>,
376 #[allow(clippy::struct_field_names)]
377 _marker: PhantomData<E>,
378}
379
380impl<E: EntityKind> Query<E> {
381 #[must_use]
385 pub const fn new(consistency: ReadConsistency) -> Self {
386 Self {
387 intent: QueryModel::new(E::MODEL, consistency),
388 _marker: PhantomData,
389 }
390 }
391
392 #[must_use]
394 pub const fn mode(&self) -> QueryMode {
395 self.intent.mode()
396 }
397
398 #[must_use]
400 pub fn filter(mut self, predicate: Predicate) -> Self {
401 self.intent = self.intent.filter(predicate);
402 self
403 }
404
405 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
407 let Self { intent, _marker } = self;
408 let intent = intent.filter_expr(expr)?;
409
410 Ok(Self { intent, _marker })
411 }
412
413 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
415 let Self { intent, _marker } = self;
416 let intent = intent.sort_expr(expr)?;
417
418 Ok(Self { intent, _marker })
419 }
420
421 #[must_use]
423 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
424 self.intent = self.intent.order_by(field);
425 self
426 }
427
428 #[must_use]
430 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
431 self.intent = self.intent.order_by_desc(field);
432 self
433 }
434
435 pub(crate) fn by_id(self, id: E::Key) -> Self {
437 let Self { intent, _marker } = self;
438 Self {
439 intent: intent.by_id(id),
440 _marker,
441 }
442 }
443
444 pub(crate) fn by_ids<I>(self, ids: I) -> Self
446 where
447 I: IntoIterator<Item = E::Key>,
448 {
449 let Self { intent, _marker } = self;
450 Self {
451 intent: intent.by_ids(ids),
452 _marker,
453 }
454 }
455
456 #[must_use]
458 pub fn delete(mut self) -> Self {
459 self.intent = self.intent.delete();
460 self
461 }
462
463 #[must_use]
467 pub fn limit(mut self, limit: u32) -> Self {
468 self.intent = self.intent.limit(limit);
469 self
470 }
471
472 #[must_use]
474 pub fn offset(mut self, offset: u32) -> Self {
475 self.intent = self.intent.offset(offset);
476 self
477 }
478
479 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
481 let plan = self.build_plan()?;
482
483 Ok(plan.explain())
484 }
485
486 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
488 let plan = self.build_plan()?;
489
490 Ok(ExecutablePlan::new(plan))
491 }
492
493 fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
495 let plan_value = self.intent.build_plan_model()?;
496 let LogicalPlan {
497 mode,
498 access,
499 predicate,
500 order,
501 delete_limit,
502 page,
503 consistency,
504 } = plan_value;
505
506 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
507 let plan = LogicalPlan {
508 mode,
509 access,
510 predicate,
511 order,
512 delete_limit,
513 page,
514 consistency,
515 };
516
517 Ok(plan)
518 }
519}
520
521impl<E> Query<E>
522where
523 E: EntityKind + SingletonEntity,
524 E::Key: Default,
525{
526 pub(crate) fn only(self) -> Self {
528 let Self { intent, _marker } = self;
529
530 Self {
531 intent: intent.only(E::Key::default()),
532 _marker,
533 }
534 }
535}
536
537#[derive(Debug, ThisError)]
542pub enum QueryError {
543 #[error("{0}")]
544 UnsupportedQueryFeature(#[from] UnsupportedQueryFeature),
545
546 #[error("{0}")]
547 Validate(#[from] ValidateError),
548
549 #[error("{0}")]
550 Plan(#[from] PlanError),
551
552 #[error("{0}")]
553 Intent(#[from] IntentError),
554
555 #[error("{0}")]
556 Response(#[from] ResponseError),
557
558 #[error("{0}")]
559 Execute(#[from] InternalError),
560}
561
562impl From<PlannerError> for QueryError {
563 fn from(err: PlannerError) -> Self {
564 match err {
565 PlannerError::Plan(err) => Self::Plan(err),
566 PlannerError::Internal(err) => Self::Execute(err),
567 }
568 }
569}
570
571#[derive(Clone, Copy, Debug, ThisError)]
576pub enum IntentError {
577 #[error("delete limit requires an explicit ordering")]
578 DeleteLimitRequiresOrder,
579
580 #[error("order specification must include at least one field")]
581 EmptyOrderSpec,
582
583 #[error("by_ids() cannot be combined with predicates")]
584 ByIdsWithPredicate,
585
586 #[error("only() cannot be combined with predicates")]
587 OnlyWithPredicate,
588
589 #[error("multiple key access methods were used on the same query")]
590 KeyAccessConflict,
591}
592
593fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
595 match order {
596 Some(mut spec) => {
597 spec.fields.push((field.to_string(), direction));
598 spec
599 }
600 None => OrderSpec {
601 fields: vec![(field.to_string(), direction)],
602 },
603 }
604}