1use crate::{
7 db::{
8 DbSession, PagedGroupedExecutionWithTrace, PagedLoadExecution, PagedLoadExecutionWithTrace,
9 predicate::{CompareOp, Predicate},
10 query::{
11 api::ResponseCardinalityExt,
12 builder::aggregate::AggregateExpr,
13 explain::ExplainPlan,
14 expr::{FilterExpr, SortExpr},
15 intent::{CompiledQuery, IntentError, PlannedQuery, Query, QueryError},
16 plan::{FieldSlot, validate_fluent_non_paged_mode, validate_fluent_paged_mode},
17 },
18 response::EntityResponse,
19 },
20 error::InternalError,
21 traits::{EntityKind, EntityValue, SingletonEntity},
22 types::{Decimal, Id},
23 value::Value,
24};
25
/// Return payload of `FluentLoadQuery::min_max_by`: `None`, or a pair of entity ids.
// NOTE(review): presumably ordered as (min, max) — confirm against `aggregate_min_max_by_slot`.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
27
/// Fluent builder for load queries over entity `E`.
///
/// Pairs a [`Query`] intent with the [`DbSession`] that executes it and an
/// optional cursor token supplied through `cursor`.
pub struct FluentLoadQuery<'a, E>
where
    E: EntityKind,
{
    // Session every terminal dispatches through.
    session: &'a DbSession<E::Canister>,
    // Query intent accumulated by the builder methods.
    query: Query<E>,
    // Cursor token set by `cursor`; `None` when constructed via `new`.
    cursor_token: Option<String>,
}
44
45impl<'a, E> FluentLoadQuery<'a, E>
46where
47 E: EntityKind,
48{
49 pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
50 Self {
51 session,
52 query,
53 cursor_token: None,
54 }
55 }
56
57 #[must_use]
62 pub const fn query(&self) -> &Query<E> {
63 &self.query
64 }
65
66 fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
67 self.query = map(self.query);
68 self
69 }
70
71 fn try_map_query(
72 mut self,
73 map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
74 ) -> Result<Self, QueryError> {
75 self.query = map(self.query)?;
76 Ok(self)
77 }
78
79 #[must_use]
87 pub fn by_id(mut self, id: Id<E>) -> Self {
88 self.query = self.query.by_id(id.key());
89 self
90 }
91
92 #[must_use]
96 pub fn by_ids<I>(mut self, ids: I) -> Self
97 where
98 I: IntoIterator<Item = Id<E>>,
99 {
100 self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
101 self
102 }
103
104 #[must_use]
109 pub fn filter(self, predicate: Predicate) -> Self {
110 self.map_query(|query| query.filter(predicate))
111 }
112
113 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
114 self.try_map_query(|query| query.filter_expr(expr))
115 }
116
117 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
118 self.try_map_query(|query| query.sort_expr(expr))
119 }
120
121 #[must_use]
122 pub fn order_by(self, field: impl AsRef<str>) -> Self {
123 self.map_query(|query| query.order_by(field))
124 }
125
126 #[must_use]
127 pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
128 self.map_query(|query| query.order_by_desc(field))
129 }
130
131 pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
133 let field = field.as_ref().to_owned();
134 self.try_map_query(|query| query.group_by(&field))
135 }
136
137 #[must_use]
139 pub fn aggregate(self, aggregate: AggregateExpr) -> Self {
140 self.map_query(|query| query.aggregate(aggregate))
141 }
142
143 #[must_use]
145 pub fn grouped_limits(self, max_groups: u64, max_group_bytes: u64) -> Self {
146 self.map_query(|query| query.grouped_limits(max_groups, max_group_bytes))
147 }
148
149 pub fn having_group(
151 self,
152 field: impl AsRef<str>,
153 op: CompareOp,
154 value: Value,
155 ) -> Result<Self, QueryError> {
156 let field = field.as_ref().to_owned();
157 self.try_map_query(|query| query.having_group(&field, op, value))
158 }
159
160 pub fn having_aggregate(
162 self,
163 aggregate_index: usize,
164 op: CompareOp,
165 value: Value,
166 ) -> Result<Self, QueryError> {
167 self.try_map_query(|query| query.having_aggregate(aggregate_index, op, value))
168 }
169
170 #[must_use]
176 pub fn limit(self, limit: u32) -> Self {
177 self.map_query(|query| query.limit(limit))
178 }
179
180 #[must_use]
186 pub fn offset(self, offset: u32) -> Self {
187 self.map_query(|query| query.offset(offset))
188 }
189
190 #[must_use]
196 pub fn cursor(mut self, token: impl Into<String>) -> Self {
197 self.cursor_token = Some(token.into());
198 self
199 }
200
201 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
206 self.query.explain()
207 }
208
209 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
210 if let Some(err) = self.cursor_intent_error() {
211 return Err(QueryError::Intent(err));
212 }
213
214 self.query.planned()
215 }
216
217 pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
218 if let Some(err) = self.cursor_intent_error() {
219 return Err(QueryError::Intent(err));
220 }
221
222 self.query.plan()
223 }
224
225 pub fn execute(&self) -> Result<EntityResponse<E>, QueryError>
231 where
232 E: EntityValue,
233 {
234 self.ensure_non_paged_mode_ready()?;
235
236 self.session.execute_query(self.query())
237 }
238
239 pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
250 self.ensure_paged_mode_ready()?;
251
252 Ok(PagedLoadQuery { inner: self })
253 }
254
255 pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
259 where
260 E: EntityValue,
261 {
262 self.page()?.execute()
263 }
264
265 pub fn execute_grouped(self) -> Result<PagedGroupedExecutionWithTrace, QueryError>
270 where
271 E: EntityValue,
272 {
273 self.session
274 .execute_grouped(self.query(), self.cursor_token.as_deref())
275 }
276
277 pub fn is_empty(&self) -> Result<bool, QueryError>
283 where
284 E: EntityValue,
285 {
286 Ok(!self.exists()?)
287 }
288
289 pub fn exists(&self) -> Result<bool, QueryError>
291 where
292 E: EntityValue,
293 {
294 self.ensure_non_paged_mode_ready()?;
295
296 self.session
297 .execute_load_query_with(self.query(), |load, plan| load.aggregate_exists(plan))
298 }
299
300 pub fn count(&self) -> Result<u32, QueryError>
302 where
303 E: EntityValue,
304 {
305 self.ensure_non_paged_mode_ready()?;
306
307 self.session
308 .execute_load_query_with(self.query(), |load, plan| load.aggregate_count(plan))
309 }
310
311 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
313 where
314 E: EntityValue,
315 {
316 self.ensure_non_paged_mode_ready()?;
317
318 self.session
319 .execute_load_query_with(self.query(), |load, plan| load.aggregate_min(plan))
320 }
321
322 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
326 where
327 E: EntityValue,
328 {
329 self.ensure_non_paged_mode_ready()?;
330 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
331
332 self.session
333 .execute_load_query_with(self.query(), move |load, plan| {
334 load.aggregate_min_by_slot(plan, target_slot)
335 })
336 }
337
338 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
340 where
341 E: EntityValue,
342 {
343 self.ensure_non_paged_mode_ready()?;
344
345 self.session
346 .execute_load_query_with(self.query(), |load, plan| load.aggregate_max(plan))
347 }
348
349 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
353 where
354 E: EntityValue,
355 {
356 self.ensure_non_paged_mode_ready()?;
357 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
358
359 self.session
360 .execute_load_query_with(self.query(), move |load, plan| {
361 load.aggregate_max_by_slot(plan, target_slot)
362 })
363 }
364
365 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
368 where
369 E: EntityValue,
370 {
371 self.ensure_non_paged_mode_ready()?;
372 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
373
374 self.session
375 .execute_load_query_with(self.query(), move |load, plan| {
376 load.aggregate_nth_by_slot(plan, target_slot, nth)
377 })
378 }
379
380 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
382 where
383 E: EntityValue,
384 {
385 self.ensure_non_paged_mode_ready()?;
386 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
387
388 self.session
389 .execute_load_query_with(self.query(), move |load, plan| {
390 load.aggregate_sum_by_slot(plan, target_slot)
391 })
392 }
393
394 pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
396 where
397 E: EntityValue,
398 {
399 self.ensure_non_paged_mode_ready()?;
400 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
401
402 self.session
403 .execute_load_query_with(self.query(), move |load, plan| {
404 load.aggregate_sum_distinct_by_slot(plan, target_slot)
405 })
406 }
407
408 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
410 where
411 E: EntityValue,
412 {
413 self.ensure_non_paged_mode_ready()?;
414 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
415
416 self.session
417 .execute_load_query_with(self.query(), move |load, plan| {
418 load.aggregate_avg_by_slot(plan, target_slot)
419 })
420 }
421
422 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
427 where
428 E: EntityValue,
429 {
430 self.ensure_non_paged_mode_ready()?;
431 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
432
433 self.session
434 .execute_load_query_with(self.query(), move |load, plan| {
435 load.aggregate_median_by_slot(plan, target_slot)
436 })
437 }
438
439 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
442 where
443 E: EntityValue,
444 {
445 self.ensure_non_paged_mode_ready()?;
446 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
447
448 self.session
449 .execute_load_query_with(self.query(), move |load, plan| {
450 load.aggregate_count_distinct_by_slot(plan, target_slot)
451 })
452 }
453
454 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
458 where
459 E: EntityValue,
460 {
461 self.ensure_non_paged_mode_ready()?;
462 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
463
464 self.session
465 .execute_load_query_with(self.query(), move |load, plan| {
466 load.aggregate_min_max_by_slot(plan, target_slot)
467 })
468 }
469
470 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
472 where
473 E: EntityValue,
474 {
475 self.ensure_non_paged_mode_ready()?;
476 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
477
478 self.session
479 .execute_load_query_with(self.query(), move |load, plan| {
480 load.values_by_slot(plan, target_slot)
481 })
482 }
483
484 pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
486 where
487 E: EntityValue,
488 {
489 self.ensure_non_paged_mode_ready()?;
490
491 self.session
492 .execute_load_query_with(self.query(), |load, plan| load.take(plan, take_count))
493 }
494
495 pub fn top_k_by(
503 &self,
504 field: impl AsRef<str>,
505 take_count: u32,
506 ) -> Result<EntityResponse<E>, QueryError>
507 where
508 E: EntityValue,
509 {
510 self.ensure_non_paged_mode_ready()?;
511 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
512
513 self.session
514 .execute_load_query_with(self.query(), move |load, plan| {
515 load.top_k_by_slot(plan, target_slot, take_count)
516 })
517 }
518
519 pub fn bottom_k_by(
527 &self,
528 field: impl AsRef<str>,
529 take_count: u32,
530 ) -> Result<EntityResponse<E>, QueryError>
531 where
532 E: EntityValue,
533 {
534 self.ensure_non_paged_mode_ready()?;
535 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
536
537 self.session
538 .execute_load_query_with(self.query(), move |load, plan| {
539 load.bottom_k_by_slot(plan, target_slot, take_count)
540 })
541 }
542
543 pub fn top_k_by_values(
551 &self,
552 field: impl AsRef<str>,
553 take_count: u32,
554 ) -> Result<Vec<Value>, QueryError>
555 where
556 E: EntityValue,
557 {
558 self.ensure_non_paged_mode_ready()?;
559 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
560
561 self.session
562 .execute_load_query_with(self.query(), move |load, plan| {
563 load.top_k_by_values_slot(plan, target_slot, take_count)
564 })
565 }
566
567 pub fn bottom_k_by_values(
575 &self,
576 field: impl AsRef<str>,
577 take_count: u32,
578 ) -> Result<Vec<Value>, QueryError>
579 where
580 E: EntityValue,
581 {
582 self.ensure_non_paged_mode_ready()?;
583 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
584
585 self.session
586 .execute_load_query_with(self.query(), move |load, plan| {
587 load.bottom_k_by_values_slot(plan, target_slot, take_count)
588 })
589 }
590
591 pub fn top_k_by_with_ids(
599 &self,
600 field: impl AsRef<str>,
601 take_count: u32,
602 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
603 where
604 E: EntityValue,
605 {
606 self.ensure_non_paged_mode_ready()?;
607 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
608
609 self.session
610 .execute_load_query_with(self.query(), move |load, plan| {
611 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
612 })
613 }
614
615 pub fn bottom_k_by_with_ids(
623 &self,
624 field: impl AsRef<str>,
625 take_count: u32,
626 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
627 where
628 E: EntityValue,
629 {
630 self.ensure_non_paged_mode_ready()?;
631 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
632
633 self.session
634 .execute_load_query_with(self.query(), move |load, plan| {
635 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
636 })
637 }
638
639 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
642 where
643 E: EntityValue,
644 {
645 self.ensure_non_paged_mode_ready()?;
646 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
647
648 self.session
649 .execute_load_query_with(self.query(), move |load, plan| {
650 load.distinct_values_by_slot(plan, target_slot)
651 })
652 }
653
654 pub fn values_by_with_ids(
657 &self,
658 field: impl AsRef<str>,
659 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
660 where
661 E: EntityValue,
662 {
663 self.ensure_non_paged_mode_ready()?;
664 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
665
666 self.session
667 .execute_load_query_with(self.query(), move |load, plan| {
668 load.values_by_with_ids_slot(plan, target_slot)
669 })
670 }
671
672 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
675 where
676 E: EntityValue,
677 {
678 self.ensure_non_paged_mode_ready()?;
679 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
680
681 self.session
682 .execute_load_query_with(self.query(), move |load, plan| {
683 load.first_value_by_slot(plan, target_slot)
684 })
685 }
686
687 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
690 where
691 E: EntityValue,
692 {
693 self.ensure_non_paged_mode_ready()?;
694 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
695
696 self.session
697 .execute_load_query_with(self.query(), move |load, plan| {
698 load.last_value_by_slot(plan, target_slot)
699 })
700 }
701
702 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
704 where
705 E: EntityValue,
706 {
707 self.ensure_non_paged_mode_ready()?;
708
709 self.session
710 .execute_load_query_with(self.query(), |load, plan| load.aggregate_first(plan))
711 }
712
713 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
715 where
716 E: EntityValue,
717 {
718 self.ensure_non_paged_mode_ready()?;
719
720 self.session
721 .execute_load_query_with(self.query(), |load, plan| load.aggregate_last(plan))
722 }
723
724 pub fn require_one(&self) -> Result<(), QueryError>
726 where
727 E: EntityValue,
728 {
729 self.execute()?.require_one()?;
730 Ok(())
731 }
732
733 pub fn require_some(&self) -> Result<(), QueryError>
735 where
736 E: EntityValue,
737 {
738 self.execute()?.require_some()?;
739 Ok(())
740 }
741}
742
743impl<E> FluentLoadQuery<'_, E>
744where
745 E: EntityKind,
746{
747 fn resolve_terminal_field_slot(field: &str) -> Result<FieldSlot, QueryError> {
751 FieldSlot::resolve(E::MODEL, field).ok_or_else(|| {
752 QueryError::execute(InternalError::executor_unsupported(format!(
753 "unknown aggregate target field: {field}",
754 )))
755 })
756 }
757
758 fn non_paged_intent_error(&self) -> Option<IntentError> {
759 validate_fluent_non_paged_mode(self.cursor_token.is_some(), self.query.has_grouping())
760 .err()
761 .map(IntentError::from)
762 }
763
764 fn cursor_intent_error(&self) -> Option<IntentError> {
765 self.cursor_token
766 .as_ref()
767 .and_then(|_| self.paged_intent_error())
768 }
769
770 fn paged_intent_error(&self) -> Option<IntentError> {
771 validate_fluent_paged_mode(
772 self.query.has_grouping(),
773 self.query.has_explicit_order(),
774 self.query.load_spec(),
775 )
776 .err()
777 .map(IntentError::from)
778 }
779
780 fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
781 if let Some(err) = self.paged_intent_error() {
782 return Err(QueryError::Intent(err));
783 }
784
785 Ok(())
786 }
787
788 fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
789 if let Some(err) = self.non_paged_intent_error() {
790 return Err(QueryError::Intent(err));
791 }
792
793 Ok(())
794 }
795}
796
797impl<E> FluentLoadQuery<'_, E>
798where
799 E: EntityKind + SingletonEntity,
800 E::Key: Default,
801{
802 #[must_use]
803 pub fn only(self) -> Self {
804 self.map_query(Query::only)
805 }
806}
807
/// Paged-mode wrapper around a [`FluentLoadQuery`].
///
/// Built by `FluentLoadQuery::page`, which runs paged-mode validation before
/// constructing the wrapper.
pub struct PagedLoadQuery<'a, E>
where
    E: EntityKind,
{
    // The wrapped builder; supplies the session, query and cursor token.
    inner: FluentLoadQuery<'a, E>,
}
821
822impl<E> PagedLoadQuery<'_, E>
823where
824 E: EntityKind,
825{
826 #[must_use]
831 pub const fn query(&self) -> &Query<E> {
832 self.inner.query()
833 }
834
835 #[must_use]
841 pub fn cursor(mut self, token: impl Into<String>) -> Self {
842 self.inner = self.inner.cursor(token);
843 self
844 }
845
846 pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
856 where
857 E: EntityValue,
858 {
859 self.execute_with_trace()
860 .map(PagedLoadExecutionWithTrace::into_execution)
861 }
862
863 pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
869 where
870 E: EntityValue,
871 {
872 self.inner.ensure_paged_mode_ready()?;
873
874 self.inner.session.execute_load_query_paged_with_trace(
875 self.inner.query(),
876 self.inner.cursor_token.as_deref(),
877 )
878 }
879}