1use crate::{
7 db::{
8 DbSession, PagedGroupedExecutionWithTrace, PagedLoadExecution, PagedLoadExecutionWithTrace,
9 predicate::Predicate,
10 query::{
11 explain::ExplainPlan,
12 expr::{FilterExpr, SortExpr},
13 intent::{CompiledQuery, IntentError, PlannedQuery, Query, QueryError},
14 plan::{validate_fluent_non_paged_mode, validate_fluent_paged_mode},
15 },
16 response::Response,
17 },
18 traits::{EntityKind, EntityValue, SingletonEntity},
19 types::{Decimal, Id},
20 value::Value,
21};
22
23type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
24
25pub struct FluentLoadQuery<'a, E>
34where
35 E: EntityKind,
36{
37 session: &'a DbSession<E::Canister>,
38 query: Query<E>,
39 cursor_token: Option<String>,
40}
41
42impl<'a, E> FluentLoadQuery<'a, E>
43where
44 E: EntityKind,
45{
46 pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
47 Self {
48 session,
49 query,
50 cursor_token: None,
51 }
52 }
53
54 #[must_use]
59 pub const fn query(&self) -> &Query<E> {
60 &self.query
61 }
62
63 fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
64 self.query = map(self.query);
65 self
66 }
67
68 fn try_map_query(
69 mut self,
70 map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
71 ) -> Result<Self, QueryError> {
72 self.query = map(self.query)?;
73 Ok(self)
74 }
75
76 #[must_use]
84 pub fn by_id(mut self, id: Id<E>) -> Self {
85 self.query = self.query.by_id(id.key());
86 self
87 }
88
89 #[must_use]
93 pub fn by_ids<I>(mut self, ids: I) -> Self
94 where
95 I: IntoIterator<Item = Id<E>>,
96 {
97 self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
98 self
99 }
100
101 #[must_use]
106 pub fn filter(self, predicate: Predicate) -> Self {
107 self.map_query(|query| query.filter(predicate))
108 }
109
110 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
111 self.try_map_query(|query| query.filter_expr(expr))
112 }
113
114 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
115 self.try_map_query(|query| query.sort_expr(expr))
116 }
117
118 #[must_use]
119 pub fn order_by(self, field: impl AsRef<str>) -> Self {
120 self.map_query(|query| query.order_by(field))
121 }
122
123 #[must_use]
124 pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
125 self.map_query(|query| query.order_by_desc(field))
126 }
127
128 pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
130 let field = field.as_ref().to_owned();
131 self.try_map_query(|query| query.group_by(&field))
132 }
133
134 #[must_use]
136 pub fn group_count(self) -> Self {
137 self.map_query(Query::group_count)
138 }
139
140 #[must_use]
142 pub fn group_exists(self) -> Self {
143 self.map_query(Query::group_exists)
144 }
145
146 #[must_use]
148 pub fn group_first(self) -> Self {
149 self.map_query(Query::group_first)
150 }
151
152 #[must_use]
154 pub fn group_last(self) -> Self {
155 self.map_query(Query::group_last)
156 }
157
158 #[must_use]
160 pub fn group_min(self) -> Self {
161 self.map_query(Query::group_min)
162 }
163
164 #[must_use]
166 pub fn group_max(self) -> Self {
167 self.map_query(Query::group_max)
168 }
169
170 pub fn group_min_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
172 let field = field.as_ref().to_owned();
173 self.try_map_query(|query| query.group_min_by(&field))
174 }
175
176 pub fn group_max_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
178 let field = field.as_ref().to_owned();
179 self.try_map_query(|query| query.group_max_by(&field))
180 }
181
182 #[must_use]
184 pub fn grouped_limits(self, max_groups: u64, max_group_bytes: u64) -> Self {
185 self.map_query(|query| query.grouped_limits(max_groups, max_group_bytes))
186 }
187
188 #[must_use]
194 pub fn limit(self, limit: u32) -> Self {
195 self.map_query(|query| query.limit(limit))
196 }
197
198 #[must_use]
204 pub fn offset(self, offset: u32) -> Self {
205 self.map_query(|query| query.offset(offset))
206 }
207
208 #[must_use]
214 pub fn cursor(mut self, token: impl Into<String>) -> Self {
215 self.cursor_token = Some(token.into());
216 self
217 }
218
219 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
224 self.query.explain()
225 }
226
227 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
228 if let Some(err) = self.cursor_intent_error() {
229 return Err(QueryError::Intent(err));
230 }
231
232 self.query.planned()
233 }
234
235 pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
236 if let Some(err) = self.cursor_intent_error() {
237 return Err(QueryError::Intent(err));
238 }
239
240 self.query.plan()
241 }
242
243 pub fn execute(&self) -> Result<Response<E>, QueryError>
249 where
250 E: EntityValue,
251 {
252 self.ensure_non_paged_mode_ready()?;
253
254 self.session.execute_query(self.query())
255 }
256
257 pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
268 self.ensure_paged_mode_ready()?;
269
270 Ok(PagedLoadQuery { inner: self })
271 }
272
273 pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
277 where
278 E: EntityValue,
279 {
280 self.page()?.execute()
281 }
282
283 pub fn execute_grouped(self) -> Result<PagedGroupedExecutionWithTrace, QueryError>
288 where
289 E: EntityValue,
290 {
291 self.session
292 .execute_grouped(self.query(), self.cursor_token.as_deref())
293 }
294
295 pub fn is_empty(&self) -> Result<bool, QueryError>
301 where
302 E: EntityValue,
303 {
304 Ok(!self.exists()?)
305 }
306
307 pub fn exists(&self) -> Result<bool, QueryError>
309 where
310 E: EntityValue,
311 {
312 self.ensure_non_paged_mode_ready()?;
313
314 self.session
315 .execute_load_query_with(self.query(), |load, plan| load.aggregate_exists(plan))
316 }
317
318 pub fn count(&self) -> Result<u32, QueryError>
320 where
321 E: EntityValue,
322 {
323 self.ensure_non_paged_mode_ready()?;
324
325 self.session
326 .execute_load_query_with(self.query(), |load, plan| load.aggregate_count(plan))
327 }
328
329 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
331 where
332 E: EntityValue,
333 {
334 self.ensure_non_paged_mode_ready()?;
335
336 self.session
337 .execute_load_query_with(self.query(), |load, plan| load.aggregate_min(plan))
338 }
339
340 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
344 where
345 E: EntityValue,
346 {
347 self.ensure_non_paged_mode_ready()?;
348
349 self.session
350 .execute_load_query_with(self.query(), |load, plan| {
351 load.aggregate_min_by(plan, field.as_ref())
352 })
353 }
354
355 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
357 where
358 E: EntityValue,
359 {
360 self.ensure_non_paged_mode_ready()?;
361
362 self.session
363 .execute_load_query_with(self.query(), |load, plan| load.aggregate_max(plan))
364 }
365
366 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
370 where
371 E: EntityValue,
372 {
373 self.ensure_non_paged_mode_ready()?;
374
375 self.session
376 .execute_load_query_with(self.query(), |load, plan| {
377 load.aggregate_max_by(plan, field.as_ref())
378 })
379 }
380
381 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
384 where
385 E: EntityValue,
386 {
387 self.ensure_non_paged_mode_ready()?;
388
389 self.session
390 .execute_load_query_with(self.query(), |load, plan| {
391 load.aggregate_nth_by(plan, field.as_ref(), nth)
392 })
393 }
394
395 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
397 where
398 E: EntityValue,
399 {
400 self.ensure_non_paged_mode_ready()?;
401
402 self.session
403 .execute_load_query_with(self.query(), |load, plan| {
404 load.aggregate_sum_by(plan, field.as_ref())
405 })
406 }
407
408 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
410 where
411 E: EntityValue,
412 {
413 self.ensure_non_paged_mode_ready()?;
414
415 self.session
416 .execute_load_query_with(self.query(), |load, plan| {
417 load.aggregate_avg_by(plan, field.as_ref())
418 })
419 }
420
421 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
426 where
427 E: EntityValue,
428 {
429 self.ensure_non_paged_mode_ready()?;
430
431 self.session
432 .execute_load_query_with(self.query(), |load, plan| {
433 load.aggregate_median_by(plan, field.as_ref())
434 })
435 }
436
437 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
440 where
441 E: EntityValue,
442 {
443 self.ensure_non_paged_mode_ready()?;
444
445 self.session
446 .execute_load_query_with(self.query(), |load, plan| {
447 load.aggregate_count_distinct_by(plan, field.as_ref())
448 })
449 }
450
451 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
455 where
456 E: EntityValue,
457 {
458 self.ensure_non_paged_mode_ready()?;
459
460 self.session
461 .execute_load_query_with(self.query(), |load, plan| {
462 load.aggregate_min_max_by(plan, field.as_ref())
463 })
464 }
465
466 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
468 where
469 E: EntityValue,
470 {
471 self.ensure_non_paged_mode_ready()?;
472
473 self.session
474 .execute_load_query_with(self.query(), |load, plan| {
475 load.values_by(plan, field.as_ref())
476 })
477 }
478
479 pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
481 where
482 E: EntityValue,
483 {
484 self.ensure_non_paged_mode_ready()?;
485
486 self.session
487 .execute_load_query_with(self.query(), |load, plan| load.take(plan, take_count))
488 }
489
490 pub fn top_k_by(
498 &self,
499 field: impl AsRef<str>,
500 take_count: u32,
501 ) -> Result<Response<E>, QueryError>
502 where
503 E: EntityValue,
504 {
505 self.ensure_non_paged_mode_ready()?;
506
507 self.session
508 .execute_load_query_with(self.query(), |load, plan| {
509 load.top_k_by(plan, field.as_ref(), take_count)
510 })
511 }
512
513 pub fn bottom_k_by(
521 &self,
522 field: impl AsRef<str>,
523 take_count: u32,
524 ) -> Result<Response<E>, QueryError>
525 where
526 E: EntityValue,
527 {
528 self.ensure_non_paged_mode_ready()?;
529
530 self.session
531 .execute_load_query_with(self.query(), |load, plan| {
532 load.bottom_k_by(plan, field.as_ref(), take_count)
533 })
534 }
535
536 pub fn top_k_by_values(
544 &self,
545 field: impl AsRef<str>,
546 take_count: u32,
547 ) -> Result<Vec<Value>, QueryError>
548 where
549 E: EntityValue,
550 {
551 self.ensure_non_paged_mode_ready()?;
552
553 self.session
554 .execute_load_query_with(self.query(), |load, plan| {
555 load.top_k_by_values(plan, field.as_ref(), take_count)
556 })
557 }
558
559 pub fn bottom_k_by_values(
567 &self,
568 field: impl AsRef<str>,
569 take_count: u32,
570 ) -> Result<Vec<Value>, QueryError>
571 where
572 E: EntityValue,
573 {
574 self.ensure_non_paged_mode_ready()?;
575
576 self.session
577 .execute_load_query_with(self.query(), |load, plan| {
578 load.bottom_k_by_values(plan, field.as_ref(), take_count)
579 })
580 }
581
582 pub fn top_k_by_with_ids(
590 &self,
591 field: impl AsRef<str>,
592 take_count: u32,
593 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
594 where
595 E: EntityValue,
596 {
597 self.ensure_non_paged_mode_ready()?;
598
599 self.session
600 .execute_load_query_with(self.query(), |load, plan| {
601 load.top_k_by_with_ids(plan, field.as_ref(), take_count)
602 })
603 }
604
605 pub fn bottom_k_by_with_ids(
613 &self,
614 field: impl AsRef<str>,
615 take_count: u32,
616 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
617 where
618 E: EntityValue,
619 {
620 self.ensure_non_paged_mode_ready()?;
621
622 self.session
623 .execute_load_query_with(self.query(), |load, plan| {
624 load.bottom_k_by_with_ids(plan, field.as_ref(), take_count)
625 })
626 }
627
628 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
631 where
632 E: EntityValue,
633 {
634 self.ensure_non_paged_mode_ready()?;
635
636 self.session
637 .execute_load_query_with(self.query(), |load, plan| {
638 load.distinct_values_by(plan, field.as_ref())
639 })
640 }
641
642 pub fn values_by_with_ids(
645 &self,
646 field: impl AsRef<str>,
647 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
648 where
649 E: EntityValue,
650 {
651 self.ensure_non_paged_mode_ready()?;
652
653 self.session
654 .execute_load_query_with(self.query(), |load, plan| {
655 load.values_by_with_ids(plan, field.as_ref())
656 })
657 }
658
659 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
662 where
663 E: EntityValue,
664 {
665 self.ensure_non_paged_mode_ready()?;
666
667 self.session
668 .execute_load_query_with(self.query(), |load, plan| {
669 load.first_value_by(plan, field.as_ref())
670 })
671 }
672
673 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
676 where
677 E: EntityValue,
678 {
679 self.ensure_non_paged_mode_ready()?;
680
681 self.session
682 .execute_load_query_with(self.query(), |load, plan| {
683 load.last_value_by(plan, field.as_ref())
684 })
685 }
686
687 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
689 where
690 E: EntityValue,
691 {
692 self.ensure_non_paged_mode_ready()?;
693
694 self.session
695 .execute_load_query_with(self.query(), |load, plan| load.aggregate_first(plan))
696 }
697
698 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
700 where
701 E: EntityValue,
702 {
703 self.ensure_non_paged_mode_ready()?;
704
705 self.session
706 .execute_load_query_with(self.query(), |load, plan| load.aggregate_last(plan))
707 }
708
709 pub fn require_one(&self) -> Result<(), QueryError>
711 where
712 E: EntityValue,
713 {
714 self.execute()?.require_one()?;
715 Ok(())
716 }
717
718 pub fn require_some(&self) -> Result<(), QueryError>
720 where
721 E: EntityValue,
722 {
723 self.execute()?.require_some()?;
724 Ok(())
725 }
726}
727
728impl<E> FluentLoadQuery<'_, E>
729where
730 E: EntityKind,
731{
732 fn non_paged_intent_error(&self) -> Option<IntentError> {
733 validate_fluent_non_paged_mode(self.cursor_token.is_some(), self.query.has_grouping())
734 .err()
735 .map(IntentError::from)
736 }
737
738 fn cursor_intent_error(&self) -> Option<IntentError> {
739 self.cursor_token
740 .as_ref()
741 .and_then(|_| self.paged_intent_error())
742 }
743
744 fn paged_intent_error(&self) -> Option<IntentError> {
745 validate_fluent_paged_mode(
746 self.query.has_grouping(),
747 self.query.has_explicit_order(),
748 self.query.load_spec(),
749 )
750 .err()
751 .map(IntentError::from)
752 }
753
754 fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
755 if let Some(err) = self.paged_intent_error() {
756 return Err(QueryError::Intent(err));
757 }
758
759 Ok(())
760 }
761
762 fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
763 if let Some(err) = self.non_paged_intent_error() {
764 return Err(QueryError::Intent(err));
765 }
766
767 Ok(())
768 }
769}
770
771impl<E> FluentLoadQuery<'_, E>
772where
773 E: EntityKind + SingletonEntity,
774 E::Key: Default,
775{
776 #[must_use]
777 pub fn only(self) -> Self {
778 self.map_query(Query::only)
779 }
780}
781
782pub struct PagedLoadQuery<'a, E>
790where
791 E: EntityKind,
792{
793 inner: FluentLoadQuery<'a, E>,
794}
795
796impl<E> PagedLoadQuery<'_, E>
797where
798 E: EntityKind,
799{
800 #[must_use]
805 pub const fn query(&self) -> &Query<E> {
806 self.inner.query()
807 }
808
809 #[must_use]
815 pub fn cursor(mut self, token: impl Into<String>) -> Self {
816 self.inner = self.inner.cursor(token);
817 self
818 }
819
820 pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
830 where
831 E: EntityValue,
832 {
833 self.execute_with_trace()
834 .map(PagedLoadExecutionWithTrace::into_execution)
835 }
836
837 pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
843 where
844 E: EntityValue,
845 {
846 self.inner.ensure_paged_mode_ready()?;
847
848 self.inner.session.execute_load_query_paged_with_trace(
849 self.inner.query(),
850 self.inner.cursor_token.as_deref(),
851 )
852 }
853}