1use crate::{
2 db::query::{
3 ReadConsistency,
4 expr::{FilterExpr, SortExpr, SortLowerError},
5 plan::{
6 AccessPath, AccessPlan, DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan,
7 OrderDirection, OrderSpec, PageSpec, PlanError,
8 planner::{PlannerError, plan_access},
9 validate::{validate_access_plan, validate_logical_plan, validate_order},
10 },
11 predicate::{Predicate, SchemaInfo, ValidateError, normalize, validate},
12 },
13 db::response::ResponseError,
14 error::InternalError,
15 key::Key,
16 traits::{EntityKind, UnitKey},
17};
18use std::marker::PhantomData;
19use thiserror::Error as ThisError;
20
21#[derive(Clone, Copy, Debug, Eq, PartialEq)]
29pub enum QueryMode {
30 Load(LoadSpec),
31 Delete(DeleteSpec),
32}
33
34impl QueryMode {
35 #[must_use]
37 pub const fn is_load(&self) -> bool {
38 match self {
39 Self::Load(_) => true,
40 Self::Delete(_) => false,
41 }
42 }
43
44 #[must_use]
46 pub const fn is_delete(&self) -> bool {
47 match self {
48 Self::Delete(_) => true,
49 Self::Load(_) => false,
50 }
51 }
52}
53
54#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
61pub struct LoadSpec {
62 pub limit: Option<u32>,
63 pub offset: u32,
64}
65
66impl LoadSpec {
67 #[must_use]
69 pub const fn new() -> Self {
70 Self {
71 limit: None,
72 offset: 0,
73 }
74 }
75}
76
77#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
84pub struct DeleteSpec {
85 pub limit: Option<u32>,
86}
87
88impl DeleteSpec {
89 #[must_use]
91 pub const fn new() -> Self {
92 Self { limit: None }
93 }
94}
95
96#[derive(Debug)]
108pub struct Query<E: EntityKind> {
109 mode: QueryMode,
110 predicate: Option<Predicate>,
111 key_access: Option<KeyAccessState>,
112 key_access_conflict: bool,
113 order: Option<OrderSpec>,
114 consistency: ReadConsistency,
115 _marker: PhantomData<E>,
116}
117
118impl<E: EntityKind> Query<E> {
119 #[must_use]
123 pub const fn new(consistency: ReadConsistency) -> Self {
124 Self {
125 mode: QueryMode::Load(LoadSpec::new()),
126 predicate: None,
127 key_access: None,
128 key_access_conflict: false,
129 order: None,
130 consistency,
131 _marker: PhantomData,
132 }
133 }
134
135 #[must_use]
137 pub const fn mode(&self) -> QueryMode {
138 self.mode
139 }
140
141 #[must_use]
143 pub fn filter(mut self, predicate: Predicate) -> Self {
144 self.predicate = match self.predicate.take() {
145 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
146 None => Some(predicate),
147 };
148 self
149 }
150
151 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
153 let predicate = expr.lower::<E>().map_err(QueryError::Validate)?;
154
155 Ok(self.filter(predicate))
156 }
157
158 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
160 let order = match expr.lower::<E>() {
161 Ok(order) => order,
162 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
163 Err(SortLowerError::Plan(err)) => return Err(QueryError::Plan(err)),
164 };
165
166 if order.fields.is_empty() {
167 return Err(QueryError::Intent(IntentError::EmptyOrderSpec));
168 }
169
170 Ok(self.order_spec(order))
171 }
172
173 #[must_use]
175 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
176 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
177 self
178 }
179
180 #[must_use]
182 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
183 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
184 self
185 }
186
187 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
189 self.order = Some(order);
190 self
191 }
192
193 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess) -> Self {
195 if let Some(existing) = &self.key_access
196 && existing.kind != kind
197 {
198 self.key_access_conflict = true;
199 }
200
201 self.key_access = Some(KeyAccessState { kind, access });
202
203 self
204 }
205
206 pub(crate) fn by_key(self, key: Key) -> Self {
208 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(key))
209 }
210
211 pub(crate) fn by_keys<I>(self, keys: I) -> Self
213 where
214 I: IntoIterator<Item = Key>,
215 {
216 self.set_key_access(
217 KeyAccessKind::Many,
218 KeyAccess::Many(keys.into_iter().collect()),
219 )
220 }
221
222 #[must_use]
224 pub const fn delete(mut self) -> Self {
225 if self.mode.is_load() {
226 self.mode = QueryMode::Delete(DeleteSpec::new());
227 }
228 self
229 }
230
231 #[must_use]
235 pub const fn limit(mut self, limit: u32) -> Self {
236 match self.mode {
237 QueryMode::Load(mut spec) => {
238 spec.limit = Some(limit);
239 self.mode = QueryMode::Load(spec);
240 }
241 QueryMode::Delete(mut spec) => {
242 spec.limit = Some(limit);
243 self.mode = QueryMode::Delete(spec);
244 }
245 }
246 self
247 }
248
249 #[must_use]
251 pub const fn offset(mut self, offset: u32) -> Self {
252 if let QueryMode::Load(mut spec) = self.mode {
253 spec.offset = offset;
254 self.mode = QueryMode::Load(spec);
255 }
256 self
257 }
258
259 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
261 let plan = self.build_plan::<E>()?;
262
263 Ok(plan.explain())
264 }
265
266 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
268 let plan = self.build_plan::<E>()?;
269
270 Ok(ExecutablePlan::new(plan))
271 }
272
273 fn build_plan<T: EntityKind>(&self) -> Result<LogicalPlan, QueryError> {
275 let model = T::MODEL;
277 let schema_info = SchemaInfo::from_entity_model(model)?;
278 self.validate_intent()?;
279
280 if let Some(order) = &self.order {
281 validate_order(&schema_info, order)?;
282 }
283
284 let normalized_predicate = self.predicate.as_ref().map(normalize);
286 let access_plan = match &self.key_access {
287 Some(state) => {
288 if let Some(predicate) = self.predicate.as_ref() {
289 validate(&schema_info, predicate)?;
290 }
291 access_plan_from_keys(&state.access)
292 }
293 None => plan_access::<T>(&schema_info, normalized_predicate.as_ref())?,
294 };
295
296 validate_access_plan(&schema_info, model, &access_plan)?;
297
298 let plan = LogicalPlan {
300 mode: self.mode,
301 access: access_plan,
302 predicate: normalized_predicate,
303 order: self.order.clone(),
304 delete_limit: match self.mode {
305 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
306 QueryMode::Load(_) => None,
307 },
308 page: match self.mode {
309 QueryMode::Load(spec) => {
310 if spec.limit.is_some() || spec.offset > 0 {
311 Some(PageSpec {
312 limit: spec.limit,
313 offset: spec.offset,
314 })
315 } else {
316 None
317 }
318 }
319 QueryMode::Delete(_) => None,
320 },
321 consistency: self.consistency,
322 };
323
324 validate_logical_plan(&schema_info, model, &plan)?;
325
326 Ok(plan)
327 }
328
329 const fn validate_intent(&self) -> Result<(), IntentError> {
331 if self.key_access_conflict {
332 return Err(IntentError::KeyAccessConflict);
333 }
334
335 if let Some(order) = &self.order
336 && order.fields.is_empty()
337 {
338 return Err(IntentError::EmptyOrderSpec);
339 }
340
341 if let Some(state) = &self.key_access {
342 match state.kind {
343 KeyAccessKind::Many if self.predicate.is_some() => {
344 return Err(IntentError::ManyWithPredicate);
345 }
346 KeyAccessKind::Only if self.predicate.is_some() => {
347 return Err(IntentError::OnlyWithPredicate);
348 }
349 _ => {}
350 }
351 }
352
353 match self.mode {
354 QueryMode::Load(_) => {}
355 QueryMode::Delete(spec) => {
356 if spec.limit.is_some() && self.order.is_none() {
357 return Err(IntentError::DeleteLimitRequiresOrder);
358 }
359 }
360 }
361
362 Ok(())
363 }
364}
365
366impl<E: EntityKind> Query<E>
367where
368 E::PrimaryKey: UnitKey,
369{
370 pub(crate) fn only(self) -> Self {
372 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(Key::Unit))
373 }
374}
375
376#[derive(Debug, ThisError)]
381pub enum QueryError {
382 #[error("{0}")]
383 Validate(#[from] ValidateError),
384
385 #[error("{0}")]
386 Plan(#[from] PlanError),
387
388 #[error("{0}")]
389 Intent(#[from] IntentError),
390
391 #[error("{0}")]
392 Response(#[from] ResponseError),
393
394 #[error("{0}")]
395 Execute(#[from] InternalError),
396}
397
398impl From<PlannerError> for QueryError {
399 fn from(err: PlannerError) -> Self {
400 match err {
401 PlannerError::Plan(err) => Self::Plan(err),
402 PlannerError::Internal(err) => Self::Execute(err),
403 }
404 }
405}
406
407#[derive(Clone, Copy, Debug, ThisError)]
412pub enum IntentError {
413 #[error("delete limit requires an explicit ordering")]
414 DeleteLimitRequiresOrder,
415
416 #[error("order specification must include at least one field")]
417 EmptyOrderSpec,
418
419 #[error("many() cannot be combined with predicates")]
420 ManyWithPredicate,
421
422 #[error("only() cannot be combined with predicates")]
423 OnlyWithPredicate,
424
425 #[error("multiple key access methods were used on the same query")]
426 KeyAccessConflict,
427}
428
429#[derive(Clone, Debug, Eq, PartialEq)]
431enum KeyAccess {
432 Single(Key),
433 Many(Vec<Key>),
434}
435
436#[derive(Clone, Copy, Debug, Eq, PartialEq)]
438enum KeyAccessKind {
439 Single,
440 Many,
441 Only,
442}
443
444#[derive(Clone, Debug, Eq, PartialEq)]
446struct KeyAccessState {
447 kind: KeyAccessKind,
448 access: KeyAccess,
449}
450
451fn access_plan_from_keys(access: &KeyAccess) -> AccessPlan {
453 match access {
454 KeyAccess::Single(key) => AccessPlan::Path(AccessPath::ByKey(*key)),
455 KeyAccess::Many(keys) => {
456 if let Some((first, rest)) = keys.split_first()
457 && rest.is_empty()
458 {
459 return AccessPlan::Path(AccessPath::ByKey(*first));
460 }
461
462 AccessPlan::Path(AccessPath::ByKeys(keys.clone()))
463 }
464 }
465}
466
467fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
469 match order {
470 Some(mut spec) => {
471 spec.fields.push((field.to_string(), direction));
472 spec
473 }
474 None => OrderSpec {
475 fields: vec![(field.to_string(), direction)],
476 },
477 }
478}
479
480#[cfg(test)]
481mod tests;