1use crate::{
7 db::{
8 DbSession, PersistedRow,
9 executor::{
10 ExecutablePlan, LoadExecutor, ScalarNumericFieldBoundaryRequest,
11 ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryRequest,
12 },
13 query::{
14 api::ResponseCardinalityExt,
15 builder::{
16 AggregateExpr,
17 aggregate::{exists, first, last, max, min},
18 },
19 explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
20 fluent::load::FluentLoadQuery,
21 intent::QueryError,
22 plan::AggregateKind,
23 },
24 response::EntityResponse,
25 },
26 error::InternalError,
27 traits::EntityValue,
28 types::{Decimal, Id},
29 value::Value,
30};
31
32type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
33
34impl<E> FluentLoadQuery<'_, E>
35where
36 E: PersistedRow,
37{
38 pub fn execute(&self) -> Result<EntityResponse<E>, QueryError>
44 where
45 E: EntityValue,
46 {
47 self.ensure_non_paged_mode_ready()?;
48
49 self.session.execute_query(self.query())
50 }
51
52 fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
55 where
56 E: EntityValue,
57 F: FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
58 {
59 self.ensure_non_paged_mode_ready()?;
60
61 self.session.execute_load_query_with(self.query(), execute)
62 }
63
64 fn explain_scalar_non_paged_terminal(
67 &self,
68 aggregate: AggregateExpr,
69 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
70 where
71 E: EntityValue,
72 {
73 self.ensure_non_paged_mode_ready()?;
74
75 DbSession::<E::Canister>::explain_load_query_terminal_with(self.query(), aggregate)
76 }
77
78 pub fn is_empty(&self) -> Result<bool, QueryError>
84 where
85 E: EntityValue,
86 {
87 self.not_exists()
88 }
89
90 pub fn not_exists(&self) -> Result<bool, QueryError>
92 where
93 E: EntityValue,
94 {
95 Ok(!self.exists()?)
96 }
97
98 pub fn exists(&self) -> Result<bool, QueryError>
100 where
101 E: EntityValue,
102 {
103 self.execute_scalar_non_paged_terminal(|load, plan| {
104 load.execute_scalar_terminal_request(plan, ScalarTerminalBoundaryRequest::Exists)?
105 .into_exists()
106 })
107 }
108
109 pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
111 where
112 E: EntityValue,
113 {
114 self.explain_scalar_non_paged_terminal(exists())
115 }
116
117 pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
121 where
122 E: EntityValue,
123 {
124 self.explain_exists()
125 }
126
127 pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
129 where
130 E: EntityValue,
131 {
132 self.query().explain_execution()
133 }
134
135 pub fn explain_execution_text(&self) -> Result<String, QueryError>
137 where
138 E: EntityValue,
139 {
140 self.query().explain_execution_text()
141 }
142
143 pub fn explain_execution_json(&self) -> Result<String, QueryError>
145 where
146 E: EntityValue,
147 {
148 self.query().explain_execution_json()
149 }
150
151 pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
153 where
154 E: EntityValue,
155 {
156 self.query().explain_execution_verbose()
157 }
158
159 pub fn count(&self) -> Result<u32, QueryError>
161 where
162 E: EntityValue,
163 {
164 self.execute_scalar_non_paged_terminal(|load, plan| {
165 load.execute_scalar_terminal_request(plan, ScalarTerminalBoundaryRequest::Count)?
166 .into_count()
167 })
168 }
169
170 pub fn bytes(&self) -> Result<u64, QueryError>
173 where
174 E: EntityValue,
175 {
176 self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
177 }
178
179 pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
182 where
183 E: EntityValue,
184 {
185 self.ensure_non_paged_mode_ready()?;
186
187 Self::with_slot(field, |target_slot| {
188 self.session
189 .execute_load_query_with(self.query(), move |load, plan| {
190 load.bytes_by_slot(plan, target_slot)
191 })
192 })
193 }
194
195 pub fn explain_bytes_by(
197 &self,
198 field: impl AsRef<str>,
199 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
200 where
201 E: EntityValue,
202 {
203 self.ensure_non_paged_mode_ready()?;
204
205 Self::with_slot(field, |target_slot| {
206 DbSession::<E::Canister>::explain_load_query_bytes_by_with(
207 self.query(),
208 target_slot.field(),
209 )
210 })
211 }
212
213 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
215 where
216 E: EntityValue,
217 {
218 self.execute_scalar_non_paged_terminal(|load, plan| {
219 load.execute_scalar_terminal_request(
220 plan,
221 ScalarTerminalBoundaryRequest::IdTerminal {
222 kind: AggregateKind::Min,
223 },
224 )?
225 .into_id()
226 })
227 }
228
229 pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
231 where
232 E: EntityValue,
233 {
234 self.explain_scalar_non_paged_terminal(min())
235 }
236
237 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
241 where
242 E: EntityValue,
243 {
244 self.ensure_non_paged_mode_ready()?;
245
246 Self::with_slot(field, |target_slot| {
247 self.session
248 .execute_load_query_with(self.query(), move |load, plan| {
249 load.execute_scalar_terminal_request(
250 plan,
251 ScalarTerminalBoundaryRequest::IdBySlot {
252 kind: AggregateKind::Min,
253 target_field: target_slot,
254 },
255 )?
256 .into_id()
257 })
258 })
259 }
260
261 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
263 where
264 E: EntityValue,
265 {
266 self.execute_scalar_non_paged_terminal(|load, plan| {
267 load.execute_scalar_terminal_request(
268 plan,
269 ScalarTerminalBoundaryRequest::IdTerminal {
270 kind: AggregateKind::Max,
271 },
272 )?
273 .into_id()
274 })
275 }
276
277 pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
279 where
280 E: EntityValue,
281 {
282 self.explain_scalar_non_paged_terminal(max())
283 }
284
285 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
289 where
290 E: EntityValue,
291 {
292 self.ensure_non_paged_mode_ready()?;
293
294 Self::with_slot(field, |target_slot| {
295 self.session
296 .execute_load_query_with(self.query(), move |load, plan| {
297 load.execute_scalar_terminal_request(
298 plan,
299 ScalarTerminalBoundaryRequest::IdBySlot {
300 kind: AggregateKind::Max,
301 target_field: target_slot,
302 },
303 )?
304 .into_id()
305 })
306 })
307 }
308
309 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
312 where
313 E: EntityValue,
314 {
315 self.ensure_non_paged_mode_ready()?;
316
317 Self::with_slot(field, |target_slot| {
318 self.session
319 .execute_load_query_with(self.query(), move |load, plan| {
320 load.execute_scalar_terminal_request(
321 plan,
322 ScalarTerminalBoundaryRequest::NthBySlot {
323 target_field: target_slot,
324 nth,
325 },
326 )?
327 .into_id()
328 })
329 })
330 }
331
332 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
334 where
335 E: EntityValue,
336 {
337 self.ensure_non_paged_mode_ready()?;
338
339 Self::with_slot(field, |target_slot| {
340 self.session
341 .execute_load_query_with(self.query(), move |load, plan| {
342 load.execute_numeric_field_boundary(
343 plan,
344 target_slot,
345 ScalarNumericFieldBoundaryRequest::Sum,
346 )
347 })
348 })
349 }
350
351 pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
353 where
354 E: EntityValue,
355 {
356 self.ensure_non_paged_mode_ready()?;
357
358 Self::with_slot(field, |target_slot| {
359 self.session
360 .execute_load_query_with(self.query(), move |load, plan| {
361 load.execute_numeric_field_boundary(
362 plan,
363 target_slot,
364 ScalarNumericFieldBoundaryRequest::SumDistinct,
365 )
366 })
367 })
368 }
369
370 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
372 where
373 E: EntityValue,
374 {
375 self.ensure_non_paged_mode_ready()?;
376
377 Self::with_slot(field, |target_slot| {
378 self.session
379 .execute_load_query_with(self.query(), move |load, plan| {
380 load.execute_numeric_field_boundary(
381 plan,
382 target_slot,
383 ScalarNumericFieldBoundaryRequest::Avg,
384 )
385 })
386 })
387 }
388
389 pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
391 where
392 E: EntityValue,
393 {
394 self.ensure_non_paged_mode_ready()?;
395
396 Self::with_slot(field, |target_slot| {
397 self.session
398 .execute_load_query_with(self.query(), move |load, plan| {
399 load.execute_numeric_field_boundary(
400 plan,
401 target_slot,
402 ScalarNumericFieldBoundaryRequest::AvgDistinct,
403 )
404 })
405 })
406 }
407
408 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
413 where
414 E: EntityValue,
415 {
416 self.ensure_non_paged_mode_ready()?;
417
418 Self::with_slot(field, |target_slot| {
419 self.session
420 .execute_load_query_with(self.query(), move |load, plan| {
421 load.execute_scalar_terminal_request(
422 plan,
423 ScalarTerminalBoundaryRequest::MedianBySlot {
424 target_field: target_slot,
425 },
426 )?
427 .into_id()
428 })
429 })
430 }
431
432 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
435 where
436 E: EntityValue,
437 {
438 self.ensure_non_paged_mode_ready()?;
439
440 Self::with_slot(field, |target_slot| {
441 self.session
442 .execute_load_query_with(self.query(), move |load, plan| {
443 load.execute_scalar_projection_boundary(
444 plan,
445 target_slot,
446 ScalarProjectionBoundaryRequest::CountDistinct,
447 )?
448 .into_count()
449 })
450 })
451 }
452
453 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
457 where
458 E: EntityValue,
459 {
460 self.ensure_non_paged_mode_ready()?;
461
462 Self::with_slot(field, |target_slot| {
463 self.session
464 .execute_load_query_with(self.query(), move |load, plan| {
465 load.execute_scalar_terminal_request(
466 plan,
467 ScalarTerminalBoundaryRequest::MinMaxBySlot {
468 target_field: target_slot,
469 },
470 )?
471 .into_id_pair()
472 })
473 })
474 }
475
476 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
478 where
479 E: EntityValue,
480 {
481 self.ensure_non_paged_mode_ready()?;
482
483 Self::with_slot(field, |target_slot| {
484 self.session
485 .execute_load_query_with(self.query(), move |load, plan| {
486 load.execute_scalar_projection_boundary(
487 plan,
488 target_slot,
489 ScalarProjectionBoundaryRequest::Values,
490 )?
491 .into_values()
492 })
493 })
494 }
495
496 pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
498 where
499 E: EntityValue,
500 {
501 self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
502 }
503
504 pub fn top_k_by(
512 &self,
513 field: impl AsRef<str>,
514 take_count: u32,
515 ) -> Result<EntityResponse<E>, QueryError>
516 where
517 E: EntityValue,
518 {
519 self.ensure_non_paged_mode_ready()?;
520
521 Self::with_slot(field, |target_slot| {
522 self.session
523 .execute_load_query_with(self.query(), move |load, plan| {
524 load.top_k_by_slot(plan, target_slot, take_count)
525 })
526 })
527 }
528
529 pub fn bottom_k_by(
537 &self,
538 field: impl AsRef<str>,
539 take_count: u32,
540 ) -> Result<EntityResponse<E>, QueryError>
541 where
542 E: EntityValue,
543 {
544 self.ensure_non_paged_mode_ready()?;
545
546 Self::with_slot(field, |target_slot| {
547 self.session
548 .execute_load_query_with(self.query(), move |load, plan| {
549 load.bottom_k_by_slot(plan, target_slot, take_count)
550 })
551 })
552 }
553
554 pub fn top_k_by_values(
562 &self,
563 field: impl AsRef<str>,
564 take_count: u32,
565 ) -> Result<Vec<Value>, QueryError>
566 where
567 E: EntityValue,
568 {
569 self.ensure_non_paged_mode_ready()?;
570
571 Self::with_slot(field, |target_slot| {
572 self.session
573 .execute_load_query_with(self.query(), move |load, plan| {
574 load.top_k_by_values_slot(plan, target_slot, take_count)
575 })
576 })
577 }
578
579 pub fn bottom_k_by_values(
587 &self,
588 field: impl AsRef<str>,
589 take_count: u32,
590 ) -> Result<Vec<Value>, QueryError>
591 where
592 E: EntityValue,
593 {
594 self.ensure_non_paged_mode_ready()?;
595
596 Self::with_slot(field, |target_slot| {
597 self.session
598 .execute_load_query_with(self.query(), move |load, plan| {
599 load.bottom_k_by_values_slot(plan, target_slot, take_count)
600 })
601 })
602 }
603
604 pub fn top_k_by_with_ids(
612 &self,
613 field: impl AsRef<str>,
614 take_count: u32,
615 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
616 where
617 E: EntityValue,
618 {
619 self.ensure_non_paged_mode_ready()?;
620
621 Self::with_slot(field, |target_slot| {
622 self.session
623 .execute_load_query_with(self.query(), move |load, plan| {
624 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
625 })
626 })
627 }
628
629 pub fn bottom_k_by_with_ids(
637 &self,
638 field: impl AsRef<str>,
639 take_count: u32,
640 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
641 where
642 E: EntityValue,
643 {
644 self.ensure_non_paged_mode_ready()?;
645
646 Self::with_slot(field, |target_slot| {
647 self.session
648 .execute_load_query_with(self.query(), move |load, plan| {
649 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
650 })
651 })
652 }
653
654 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
657 where
658 E: EntityValue,
659 {
660 self.ensure_non_paged_mode_ready()?;
661
662 Self::with_slot(field, |target_slot| {
663 self.session
664 .execute_load_query_with(self.query(), move |load, plan| {
665 load.execute_scalar_projection_boundary(
666 plan,
667 target_slot,
668 ScalarProjectionBoundaryRequest::DistinctValues,
669 )?
670 .into_values()
671 })
672 })
673 }
674
675 pub fn values_by_with_ids(
678 &self,
679 field: impl AsRef<str>,
680 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
681 where
682 E: EntityValue,
683 {
684 self.ensure_non_paged_mode_ready()?;
685
686 Self::with_slot(field, |target_slot| {
687 self.session
688 .execute_load_query_with(self.query(), move |load, plan| {
689 load.execute_scalar_projection_boundary(
690 plan,
691 target_slot,
692 ScalarProjectionBoundaryRequest::ValuesWithIds,
693 )?
694 .into_values_with_ids()
695 })
696 })
697 }
698
699 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
702 where
703 E: EntityValue,
704 {
705 self.ensure_non_paged_mode_ready()?;
706
707 Self::with_slot(field, |target_slot| {
708 self.session
709 .execute_load_query_with(self.query(), move |load, plan| {
710 load.execute_scalar_projection_boundary(
711 plan,
712 target_slot,
713 ScalarProjectionBoundaryRequest::TerminalValue {
714 terminal_kind: AggregateKind::First,
715 },
716 )?
717 .into_terminal_value()
718 })
719 })
720 }
721
722 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
725 where
726 E: EntityValue,
727 {
728 self.ensure_non_paged_mode_ready()?;
729
730 Self::with_slot(field, |target_slot| {
731 self.session
732 .execute_load_query_with(self.query(), move |load, plan| {
733 load.execute_scalar_projection_boundary(
734 plan,
735 target_slot,
736 ScalarProjectionBoundaryRequest::TerminalValue {
737 terminal_kind: AggregateKind::Last,
738 },
739 )?
740 .into_terminal_value()
741 })
742 })
743 }
744
745 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
747 where
748 E: EntityValue,
749 {
750 self.execute_scalar_non_paged_terminal(|load, plan| {
751 load.execute_scalar_terminal_request(
752 plan,
753 ScalarTerminalBoundaryRequest::IdTerminal {
754 kind: AggregateKind::First,
755 },
756 )?
757 .into_id()
758 })
759 }
760
761 pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
763 where
764 E: EntityValue,
765 {
766 self.explain_scalar_non_paged_terminal(first())
767 }
768
769 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
771 where
772 E: EntityValue,
773 {
774 self.execute_scalar_non_paged_terminal(|load, plan| {
775 load.execute_scalar_terminal_request(
776 plan,
777 ScalarTerminalBoundaryRequest::IdTerminal {
778 kind: AggregateKind::Last,
779 },
780 )?
781 .into_id()
782 })
783 }
784
785 pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
787 where
788 E: EntityValue,
789 {
790 self.explain_scalar_non_paged_terminal(last())
791 }
792
793 pub fn require_one(&self) -> Result<(), QueryError>
795 where
796 E: EntityValue,
797 {
798 self.execute()?.require_one()?;
799 Ok(())
800 }
801
802 pub fn require_some(&self) -> Result<(), QueryError>
804 where
805 E: EntityValue,
806 {
807 self.execute()?.require_some()?;
808 Ok(())
809 }
810}