1use crate::{
2 db::query::{
3 ReadConsistency,
4 expr::{FilterExpr, SortExpr, SortLowerError},
5 plan::{
6 AccessPath, AccessPlan, DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan,
7 OrderDirection, OrderSpec, PageSpec, PlanError, ProjectionSpec,
8 planner::{PlannerError, plan_access},
9 validate::validate_access_plan,
10 validate::validate_order,
11 },
12 predicate::{Predicate, SchemaInfo, ValidateError, normalize, validate},
13 },
14 db::response::ResponseError,
15 error::InternalError,
16 key::Key,
17 traits::{EntityKind, UnitKey},
18};
19use std::marker::PhantomData;
20use thiserror::Error as ThisError;
21
22#[derive(Clone, Copy, Debug, Eq, PartialEq)]
30pub enum QueryMode {
31 Load(LoadSpec),
32 Delete(DeleteSpec),
33}
34
35impl QueryMode {
36 #[must_use]
38 pub const fn is_load(&self) -> bool {
39 match self {
40 Self::Load(_) => true,
41 Self::Delete(_) => false,
42 }
43 }
44
45 #[must_use]
47 pub const fn is_delete(&self) -> bool {
48 match self {
49 Self::Delete(_) => true,
50 Self::Load(_) => false,
51 }
52 }
53}
54
55#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
62pub struct LoadSpec {
63 pub limit: Option<u32>,
64 pub offset: u32,
65}
66
67impl LoadSpec {
68 #[must_use]
70 pub const fn new() -> Self {
71 Self {
72 limit: None,
73 offset: 0,
74 }
75 }
76}
77
78#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
85pub struct DeleteSpec {
86 pub limit: Option<u32>,
87}
88
89impl DeleteSpec {
90 #[must_use]
92 pub const fn new() -> Self {
93 Self { limit: None }
94 }
95}
96
97#[derive(Debug)]
109pub struct Query<E: EntityKind> {
110 mode: QueryMode,
111 predicate: Option<Predicate>,
112 key_access: Option<KeyAccessState>,
113 key_access_conflict: bool,
114 order: Option<OrderSpec>,
115 projection: ProjectionSpec,
116 consistency: ReadConsistency,
117 _marker: PhantomData<E>,
118}
119
120impl<E: EntityKind> Query<E> {
121 #[must_use]
125 pub const fn new(consistency: ReadConsistency) -> Self {
126 Self {
127 mode: QueryMode::Load(LoadSpec::new()),
128 predicate: None,
129 key_access: None,
130 key_access_conflict: false,
131 order: None,
132 projection: ProjectionSpec::All,
133 consistency,
134 _marker: PhantomData,
135 }
136 }
137
138 #[must_use]
140 pub const fn mode(&self) -> QueryMode {
141 self.mode
142 }
143
144 #[must_use]
146 pub fn filter(mut self, predicate: Predicate) -> Self {
147 self.predicate = match self.predicate.take() {
148 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
149 None => Some(predicate),
150 };
151 self
152 }
153
154 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
156 let predicate = expr.lower::<E>().map_err(QueryError::Validate)?;
157
158 Ok(self.filter(predicate))
159 }
160
161 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
163 let order = match expr.lower::<E>() {
164 Ok(order) => order,
165 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
166 Err(SortLowerError::Plan(err)) => return Err(QueryError::Plan(err)),
167 };
168
169 Ok(self.order_spec(order))
170 }
171
172 #[must_use]
174 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
175 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
176 self
177 }
178
179 #[must_use]
181 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
182 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
183 self
184 }
185
186 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
188 self.order = Some(order);
189 self
190 }
191
192 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess) -> Self {
194 if let Some(existing) = &self.key_access
195 && existing.kind != kind
196 {
197 self.key_access_conflict = true;
198 }
199
200 self.key_access = Some(KeyAccessState { kind, access });
201
202 self
203 }
204
205 pub(crate) fn by_key(self, key: Key) -> Self {
207 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(key))
208 }
209
210 pub(crate) fn by_keys<I>(self, keys: I) -> Self
212 where
213 I: IntoIterator<Item = Key>,
214 {
215 self.set_key_access(
216 KeyAccessKind::Many,
217 KeyAccess::Many(keys.into_iter().collect()),
218 )
219 }
220
221 #[must_use]
223 pub const fn delete(mut self) -> Self {
224 if self.mode.is_load() {
225 self.mode = QueryMode::Delete(DeleteSpec::new());
226 }
227 self
228 }
229
230 #[must_use]
234 pub const fn limit(mut self, limit: u32) -> Self {
235 match self.mode {
236 QueryMode::Load(mut spec) => {
237 spec.limit = Some(limit);
238 self.mode = QueryMode::Load(spec);
239 }
240 QueryMode::Delete(mut spec) => {
241 spec.limit = Some(limit);
242 self.mode = QueryMode::Delete(spec);
243 }
244 }
245 self
246 }
247
248 #[must_use]
250 pub const fn offset(mut self, offset: u32) -> Self {
251 if let QueryMode::Load(mut spec) = self.mode {
252 spec.offset = offset;
253 self.mode = QueryMode::Load(spec);
254 }
255 self
256 }
257
258 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
260 let plan = self.build_plan::<E>()?;
261
262 Ok(plan.explain())
263 }
264
265 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
267 let plan = self.build_plan::<E>()?;
268
269 Ok(ExecutablePlan::new(plan))
270 }
271
272 fn build_plan<T: EntityKind>(&self) -> Result<LogicalPlan, QueryError> {
274 let model = T::MODEL;
276 let schema_info = SchemaInfo::from_entity_model(model)?;
277 self.validate_intent()?;
278
279 if let Some(order) = &self.order {
280 validate_order(&schema_info, order)?;
281 }
282
283 let normalized_predicate = self.predicate.as_ref().map(normalize);
285 let access_plan = match &self.key_access {
286 Some(state) => {
287 if let Some(predicate) = self.predicate.as_ref() {
288 validate(&schema_info, predicate)?;
289 }
290 access_plan_from_keys(&state.access)
291 }
292 None => plan_access::<T>(&schema_info, normalized_predicate.as_ref())?,
293 };
294
295 validate_access_plan(&schema_info, model, &access_plan)?;
296
297 let plan = LogicalPlan {
299 mode: self.mode,
300 access: access_plan,
301 predicate: normalized_predicate,
302 order: self.order.clone(),
303 delete_limit: match self.mode {
304 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
305 QueryMode::Load(_) => None,
306 },
307 page: match self.mode {
308 QueryMode::Load(spec) => {
309 if spec.limit.is_some() || spec.offset > 0 {
310 Some(PageSpec {
311 limit: spec.limit,
312 offset: spec.offset,
313 })
314 } else {
315 None
316 }
317 }
318 QueryMode::Delete(_) => None,
319 },
320 projection: self.projection.clone(),
321 consistency: self.consistency,
322 };
323
324 Ok(plan)
325 }
326
327 const fn validate_intent(&self) -> Result<(), IntentError> {
329 if self.key_access_conflict {
330 return Err(IntentError::KeyAccessConflict);
331 }
332
333 if let Some(state) = &self.key_access {
334 match state.kind {
335 KeyAccessKind::Many if self.predicate.is_some() => {
336 return Err(IntentError::ManyWithPredicate);
337 }
338 KeyAccessKind::Only if self.predicate.is_some() => {
339 return Err(IntentError::OnlyWithPredicate);
340 }
341 _ => {}
342 }
343 }
344
345 match self.mode {
346 QueryMode::Load(_) => {}
347 QueryMode::Delete(spec) => {
348 if spec.limit.is_some() && self.order.is_none() {
349 return Err(IntentError::DeleteLimitRequiresOrder);
350 }
351 }
352 }
353
354 Ok(())
355 }
356}
357
358impl<E: EntityKind> Query<E>
359where
360 E::PrimaryKey: UnitKey,
361{
362 pub(crate) fn only(self) -> Self {
364 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(Key::Unit))
365 }
366}
367
368#[derive(Debug, ThisError)]
373pub enum QueryError {
374 #[error("{0}")]
375 Validate(#[from] ValidateError),
376
377 #[error("{0}")]
378 Plan(#[from] PlanError),
379
380 #[error("{0}")]
381 Intent(#[from] IntentError),
382
383 #[error("{0}")]
384 Response(#[from] ResponseError),
385
386 #[error("{0}")]
387 Execute(#[from] InternalError),
388}
389
390impl From<PlannerError> for QueryError {
391 fn from(err: PlannerError) -> Self {
392 match err {
393 PlannerError::Plan(err) => Self::Plan(err),
394 PlannerError::Internal(err) => Self::Execute(err),
395 }
396 }
397}
398
399#[derive(Clone, Copy, Debug, ThisError)]
404pub enum IntentError {
405 #[error("delete limit requires an explicit ordering")]
406 DeleteLimitRequiresOrder,
407
408 #[error("many() cannot be combined with predicates")]
409 ManyWithPredicate,
410
411 #[error("only() cannot be combined with predicates")]
412 OnlyWithPredicate,
413
414 #[error("multiple key access methods were used on the same query")]
415 KeyAccessConflict,
416}
417
418#[derive(Clone, Debug, Eq, PartialEq)]
420enum KeyAccess {
421 Single(Key),
422 Many(Vec<Key>),
423}
424
425#[derive(Clone, Copy, Debug, Eq, PartialEq)]
427enum KeyAccessKind {
428 Single,
429 Many,
430 Only,
431}
432
433#[derive(Clone, Debug, Eq, PartialEq)]
435struct KeyAccessState {
436 kind: KeyAccessKind,
437 access: KeyAccess,
438}
439
440fn access_plan_from_keys(access: &KeyAccess) -> AccessPlan {
442 match access {
443 KeyAccess::Single(key) => AccessPlan::Path(AccessPath::ByKey(*key)),
444 KeyAccess::Many(keys) => {
445 if let Some((first, rest)) = keys.split_first()
446 && rest.is_empty()
447 {
448 return AccessPlan::Path(AccessPath::ByKey(*first));
449 }
450
451 AccessPlan::Path(AccessPath::ByKeys(keys.clone()))
452 }
453 }
454}
455
456fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
458 match order {
459 Some(mut spec) => {
460 spec.fields.push((field.to_string(), direction));
461 spec
462 }
463 None => OrderSpec {
464 fields: vec![(field.to_string(), direction)],
465 },
466 }
467}
468
469#[cfg(test)]
470mod tests;