1use crate::{
2 db::query::{
3 ReadConsistency,
4 expr::{FilterExpr, SortExpr, SortLowerError},
5 plan::{
6 AccessPath, AccessPlan, DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan,
7 OrderDirection, OrderSpec, PageSpec, PlanError, ProjectionSpec,
8 planner::{PlannerError, plan_access},
9 validate::validate_access_plan,
10 validate::validate_order,
11 },
12 predicate::{Predicate, SchemaInfo, ValidateError, normalize, validate},
13 },
14 db::response::ResponseError,
15 error::InternalError,
16 key::Key,
17 traits::{EntityKind, UnitKey},
18};
19use std::marker::PhantomData;
20use thiserror::Error as ThisError;
21
22#[derive(Clone, Copy, Debug, Eq, PartialEq)]
30pub enum QueryMode {
31 Load(LoadSpec),
32 Delete(DeleteSpec),
33}
34
35impl QueryMode {
36 #[must_use]
38 pub const fn is_load(&self) -> bool {
39 match self {
40 Self::Load(_) => true,
41 Self::Delete(_) => false,
42 }
43 }
44
45 #[must_use]
47 pub const fn is_delete(&self) -> bool {
48 match self {
49 Self::Delete(_) => true,
50 Self::Load(_) => false,
51 }
52 }
53}
54
/// Limit/offset settings attached to a load-mode query.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct LoadSpec {
    /// Optional limit; `None` means no limit was requested.
    pub limit: Option<u32>,
    /// Requested offset; `0` means no offset was requested.
    pub offset: u32,
}

impl LoadSpec {
    /// Creates a spec with no limit and a zero offset.
    ///
    /// Produces the same value as `LoadSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        let limit = None;
        let offset = 0;

        Self { limit, offset }
    }
}
77
/// Settings attached to a delete-mode query.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteSpec {
    /// Optional limit; `None` means no limit was requested.
    pub limit: Option<u32>,
}

impl DeleteSpec {
    /// Creates a spec with no limit.
    ///
    /// Produces the same value as `DeleteSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        let limit = None;

