1use crate::{
7 db::{
8 DbSession, PagedGroupedExecutionWithTrace, PagedLoadExecution, PagedLoadExecutionWithTrace,
9 predicate::{CompareOp, Predicate},
10 query::{
11 builder::aggregate::AggregateExpr,
12 explain::ExplainPlan,
13 expr::{FilterExpr, SortExpr},
14 intent::{CompiledQuery, IntentError, PlannedQuery, Query, QueryError},
15 plan::{FieldSlot, validate_fluent_non_paged_mode, validate_fluent_paged_mode},
16 },
17 response::Response,
18 },
19 error::InternalError,
20 traits::{EntityKind, EntityValue, SingletonEntity},
21 types::{Decimal, Id},
22 value::Value,
23};
24
/// Result shape of [`FluentLoadQuery::min_max_by`]: `None`, or a pair of ids.
///
/// NOTE(review): presumably ordered `(min, max)` — confirm against
/// `aggregate_min_max_by_slot`.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
26
27pub struct FluentLoadQuery<'a, E>
36where
37 E: EntityKind,
38{
39 session: &'a DbSession<E::Canister>,
40 query: Query<E>,
41 cursor_token: Option<String>,
42}
43
44impl<'a, E> FluentLoadQuery<'a, E>
45where
46 E: EntityKind,
47{
48 pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
49 Self {
50 session,
51 query,
52 cursor_token: None,
53 }
54 }
55
56 #[must_use]
61 pub const fn query(&self) -> &Query<E> {
62 &self.query
63 }
64
65 fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
66 self.query = map(self.query);
67 self
68 }
69
70 fn try_map_query(
71 mut self,
72 map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
73 ) -> Result<Self, QueryError> {
74 self.query = map(self.query)?;
75 Ok(self)
76 }
77
78 #[must_use]
86 pub fn by_id(mut self, id: Id<E>) -> Self {
87 self.query = self.query.by_id(id.key());
88 self
89 }
90
91 #[must_use]
95 pub fn by_ids<I>(mut self, ids: I) -> Self
96 where
97 I: IntoIterator<Item = Id<E>>,
98 {
99 self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
100 self
101 }
102
103 #[must_use]
108 pub fn filter(self, predicate: Predicate) -> Self {
109 self.map_query(|query| query.filter(predicate))
110 }
111
112 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
113 self.try_map_query(|query| query.filter_expr(expr))
114 }
115
116 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
117 self.try_map_query(|query| query.sort_expr(expr))
118 }
119
120 #[must_use]
121 pub fn order_by(self, field: impl AsRef<str>) -> Self {
122 self.map_query(|query| query.order_by(field))
123 }
124
125 #[must_use]
126 pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
127 self.map_query(|query| query.order_by_desc(field))
128 }
129
130 pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
132 let field = field.as_ref().to_owned();
133 self.try_map_query(|query| query.group_by(&field))
134 }
135
136 #[must_use]
138 pub fn aggregate(self, aggregate: AggregateExpr) -> Self {
139 self.map_query(|query| query.aggregate(aggregate))
140 }
141
142 #[must_use]
144 pub fn grouped_limits(self, max_groups: u64, max_group_bytes: u64) -> Self {
145 self.map_query(|query| query.grouped_limits(max_groups, max_group_bytes))
146 }
147
148 pub fn having_group(
150 self,
151 field: impl AsRef<str>,
152 op: CompareOp,
153 value: Value,
154 ) -> Result<Self, QueryError> {
155 let field = field.as_ref().to_owned();
156 self.try_map_query(|query| query.having_group(&field, op, value))
157 }
158
159 pub fn having_aggregate(
161 self,
162 aggregate_index: usize,
163 op: CompareOp,
164 value: Value,
165 ) -> Result<Self, QueryError> {
166 self.try_map_query(|query| query.having_aggregate(aggregate_index, op, value))
167 }
168
169 #[must_use]
175 pub fn limit(self, limit: u32) -> Self {
176 self.map_query(|query| query.limit(limit))
177 }
178
179 #[must_use]
185 pub fn offset(self, offset: u32) -> Self {
186 self.map_query(|query| query.offset(offset))
187 }
188
189 #[must_use]
195 pub fn cursor(mut self, token: impl Into<String>) -> Self {
196 self.cursor_token = Some(token.into());
197 self
198 }
199
200 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
205 self.query.explain()
206 }
207
208 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
209 if let Some(err) = self.cursor_intent_error() {
210 return Err(QueryError::Intent(err));
211 }
212
213 self.query.planned()
214 }
215
216 pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
217 if let Some(err) = self.cursor_intent_error() {
218 return Err(QueryError::Intent(err));
219 }
220
221 self.query.plan()
222 }
223
224 pub fn execute(&self) -> Result<Response<E>, QueryError>
230 where
231 E: EntityValue,
232 {
233 self.ensure_non_paged_mode_ready()?;
234
235 self.session.execute_query(self.query())
236 }
237
238 pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
249 self.ensure_paged_mode_ready()?;
250
251 Ok(PagedLoadQuery { inner: self })
252 }
253
254 pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
258 where
259 E: EntityValue,
260 {
261 self.page()?.execute()
262 }
263
264 pub fn execute_grouped(self) -> Result<PagedGroupedExecutionWithTrace, QueryError>
269 where
270 E: EntityValue,
271 {
272 self.session
273 .execute_grouped(self.query(), self.cursor_token.as_deref())
274 }
275
276 pub fn is_empty(&self) -> Result<bool, QueryError>
282 where
283 E: EntityValue,
284 {
285 Ok(!self.exists()?)
286 }
287
288 pub fn exists(&self) -> Result<bool, QueryError>
290 where
291 E: EntityValue,
292 {
293 self.ensure_non_paged_mode_ready()?;
294
295 self.session
296 .execute_load_query_with(self.query(), |load, plan| load.aggregate_exists(plan))
297 }
298
299 pub fn count(&self) -> Result<u32, QueryError>
301 where
302 E: EntityValue,
303 {
304 self.ensure_non_paged_mode_ready()?;
305
306 self.session
307 .execute_load_query_with(self.query(), |load, plan| load.aggregate_count(plan))
308 }
309
310 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
312 where
313 E: EntityValue,
314 {
315 self.ensure_non_paged_mode_ready()?;
316
317 self.session
318 .execute_load_query_with(self.query(), |load, plan| load.aggregate_min(plan))
319 }
320
321 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
325 where
326 E: EntityValue,
327 {
328 self.ensure_non_paged_mode_ready()?;
329 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
330
331 self.session
332 .execute_load_query_with(self.query(), move |load, plan| {
333 load.aggregate_min_by_slot(plan, target_slot)
334 })
335 }
336
337 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
339 where
340 E: EntityValue,
341 {
342 self.ensure_non_paged_mode_ready()?;
343
344 self.session
345 .execute_load_query_with(self.query(), |load, plan| load.aggregate_max(plan))
346 }
347
348 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
352 where
353 E: EntityValue,
354 {
355 self.ensure_non_paged_mode_ready()?;
356 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
357
358 self.session
359 .execute_load_query_with(self.query(), move |load, plan| {
360 load.aggregate_max_by_slot(plan, target_slot)
361 })
362 }
363
364 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
367 where
368 E: EntityValue,
369 {
370 self.ensure_non_paged_mode_ready()?;
371 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
372
373 self.session
374 .execute_load_query_with(self.query(), move |load, plan| {
375 load.aggregate_nth_by_slot(plan, target_slot, nth)
376 })
377 }
378
379 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
381 where
382 E: EntityValue,
383 {
384 self.ensure_non_paged_mode_ready()?;
385 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
386
387 self.session
388 .execute_load_query_with(self.query(), move |load, plan| {
389 load.aggregate_sum_by_slot(plan, target_slot)
390 })
391 }
392
393 pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
395 where
396 E: EntityValue,
397 {
398 self.ensure_non_paged_mode_ready()?;
399 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
400
401 self.session
402 .execute_load_query_with(self.query(), move |load, plan| {
403 load.aggregate_sum_distinct_by_slot(plan, target_slot)
404 })
405 }
406
407 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
409 where
410 E: EntityValue,
411 {
412 self.ensure_non_paged_mode_ready()?;
413 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
414
415 self.session
416 .execute_load_query_with(self.query(), move |load, plan| {
417 load.aggregate_avg_by_slot(plan, target_slot)
418 })
419 }
420
421 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
426 where
427 E: EntityValue,
428 {
429 self.ensure_non_paged_mode_ready()?;
430 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
431
432 self.session
433 .execute_load_query_with(self.query(), move |load, plan| {
434 load.aggregate_median_by_slot(plan, target_slot)
435 })
436 }
437
438 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
441 where
442 E: EntityValue,
443 {
444 self.ensure_non_paged_mode_ready()?;
445 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
446
447 self.session
448 .execute_load_query_with(self.query(), move |load, plan| {
449 load.aggregate_count_distinct_by_slot(plan, target_slot)
450 })
451 }
452
453 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
457 where
458 E: EntityValue,
459 {
460 self.ensure_non_paged_mode_ready()?;
461 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
462
463 self.session
464 .execute_load_query_with(self.query(), move |load, plan| {
465 load.aggregate_min_max_by_slot(plan, target_slot)
466 })
467 }
468
469 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
471 where
472 E: EntityValue,
473 {
474 self.ensure_non_paged_mode_ready()?;
475 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
476
477 self.session
478 .execute_load_query_with(self.query(), move |load, plan| {
479 load.values_by_slot(plan, target_slot)
480 })
481 }
482
483 pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
485 where
486 E: EntityValue,
487 {
488 self.ensure_non_paged_mode_ready()?;
489
490 self.session
491 .execute_load_query_with(self.query(), |load, plan| load.take(plan, take_count))
492 }
493
494 pub fn top_k_by(
502 &self,
503 field: impl AsRef<str>,
504 take_count: u32,
505 ) -> Result<Response<E>, QueryError>
506 where
507 E: EntityValue,
508 {
509 self.ensure_non_paged_mode_ready()?;
510 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
511
512 self.session
513 .execute_load_query_with(self.query(), move |load, plan| {
514 load.top_k_by_slot(plan, target_slot, take_count)
515 })
516 }
517
518 pub fn bottom_k_by(
526 &self,
527 field: impl AsRef<str>,
528 take_count: u32,
529 ) -> Result<Response<E>, QueryError>
530 where
531 E: EntityValue,
532 {
533 self.ensure_non_paged_mode_ready()?;
534 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
535
536 self.session
537 .execute_load_query_with(self.query(), move |load, plan| {
538 load.bottom_k_by_slot(plan, target_slot, take_count)
539 })
540 }
541
542 pub fn top_k_by_values(
550 &self,
551 field: impl AsRef<str>,
552 take_count: u32,
553 ) -> Result<Vec<Value>, QueryError>
554 where
555 E: EntityValue,
556 {
557 self.ensure_non_paged_mode_ready()?;
558 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
559
560 self.session
561 .execute_load_query_with(self.query(), move |load, plan| {
562 load.top_k_by_values_slot(plan, target_slot, take_count)
563 })
564 }
565
566 pub fn bottom_k_by_values(
574 &self,
575 field: impl AsRef<str>,
576 take_count: u32,
577 ) -> Result<Vec<Value>, QueryError>
578 where
579 E: EntityValue,
580 {
581 self.ensure_non_paged_mode_ready()?;
582 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
583
584 self.session
585 .execute_load_query_with(self.query(), move |load, plan| {
586 load.bottom_k_by_values_slot(plan, target_slot, take_count)
587 })
588 }
589
590 pub fn top_k_by_with_ids(
598 &self,
599 field: impl AsRef<str>,
600 take_count: u32,
601 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
602 where
603 E: EntityValue,
604 {
605 self.ensure_non_paged_mode_ready()?;
606 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
607
608 self.session
609 .execute_load_query_with(self.query(), move |load, plan| {
610 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
611 })
612 }
613
614 pub fn bottom_k_by_with_ids(
622 &self,
623 field: impl AsRef<str>,
624 take_count: u32,
625 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
626 where
627 E: EntityValue,
628 {
629 self.ensure_non_paged_mode_ready()?;
630 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
631
632 self.session
633 .execute_load_query_with(self.query(), move |load, plan| {
634 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
635 })
636 }
637
638 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
641 where
642 E: EntityValue,
643 {
644 self.ensure_non_paged_mode_ready()?;
645 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
646
647 self.session
648 .execute_load_query_with(self.query(), move |load, plan| {
649 load.distinct_values_by_slot(plan, target_slot)
650 })
651 }
652
653 pub fn values_by_with_ids(
656 &self,
657 field: impl AsRef<str>,
658 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
659 where
660 E: EntityValue,
661 {
662 self.ensure_non_paged_mode_ready()?;
663 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
664
665 self.session
666 .execute_load_query_with(self.query(), move |load, plan| {
667 load.values_by_with_ids_slot(plan, target_slot)
668 })
669 }
670
671 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
674 where
675 E: EntityValue,
676 {
677 self.ensure_non_paged_mode_ready()?;
678 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
679
680 self.session
681 .execute_load_query_with(self.query(), move |load, plan| {
682 load.first_value_by_slot(plan, target_slot)
683 })
684 }
685
686 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
689 where
690 E: EntityValue,
691 {
692 self.ensure_non_paged_mode_ready()?;
693 let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
694
695 self.session
696 .execute_load_query_with(self.query(), move |load, plan| {
697 load.last_value_by_slot(plan, target_slot)
698 })
699 }
700
701 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
703 where
704 E: EntityValue,
705 {
706 self.ensure_non_paged_mode_ready()?;
707
708 self.session
709 .execute_load_query_with(self.query(), |load, plan| load.aggregate_first(plan))
710 }
711
712 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
714 where
715 E: EntityValue,
716 {
717 self.ensure_non_paged_mode_ready()?;
718
719 self.session
720 .execute_load_query_with(self.query(), |load, plan| load.aggregate_last(plan))
721 }
722
723 pub fn require_one(&self) -> Result<(), QueryError>
725 where
726 E: EntityValue,
727 {
728 self.execute()?.require_one()?;
729 Ok(())
730 }
731
732 pub fn require_some(&self) -> Result<(), QueryError>
734 where
735 E: EntityValue,
736 {
737 self.execute()?.require_some()?;
738 Ok(())
739 }
740}
741
742impl<E> FluentLoadQuery<'_, E>
743where
744 E: EntityKind,
745{
746 fn resolve_terminal_field_slot(field: &str) -> Result<FieldSlot, QueryError> {
750 FieldSlot::resolve(E::MODEL, field).ok_or_else(|| {
751 QueryError::execute(InternalError::executor_unsupported(format!(
752 "unknown aggregate target field: {field}",
753 )))
754 })
755 }
756
757 fn non_paged_intent_error(&self) -> Option<IntentError> {
758 validate_fluent_non_paged_mode(self.cursor_token.is_some(), self.query.has_grouping())
759 .err()
760 .map(IntentError::from)
761 }
762
763 fn cursor_intent_error(&self) -> Option<IntentError> {
764 self.cursor_token
765 .as_ref()
766 .and_then(|_| self.paged_intent_error())
767 }
768
769 fn paged_intent_error(&self) -> Option<IntentError> {
770 validate_fluent_paged_mode(
771 self.query.has_grouping(),
772 self.query.has_explicit_order(),
773 self.query.load_spec(),
774 )
775 .err()
776 .map(IntentError::from)
777 }
778
779 fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
780 if let Some(err) = self.paged_intent_error() {
781 return Err(QueryError::Intent(err));
782 }
783
784 Ok(())
785 }
786
787 fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
788 if let Some(err) = self.non_paged_intent_error() {
789 return Err(QueryError::Intent(err));
790 }
791
792 Ok(())
793 }
794}
795
796impl<E> FluentLoadQuery<'_, E>
797where
798 E: EntityKind + SingletonEntity,
799 E::Key: Default,
800{
801 #[must_use]
802 pub fn only(self) -> Self {
803 self.map_query(Query::only)
804 }
805}
806
807pub struct PagedLoadQuery<'a, E>
815where
816 E: EntityKind,
817{
818 inner: FluentLoadQuery<'a, E>,
819}
820
821impl<E> PagedLoadQuery<'_, E>
822where
823 E: EntityKind,
824{
825 #[must_use]
830 pub const fn query(&self) -> &Query<E> {
831 self.inner.query()
832 }
833
834 #[must_use]
840 pub fn cursor(mut self, token: impl Into<String>) -> Self {
841 self.inner = self.inner.cursor(token);
842 self
843 }
844
845 pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
855 where
856 E: EntityValue,
857 {
858 self.execute_with_trace()
859 .map(PagedLoadExecutionWithTrace::into_execution)
860 }
861
862 pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
868 where
869 E: EntityValue,
870 {
871 self.inner.ensure_paged_mode_ready()?;
872
873 self.inner.session.execute_load_query_paged_with_trace(
874 self.inner.query(),
875 self.inner.cursor_token.as_deref(),
876 )
877 }
878}