1use crate::{
2 db::{
3 DbSession, PagedLoadExecution, PagedLoadExecutionWithTrace,
4 predicate::Predicate,
5 query::{
6 explain::ExplainPlan,
7 expr::{FilterExpr, SortExpr},
8 intent::{CompiledQuery, IntentError, PlannedQuery, Query, QueryError},
9 policy,
10 },
11 response::Response,
12 },
13 traits::{EntityKind, EntityValue, SingletonEntity},
14 types::{Decimal, Id},
15 value::Value,
16};
17
18type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
20pub struct FluentLoadQuery<'a, E>
29where
30 E: EntityKind,
31{
32 session: &'a DbSession<E::Canister>,
33 query: Query<E>,
34 cursor_token: Option<String>,
35}
36
37pub struct PagedLoadQuery<'a, E>
45where
46 E: EntityKind,
47{
48 inner: FluentLoadQuery<'a, E>,
49}
50
51impl<'a, E> FluentLoadQuery<'a, E>
52where
53 E: EntityKind,
54{
55 pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
56 Self {
57 session,
58 query,
59 cursor_token: None,
60 }
61 }
62
63 #[must_use]
68 pub const fn query(&self) -> &Query<E> {
69 &self.query
70 }
71
72 fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
73 self.query = map(self.query);
74 self
75 }
76
77 fn try_map_query(
78 mut self,
79 map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
80 ) -> Result<Self, QueryError> {
81 self.query = map(self.query)?;
82 Ok(self)
83 }
84
85 #[must_use]
93 pub fn by_id(mut self, id: Id<E>) -> Self {
94 self.query = self.query.by_id(id.key());
95 self
96 }
97
98 #[must_use]
102 pub fn by_ids<I>(mut self, ids: I) -> Self
103 where
104 I: IntoIterator<Item = Id<E>>,
105 {
106 self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
107 self
108 }
109
110 #[must_use]
115 pub fn filter(self, predicate: Predicate) -> Self {
116 self.map_query(|query| query.filter(predicate))
117 }
118
119 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
120 self.try_map_query(|query| query.filter_expr(expr))
121 }
122
123 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
124 self.try_map_query(|query| query.sort_expr(expr))
125 }
126
127 #[must_use]
128 pub fn order_by(self, field: impl AsRef<str>) -> Self {
129 self.map_query(|query| query.order_by(field))
130 }
131
132 #[must_use]
133 pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
134 self.map_query(|query| query.order_by_desc(field))
135 }
136
137 #[must_use]
142 pub fn limit(self, limit: u32) -> Self {
143 self.map_query(|query| query.limit(limit))
144 }
145
146 #[must_use]
151 pub fn offset(self, offset: u32) -> Self {
152 self.map_query(|query| query.offset(offset))
153 }
154
155 #[must_use]
161 pub fn cursor(mut self, token: impl Into<String>) -> Self {
162 self.cursor_token = Some(token.into());
163 self
164 }
165
166 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
171 self.query.explain()
172 }
173
174 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
175 if let Some(err) = self.cursor_intent_error() {
176 return Err(QueryError::Intent(err));
177 }
178
179 self.query.planned()
180 }
181
182 pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
183 if let Some(err) = self.cursor_intent_error() {
184 return Err(QueryError::Intent(err));
185 }
186
187 self.query.plan()
188 }
189
190 pub fn execute(&self) -> Result<Response<E>, QueryError>
196 where
197 E: EntityValue,
198 {
199 self.ensure_non_paged_mode_ready()?;
200
201 self.session.execute_query(self.query())
202 }
203
204 pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
215 self.ensure_paged_mode_ready()?;
216
217 Ok(PagedLoadQuery { inner: self })
218 }
219
220 pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
224 where
225 E: EntityValue,
226 {
227 self.page()?.execute()
228 }
229
230 pub fn is_empty(&self) -> Result<bool, QueryError>
236 where
237 E: EntityValue,
238 {
239 Ok(!self.exists()?)
240 }
241
242 pub fn exists(&self) -> Result<bool, QueryError>
244 where
245 E: EntityValue,
246 {
247 self.ensure_non_paged_mode_ready()?;
248
249 self.session
250 .execute_load_query_with(self.query(), |load, plan| load.aggregate_exists(plan))
251 }
252
253 pub fn count(&self) -> Result<u32, QueryError>
255 where
256 E: EntityValue,
257 {
258 self.ensure_non_paged_mode_ready()?;
259
260 self.session
261 .execute_load_query_with(self.query(), |load, plan| load.aggregate_count(plan))
262 }
263
264 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
266 where
267 E: EntityValue,
268 {
269 self.ensure_non_paged_mode_ready()?;
270
271 self.session
272 .execute_load_query_with(self.query(), |load, plan| load.aggregate_min(plan))
273 }
274
275 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
279 where
280 E: EntityValue,
281 {
282 self.ensure_non_paged_mode_ready()?;
283
284 self.session
285 .execute_load_query_with(self.query(), |load, plan| {
286 load.aggregate_min_by(plan, field.as_ref())
287 })
288 }
289
290 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
292 where
293 E: EntityValue,
294 {
295 self.ensure_non_paged_mode_ready()?;
296
297 self.session
298 .execute_load_query_with(self.query(), |load, plan| load.aggregate_max(plan))
299 }
300
301 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
305 where
306 E: EntityValue,
307 {
308 self.ensure_non_paged_mode_ready()?;
309
310 self.session
311 .execute_load_query_with(self.query(), |load, plan| {
312 load.aggregate_max_by(plan, field.as_ref())
313 })
314 }
315
316 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
319 where
320 E: EntityValue,
321 {
322 self.ensure_non_paged_mode_ready()?;
323
324 self.session
325 .execute_load_query_with(self.query(), |load, plan| {
326 load.aggregate_nth_by(plan, field.as_ref(), nth)
327 })
328 }
329
330 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
332 where
333 E: EntityValue,
334 {
335 self.ensure_non_paged_mode_ready()?;
336
337 self.session
338 .execute_load_query_with(self.query(), |load, plan| {
339 load.aggregate_sum_by(plan, field.as_ref())
340 })
341 }
342
343 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
345 where
346 E: EntityValue,
347 {
348 self.ensure_non_paged_mode_ready()?;
349
350 self.session
351 .execute_load_query_with(self.query(), |load, plan| {
352 load.aggregate_avg_by(plan, field.as_ref())
353 })
354 }
355
356 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
361 where
362 E: EntityValue,
363 {
364 self.ensure_non_paged_mode_ready()?;
365
366 self.session
367 .execute_load_query_with(self.query(), |load, plan| {
368 load.aggregate_median_by(plan, field.as_ref())
369 })
370 }
371
372 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
375 where
376 E: EntityValue,
377 {
378 self.ensure_non_paged_mode_ready()?;
379
380 self.session
381 .execute_load_query_with(self.query(), |load, plan| {
382 load.aggregate_count_distinct_by(plan, field.as_ref())
383 })
384 }
385
386 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
390 where
391 E: EntityValue,
392 {
393 self.ensure_non_paged_mode_ready()?;
394
395 self.session
396 .execute_load_query_with(self.query(), |load, plan| {
397 load.aggregate_min_max_by(plan, field.as_ref())
398 })
399 }
400
401 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
403 where
404 E: EntityValue,
405 {
406 self.ensure_non_paged_mode_ready()?;
407
408 self.session
409 .execute_load_query_with(self.query(), |load, plan| {
410 load.values_by(plan, field.as_ref())
411 })
412 }
413
414 pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
416 where
417 E: EntityValue,
418 {
419 self.ensure_non_paged_mode_ready()?;
420
421 self.session
422 .execute_load_query_with(self.query(), |load, plan| load.take(plan, take_count))
423 }
424
425 pub fn top_k_by(
433 &self,
434 field: impl AsRef<str>,
435 take_count: u32,
436 ) -> Result<Response<E>, QueryError>
437 where
438 E: EntityValue,
439 {
440 self.ensure_non_paged_mode_ready()?;
441
442 self.session
443 .execute_load_query_with(self.query(), |load, plan| {
444 load.top_k_by(plan, field.as_ref(), take_count)
445 })
446 }
447
448 pub fn bottom_k_by(
456 &self,
457 field: impl AsRef<str>,
458 take_count: u32,
459 ) -> Result<Response<E>, QueryError>
460 where
461 E: EntityValue,
462 {
463 self.ensure_non_paged_mode_ready()?;
464
465 self.session
466 .execute_load_query_with(self.query(), |load, plan| {
467 load.bottom_k_by(plan, field.as_ref(), take_count)
468 })
469 }
470
471 pub fn top_k_by_values(
479 &self,
480 field: impl AsRef<str>,
481 take_count: u32,
482 ) -> Result<Vec<Value>, QueryError>
483 where
484 E: EntityValue,
485 {
486 self.ensure_non_paged_mode_ready()?;
487
488 self.session
489 .execute_load_query_with(self.query(), |load, plan| {
490 load.top_k_by_values(plan, field.as_ref(), take_count)
491 })
492 }
493
494 pub fn bottom_k_by_values(
502 &self,
503 field: impl AsRef<str>,
504 take_count: u32,
505 ) -> Result<Vec<Value>, QueryError>
506 where
507 E: EntityValue,
508 {
509 self.ensure_non_paged_mode_ready()?;
510
511 self.session
512 .execute_load_query_with(self.query(), |load, plan| {
513 load.bottom_k_by_values(plan, field.as_ref(), take_count)
514 })
515 }
516
517 pub fn top_k_by_with_ids(
525 &self,
526 field: impl AsRef<str>,
527 take_count: u32,
528 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
529 where
530 E: EntityValue,
531 {
532 self.ensure_non_paged_mode_ready()?;
533
534 self.session
535 .execute_load_query_with(self.query(), |load, plan| {
536 load.top_k_by_with_ids(plan, field.as_ref(), take_count)
537 })
538 }
539
540 pub fn bottom_k_by_with_ids(
548 &self,
549 field: impl AsRef<str>,
550 take_count: u32,
551 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
552 where
553 E: EntityValue,
554 {
555 self.ensure_non_paged_mode_ready()?;
556
557 self.session
558 .execute_load_query_with(self.query(), |load, plan| {
559 load.bottom_k_by_with_ids(plan, field.as_ref(), take_count)
560 })
561 }
562
563 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
566 where
567 E: EntityValue,
568 {
569 self.ensure_non_paged_mode_ready()?;
570
571 self.session
572 .execute_load_query_with(self.query(), |load, plan| {
573 load.distinct_values_by(plan, field.as_ref())
574 })
575 }
576
577 pub fn values_by_with_ids(
580 &self,
581 field: impl AsRef<str>,
582 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
583 where
584 E: EntityValue,
585 {
586 self.ensure_non_paged_mode_ready()?;
587
588 self.session
589 .execute_load_query_with(self.query(), |load, plan| {
590 load.values_by_with_ids(plan, field.as_ref())
591 })
592 }
593
594 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
597 where
598 E: EntityValue,
599 {
600 self.ensure_non_paged_mode_ready()?;
601
602 self.session
603 .execute_load_query_with(self.query(), |load, plan| {
604 load.first_value_by(plan, field.as_ref())
605 })
606 }
607
608 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
611 where
612 E: EntityValue,
613 {
614 self.ensure_non_paged_mode_ready()?;
615
616 self.session
617 .execute_load_query_with(self.query(), |load, plan| {
618 load.last_value_by(plan, field.as_ref())
619 })
620 }
621
622 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
624 where
625 E: EntityValue,
626 {
627 self.ensure_non_paged_mode_ready()?;
628
629 self.session
630 .execute_load_query_with(self.query(), |load, plan| load.aggregate_first(plan))
631 }
632
633 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
635 where
636 E: EntityValue,
637 {
638 self.ensure_non_paged_mode_ready()?;
639
640 self.session
641 .execute_load_query_with(self.query(), |load, plan| load.aggregate_last(plan))
642 }
643
644 pub fn require_one(&self) -> Result<(), QueryError>
646 where
647 E: EntityValue,
648 {
649 self.execute()?.require_one()?;
650 Ok(())
651 }
652
653 pub fn require_some(&self) -> Result<(), QueryError>
655 where
656 E: EntityValue,
657 {
658 self.execute()?.require_some()?;
659 Ok(())
660 }
661}
662
663impl<E> FluentLoadQuery<'_, E>
664where
665 E: EntityKind,
666{
667 fn non_paged_intent_error(&self) -> Option<IntentError> {
668 self.cursor_token
669 .as_ref()
670 .map(|_| IntentError::CursorRequiresPagedExecution)
671 }
672
673 fn cursor_intent_error(&self) -> Option<IntentError> {
674 self.cursor_token
675 .as_ref()
676 .and_then(|_| self.paged_intent_error())
677 }
678
679 fn paged_intent_error(&self) -> Option<IntentError> {
680 let spec = self.query.load_spec()?;
681
682 policy::validate_cursor_paging_requirements(self.query.has_explicit_order(), spec)
683 .err()
684 .map(IntentError::from)
685 }
686
687 fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
688 if let Some(err) = self.paged_intent_error() {
689 return Err(QueryError::Intent(err));
690 }
691
692 Ok(())
693 }
694
695 fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
696 if let Some(err) = self.non_paged_intent_error() {
697 return Err(QueryError::Intent(err));
698 }
699
700 Ok(())
701 }
702}
703
704impl<E> FluentLoadQuery<'_, E>
705where
706 E: EntityKind + SingletonEntity,
707 E::Key: Default,
708{
709 #[must_use]
710 pub fn only(self) -> Self {
711 self.map_query(Query::only)
712 }
713}
714
715impl<E> PagedLoadQuery<'_, E>
716where
717 E: EntityKind,
718{
719 #[must_use]
724 pub const fn query(&self) -> &Query<E> {
725 self.inner.query()
726 }
727
728 #[must_use]
734 pub fn cursor(mut self, token: impl Into<String>) -> Self {
735 self.inner = self.inner.cursor(token);
736 self
737 }
738
739 pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
749 where
750 E: EntityValue,
751 {
752 self.execute_with_trace()
753 .map(PagedLoadExecutionWithTrace::into_execution)
754 }
755
756 pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
762 where
763 E: EntityValue,
764 {
765 self.inner.ensure_paged_mode_ready()?;
766
767 self.inner.session.execute_load_query_paged_with_trace(
768 self.inner.query(),
769 self.inner.cursor_token.as_deref(),
770 )
771 }
772}