        Self { limit }
    }
}
96
97#[derive(Debug)]
109pub struct Query<E: EntityKind> {
110 mode: QueryMode,
111 predicate: Option<Predicate>,
112 key_access: Option<KeyAccessState>,
113 key_access_conflict: bool,
114 order: Option<OrderSpec>,
115 projection: ProjectionSpec,
116 consistency: ReadConsistency,
117 _marker: PhantomData<E>,
118}
119
120impl<E: EntityKind> Query<E> {
121 #[must_use]
125 pub const fn new(consistency: ReadConsistency) -> Self {
126 Self {
127 mode: QueryMode::Load(LoadSpec::new()),
128 predicate: None,
129 key_access: None,
130 key_access_conflict: false,
131 order: None,
132 projection: ProjectionSpec::All,
133 consistency,
134 _marker: PhantomData,
135 }
136 }
137
138 #[must_use]
140 pub const fn mode(&self) -> QueryMode {
141 self.mode
142 }
143
144 #[must_use]
146 pub fn filter(mut self, predicate: Predicate) -> Self {
147 self.predicate = match self.predicate.take() {
148 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
149 None => Some(predicate),
150 };
151 self
152 }
153
154 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
156 let predicate = expr.lower::<E>().map_err(QueryError::Validate)?;
157
158 Ok(self.filter(predicate))
159 }
160
161 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
163 let order = match expr.lower::<E>() {
164 Ok(order) => order,
165 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
166 Err(SortLowerError::Plan(err)) => return Err(QueryError::Plan(err)),
167 };
168
169 if order.fields.is_empty() {
170 return Ok(Self {
172 order: None,
173 ..self
174 });
175 }
176
177 Ok(self.order_spec(order))
178 }
179
180 #[must_use]
182 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
183 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
184 self
185 }
186
187 #[must_use]
189 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
190 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
191 self
192 }
193
194 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
196 self.order = Some(order);
197 self
198 }
199
200 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess) -> Self {
202 if let Some(existing) = &self.key_access
203 && existing.kind != kind
204 {
205 self.key_access_conflict = true;
206 }
207
208 self.key_access = Some(KeyAccessState { kind, access });
209
210 self
211 }
212
213 pub(crate) fn by_key(self, key: Key) -> Self {
215 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(key))
216 }
217
218 pub(crate) fn by_keys<I>(self, keys: I) -> Self
220 where
221 I: IntoIterator<Item = Key>,
222 {
223 self.set_key_access(
224 KeyAccessKind::Many,
225 KeyAccess::Many(keys.into_iter().collect()),
226 )
227 }
228
229 #[must_use]
231 pub const fn delete(mut self) -> Self {
232 if self.mode.is_load() {
233 self.mode = QueryMode::Delete(DeleteSpec::new());
234 }
235 self
236 }
237
238 #[must_use]
242 pub const fn limit(mut self, limit: u32) -> Self {
243 match self.mode {
244 QueryMode::Load(mut spec) => {
245 spec.limit = Some(limit);
246 self.mode = QueryMode::Load(spec);
247 }
248 QueryMode::Delete(mut spec) => {
249 spec.limit = Some(limit);
250 self.mode = QueryMode::Delete(spec);
251 }
252 }
253 self
254 }
255
256 #[must_use]
258 pub const fn offset(mut self, offset: u32) -> Self {
259 if let QueryMode::Load(mut spec) = self.mode {
260 spec.offset = offset;
261 self.mode = QueryMode::Load(spec);
262 }
263 self
264 }
265
266 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
268 let plan = self.build_plan::<E>()?;
269
270 Ok(plan.explain())
271 }
272
273 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
275 let plan = self.build_plan::<E>()?;
276
277 Ok(ExecutablePlan::new(plan))
278 }
279
280 fn build_plan<T: EntityKind>(&self) -> Result<LogicalPlan, QueryError> {
282 let model = T::MODEL;
284 let schema_info = SchemaInfo::from_entity_model(model)?;
285 self.validate_intent()?;
286
287 if let Some(order) = &self.order {
288 validate_order(&schema_info, order)?;
289 }
290
291 let normalized_predicate = self.predicate.as_ref().map(normalize);
293 let access_plan = match &self.key_access {
294 Some(state) => {
295 if let Some(predicate) = self.predicate.as_ref() {
296 validate(&schema_info, predicate)?;
297 }
298 access_plan_from_keys(&state.access)
299 }
300 None => plan_access::<T>(&schema_info, normalized_predicate.as_ref())?,
301 };
302
303 validate_access_plan(&schema_info, model, &access_plan)?;
304
305 let plan = LogicalPlan {
307 mode: self.mode,
308 access: access_plan,
309 predicate: normalized_predicate,
310 order: self.order.clone(),
311 delete_limit: match self.mode {
312 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
313 QueryMode::Load(_) => None,
314 },
315 page: match self.mode {
316 QueryMode::Load(spec) => {
317 if spec.limit.is_some() || spec.offset > 0 {
318 Some(PageSpec {
319 limit: spec.limit,
320 offset: spec.offset,
321 })
322 } else {
323 None
324 }
325 }
326 QueryMode::Delete(_) => None,
327 },
328 projection: self.projection.clone(),
329 consistency: self.consistency,
330 };
331
332 Ok(plan)
333 }
334
335 const fn validate_intent(&self) -> Result<(), IntentError> {
337 if self.key_access_conflict {
338 return Err(IntentError::KeyAccessConflict);
339 }
340
341 if let Some(state) = &self.key_access {
342 match state.kind {
343 KeyAccessKind::Many if self.predicate.is_some() => {
344 return Err(IntentError::ManyWithPredicate);
345 }
346 KeyAccessKind::Only if self.predicate.is_some() => {
347 return Err(IntentError::OnlyWithPredicate);
348 }
349 _ => {}
350 }
351 }
352
353 match self.mode {
354 QueryMode::Load(_) => {}
355 QueryMode::Delete(spec) => {
356 if spec.limit.is_some() && self.order.is_none() {
357 return Err(IntentError::DeleteLimitRequiresOrder);
358 }
359 }
360 }
361
362 Ok(())
363 }
364}
365
366impl<E: EntityKind> Query<E>
367where
368 E::PrimaryKey: UnitKey,
369{
370 pub(crate) fn only(self) -> Self {
372 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(Key::Unit))
373 }
374}
375
376#[derive(Debug, ThisError)]
381pub enum QueryError {
382 #[error("{0}")]
383 Validate(#[from] ValidateError),
384
385 #[error("{0}")]
386 Plan(#[from] PlanError),
387
388 #[error("{0}")]
389 Intent(#[from] IntentError),
390
391 #[error("{0}")]
392 Response(#[from] ResponseError),
393
394 #[error("{0}")]
395 Execute(#[from] InternalError),
396}
397
398impl From<PlannerError> for QueryError {
399 fn from(err: PlannerError) -> Self {
400 match err {
401 PlannerError::Plan(err) => Self::Plan(err),
402 PlannerError::Internal(err) => Self::Execute(err),
403 }
404 }
405}
406
407#[derive(Clone, Copy, Debug, ThisError)]
412pub enum IntentError {
413 #[error("delete limit requires an explicit ordering")]
414 DeleteLimitRequiresOrder,
415
416 #[error("many() cannot be combined with predicates")]
417 ManyWithPredicate,
418
419 #[error("only() cannot be combined with predicates")]
420 OnlyWithPredicate,
421
422 #[error("multiple key access methods were used on the same query")]
423 KeyAccessConflict,
424}
425
426#[derive(Clone, Debug, Eq, PartialEq)]
428enum KeyAccess {
429 Single(Key),
430 Many(Vec<Key>),
431}
432
/// Which builder method produced a key access request.
///
/// Used to detect conflicting key access calls and to decide whether a
/// predicate may accompany the request.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum KeyAccessKind {
    /// Requested through `by_key`.
    Single,
    /// Requested through `by_keys`.
    Many,
    /// Requested through `only`.
    Only,
}
440
441#[derive(Clone, Debug, Eq, PartialEq)]
443struct KeyAccessState {
444 kind: KeyAccessKind,
445 access: KeyAccess,
446}
447
448fn access_plan_from_keys(access: &KeyAccess) -> AccessPlan {
450 match access {
451 KeyAccess::Single(key) => AccessPlan::Path(AccessPath::ByKey(*key)),
452 KeyAccess::Many(keys) => {
453 if let Some((first, rest)) = keys.split_first()
454 && rest.is_empty()
455 {
456 return AccessPlan::Path(AccessPath::ByKey(*first));
457 }
458
459 AccessPlan::Path(AccessPath::ByKeys(keys.clone()))
460 }
461 }
462}
463
464fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
466 match order {
467 Some(mut spec) => {
468 spec.fields.push((field.to_string(), direction));
469 spec
470 }
471 None => OrderSpec {
472 fields: vec![(field.to_string(), direction)],
473 },
474 }
475}
476
477#[cfg(test)]
478mod tests;