1use crate::{
7 db::{
8 DbSession, PagedGroupedExecutionWithTrace, PagedLoadExecution, PagedLoadExecutionWithTrace,
9 predicate::{CompareOp, Predicate},
10 query::{
11 builder::aggregate::AggregateExpr,
12 explain::ExplainPlan,
13 expr::{FilterExpr, SortExpr},
14 intent::{CompiledQuery, IntentError, PlannedQuery, Query, QueryError},
15 plan::{validate_fluent_non_paged_mode, validate_fluent_paged_mode},
16 },
17 response::Response,
18 },
19 traits::{EntityKind, EntityValue, SingletonEntity},
20 types::{Decimal, Id},
21 value::Value,
22};
23
24type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
25
26pub struct FluentLoadQuery<'a, E>
35where
36 E: EntityKind,
37{
38 session: &'a DbSession<E::Canister>,
39 query: Query<E>,
40 cursor_token: Option<String>,
41}
42
43impl<'a, E> FluentLoadQuery<'a, E>
44where
45 E: EntityKind,
46{
47 pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
48 Self {
49 session,
50 query,
51 cursor_token: None,
52 }
53 }
54
55 #[must_use]
60 pub const fn query(&self) -> &Query<E> {
61 &self.query
62 }
63
64 fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
65 self.query = map(self.query);
66 self
67 }
68
69 fn try_map_query(
70 mut self,
71 map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
72 ) -> Result<Self, QueryError> {
73 self.query = map(self.query)?;
74 Ok(self)
75 }
76
77 #[must_use]
85 pub fn by_id(mut self, id: Id<E>) -> Self {
86 self.query = self.query.by_id(id.key());
87 self
88 }
89
90 #[must_use]
94 pub fn by_ids<I>(mut self, ids: I) -> Self
95 where
96 I: IntoIterator<Item = Id<E>>,
97 {
98 self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
99 self
100 }
101
102 #[must_use]
107 pub fn filter(self, predicate: Predicate) -> Self {
108 self.map_query(|query| query.filter(predicate))
109 }
110
111 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
112 self.try_map_query(|query| query.filter_expr(expr))
113 }
114
115 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
116 self.try_map_query(|query| query.sort_expr(expr))
117 }
118
119 #[must_use]
120 pub fn order_by(self, field: impl AsRef<str>) -> Self {
121 self.map_query(|query| query.order_by(field))
122 }
123
124 #[must_use]
125 pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
126 self.map_query(|query| query.order_by_desc(field))
127 }
128
129 pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
131 let field = field.as_ref().to_owned();
132 self.try_map_query(|query| query.group_by(&field))
133 }
134
135 #[must_use]
137 pub fn aggregate(self, aggregate: AggregateExpr) -> Self {
138 self.map_query(|query| query.aggregate(aggregate))
139 }
140
141 #[must_use]
143 pub fn grouped_limits(self, max_groups: u64, max_group_bytes: u64) -> Self {
144 self.map_query(|query| query.grouped_limits(max_groups, max_group_bytes))
145 }
146
147 pub fn having_group(
149 self,
150 field: impl AsRef<str>,
151 op: CompareOp,
152 value: Value,
153 ) -> Result<Self, QueryError> {
154 let field = field.as_ref().to_owned();
155 self.try_map_query(|query| query.having_group(&field, op, value))
156 }
157
158 pub fn having_aggregate(
160 self,
161 aggregate_index: usize,
162 op: CompareOp,
163 value: Value,
164 ) -> Result<Self, QueryError> {
165 self.try_map_query(|query| query.having_aggregate(aggregate_index, op, value))
166 }
167
168 #[must_use]
174 pub fn limit(self, limit: u32) -> Self {
175 self.map_query(|query| query.limit(limit))
176 }
177
178 #[must_use]
184 pub fn offset(self, offset: u32) -> Self {
185 self.map_query(|query| query.offset(offset))
186 }
187
188 #[must_use]
194 pub fn cursor(mut self, token: impl Into<String>) -> Self {
195 self.cursor_token = Some(token.into());
196 self
197 }
198
199 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
204 self.query.explain()
205 }
206
207 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
208 if let Some(err) = self.cursor_intent_error() {
209 return Err(QueryError::Intent(err));
210 }
211
212 self.query.planned()
213 }
214
215 pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
216 if let Some(err) = self.cursor_intent_error() {
217 return Err(QueryError::Intent(err));
218 }
219
220 self.query.plan()
221 }
222
223 pub fn execute(&self) -> Result<Response<E>, QueryError>
229 where
230 E: EntityValue,
231 {
232 self.ensure_non_paged_mode_ready()?;
233
234 self.session.execute_query(self.query())
235 }
236
237 pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
248 self.ensure_paged_mode_ready()?;
249
250 Ok(PagedLoadQuery { inner: self })
251 }
252
253 pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
257 where
258 E: EntityValue,
259 {
260 self.page()?.execute()
261 }
262
263 pub fn execute_grouped(self) -> Result<PagedGroupedExecutionWithTrace, QueryError>
268 where
269 E: EntityValue,
270 {
271 self.session
272 .execute_grouped(self.query(), self.cursor_token.as_deref())
273 }
274
275 pub fn is_empty(&self) -> Result<bool, QueryError>
281 where
282 E: EntityValue,
283 {
284 Ok(!self.exists()?)
285 }
286
287 pub fn exists(&self) -> Result<bool, QueryError>
289 where
290 E: EntityValue,
291 {
292 self.ensure_non_paged_mode_ready()?;
293
294 self.session
295 .execute_load_query_with(self.query(), |load, plan| load.aggregate_exists(plan))
296 }
297
298 pub fn count(&self) -> Result<u32, QueryError>
300 where
301 E: EntityValue,
302 {
303 self.ensure_non_paged_mode_ready()?;
304
305 self.session
306 .execute_load_query_with(self.query(), |load, plan| load.aggregate_count(plan))
307 }
308
309 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
311 where
312 E: EntityValue,
313 {
314 self.ensure_non_paged_mode_ready()?;
315
316 self.session
317 .execute_load_query_with(self.query(), |load, plan| load.aggregate_min(plan))
318 }
319
320 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
324 where
325 E: EntityValue,
326 {
327 self.ensure_non_paged_mode_ready()?;
328
329 self.session
330 .execute_load_query_with(self.query(), |load, plan| {
331 load.aggregate_min_by(plan, field.as_ref())
332 })
333 }
334
335 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
337 where
338 E: EntityValue,
339 {
340 self.ensure_non_paged_mode_ready()?;
341
342 self.session
343 .execute_load_query_with(self.query(), |load, plan| load.aggregate_max(plan))
344 }
345
346 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
350 where
351 E: EntityValue,
352 {
353 self.ensure_non_paged_mode_ready()?;
354
355 self.session
356 .execute_load_query_with(self.query(), |load, plan| {
357 load.aggregate_max_by(plan, field.as_ref())
358 })
359 }
360
361 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
364 where
365 E: EntityValue,
366 {
367 self.ensure_non_paged_mode_ready()?;
368
369 self.session
370 .execute_load_query_with(self.query(), |load, plan| {
371 load.aggregate_nth_by(plan, field.as_ref(), nth)
372 })
373 }
374
375 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
377 where
378 E: EntityValue,
379 {
380 self.ensure_non_paged_mode_ready()?;
381
382 self.session
383 .execute_load_query_with(self.query(), |load, plan| {
384 load.aggregate_sum_by(plan, field.as_ref())
385 })
386 }
387
388 pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
390 where
391 E: EntityValue,
392 {
393 self.ensure_non_paged_mode_ready()?;
394
395 self.session
396 .execute_load_query_with(self.query(), |load, plan| {
397 load.aggregate_sum_distinct_by(plan, field.as_ref())
398 })
399 }
400
401 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
403 where
404 E: EntityValue,
405 {
406 self.ensure_non_paged_mode_ready()?;
407
408 self.session
409 .execute_load_query_with(self.query(), |load, plan| {
410 load.aggregate_avg_by(plan, field.as_ref())
411 })
412 }
413
414 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
419 where
420 E: EntityValue,
421 {
422 self.ensure_non_paged_mode_ready()?;
423
424 self.session
425 .execute_load_query_with(self.query(), |load, plan| {
426 load.aggregate_median_by(plan, field.as_ref())
427 })
428 }
429
430 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
433 where
434 E: EntityValue,
435 {
436 self.ensure_non_paged_mode_ready()?;
437
438 self.session
439 .execute_load_query_with(self.query(), |load, plan| {
440 load.aggregate_count_distinct_by(plan, field.as_ref())
441 })
442 }
443
444 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
448 where
449 E: EntityValue,
450 {
451 self.ensure_non_paged_mode_ready()?;
452
453 self.session
454 .execute_load_query_with(self.query(), |load, plan| {
455 load.aggregate_min_max_by(plan, field.as_ref())
456 })
457 }
458
459 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
461 where
462 E: EntityValue,
463 {
464 self.ensure_non_paged_mode_ready()?;
465
466 self.session
467 .execute_load_query_with(self.query(), |load, plan| {
468 load.values_by(plan, field.as_ref())
469 })
470 }
471
472 pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
474 where
475 E: EntityValue,
476 {
477 self.ensure_non_paged_mode_ready()?;
478
479 self.session
480 .execute_load_query_with(self.query(), |load, plan| load.take(plan, take_count))
481 }
482
483 pub fn top_k_by(
491 &self,
492 field: impl AsRef<str>,
493 take_count: u32,
494 ) -> Result<Response<E>, QueryError>
495 where
496 E: EntityValue,
497 {
498 self.ensure_non_paged_mode_ready()?;
499
500 self.session
501 .execute_load_query_with(self.query(), |load, plan| {
502 load.top_k_by(plan, field.as_ref(), take_count)
503 })
504 }
505
506 pub fn bottom_k_by(
514 &self,
515 field: impl AsRef<str>,
516 take_count: u32,
517 ) -> Result<Response<E>, QueryError>
518 where
519 E: EntityValue,
520 {
521 self.ensure_non_paged_mode_ready()?;
522
523 self.session
524 .execute_load_query_with(self.query(), |load, plan| {
525 load.bottom_k_by(plan, field.as_ref(), take_count)
526 })
527 }
528
529 pub fn top_k_by_values(
537 &self,
538 field: impl AsRef<str>,
539 take_count: u32,
540 ) -> Result<Vec<Value>, QueryError>
541 where
542 E: EntityValue,
543 {
544 self.ensure_non_paged_mode_ready()?;
545
546 self.session
547 .execute_load_query_with(self.query(), |load, plan| {
548 load.top_k_by_values(plan, field.as_ref(), take_count)
549 })
550 }
551
552 pub fn bottom_k_by_values(
560 &self,
561 field: impl AsRef<str>,
562 take_count: u32,
563 ) -> Result<Vec<Value>, QueryError>
564 where
565 E: EntityValue,
566 {
567 self.ensure_non_paged_mode_ready()?;
568
569 self.session
570 .execute_load_query_with(self.query(), |load, plan| {
571 load.bottom_k_by_values(plan, field.as_ref(), take_count)
572 })
573 }
574
575 pub fn top_k_by_with_ids(
583 &self,
584 field: impl AsRef<str>,
585 take_count: u32,
586 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
587 where
588 E: EntityValue,
589 {
590 self.ensure_non_paged_mode_ready()?;
591
592 self.session
593 .execute_load_query_with(self.query(), |load, plan| {
594 load.top_k_by_with_ids(plan, field.as_ref(), take_count)
595 })
596 }
597
598 pub fn bottom_k_by_with_ids(
606 &self,
607 field: impl AsRef<str>,
608 take_count: u32,
609 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
610 where
611 E: EntityValue,
612 {
613 self.ensure_non_paged_mode_ready()?;
614
615 self.session
616 .execute_load_query_with(self.query(), |load, plan| {
617 load.bottom_k_by_with_ids(plan, field.as_ref(), take_count)
618 })
619 }
620
621 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
624 where
625 E: EntityValue,
626 {
627 self.ensure_non_paged_mode_ready()?;
628
629 self.session
630 .execute_load_query_with(self.query(), |load, plan| {
631 load.distinct_values_by(plan, field.as_ref())
632 })
633 }
634
635 pub fn values_by_with_ids(
638 &self,
639 field: impl AsRef<str>,
640 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
641 where
642 E: EntityValue,
643 {
644 self.ensure_non_paged_mode_ready()?;
645
646 self.session
647 .execute_load_query_with(self.query(), |load, plan| {
648 load.values_by_with_ids(plan, field.as_ref())
649 })
650 }
651
652 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
655 where
656 E: EntityValue,
657 {
658 self.ensure_non_paged_mode_ready()?;
659
660 self.session
661 .execute_load_query_with(self.query(), |load, plan| {
662 load.first_value_by(plan, field.as_ref())
663 })
664 }
665
666 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
669 where
670 E: EntityValue,
671 {
672 self.ensure_non_paged_mode_ready()?;
673
674 self.session
675 .execute_load_query_with(self.query(), |load, plan| {
676 load.last_value_by(plan, field.as_ref())
677 })
678 }
679
680 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
682 where
683 E: EntityValue,
684 {
685 self.ensure_non_paged_mode_ready()?;
686
687 self.session
688 .execute_load_query_with(self.query(), |load, plan| load.aggregate_first(plan))
689 }
690
691 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
693 where
694 E: EntityValue,
695 {
696 self.ensure_non_paged_mode_ready()?;
697
698 self.session
699 .execute_load_query_with(self.query(), |load, plan| load.aggregate_last(plan))
700 }
701
702 pub fn require_one(&self) -> Result<(), QueryError>
704 where
705 E: EntityValue,
706 {
707 self.execute()?.require_one()?;
708 Ok(())
709 }
710
711 pub fn require_some(&self) -> Result<(), QueryError>
713 where
714 E: EntityValue,
715 {
716 self.execute()?.require_some()?;
717 Ok(())
718 }
719}
720
721impl<E> FluentLoadQuery<'_, E>
722where
723 E: EntityKind,
724{
725 fn non_paged_intent_error(&self) -> Option<IntentError> {
726 validate_fluent_non_paged_mode(self.cursor_token.is_some(), self.query.has_grouping())
727 .err()
728 .map(IntentError::from)
729 }
730
731 fn cursor_intent_error(&self) -> Option<IntentError> {
732 self.cursor_token
733 .as_ref()
734 .and_then(|_| self.paged_intent_error())
735 }
736
737 fn paged_intent_error(&self) -> Option<IntentError> {
738 validate_fluent_paged_mode(
739 self.query.has_grouping(),
740 self.query.has_explicit_order(),
741 self.query.load_spec(),
742 )
743 .err()
744 .map(IntentError::from)
745 }
746
747 fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
748 if let Some(err) = self.paged_intent_error() {
749 return Err(QueryError::Intent(err));
750 }
751
752 Ok(())
753 }
754
755 fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
756 if let Some(err) = self.non_paged_intent_error() {
757 return Err(QueryError::Intent(err));
758 }
759
760 Ok(())
761 }
762}
763
764impl<E> FluentLoadQuery<'_, E>
765where
766 E: EntityKind + SingletonEntity,
767 E::Key: Default,
768{
769 #[must_use]
770 pub fn only(self) -> Self {
771 self.map_query(Query::only)
772 }
773}
774
775pub struct PagedLoadQuery<'a, E>
783where
784 E: EntityKind,
785{
786 inner: FluentLoadQuery<'a, E>,
787}
788
789impl<E> PagedLoadQuery<'_, E>
790where
791 E: EntityKind,
792{
793 #[must_use]
798 pub const fn query(&self) -> &Query<E> {
799 self.inner.query()
800 }
801
802 #[must_use]
808 pub fn cursor(mut self, token: impl Into<String>) -> Self {
809 self.inner = self.inner.cursor(token);
810 self
811 }
812
813 pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
823 where
824 E: EntityValue,
825 {
826 self.execute_with_trace()
827 .map(PagedLoadExecutionWithTrace::into_execution)
828 }
829
830 pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
836 where
837 E: EntityValue,
838 {
839 self.inner.ensure_paged_mode_ready()?;
840
841 self.inner.session.execute_load_query_paged_with_trace(
842 self.inner.query(),
843 self.inner.cursor_token.as_deref(),
844 )
845 }
846}