1use crate::{
16 db::{
17 DbSession, PersistedRow, Query,
18 query::{
19 api::ResponseCardinalityExt,
20 builder::{
21 AvgBySlotTerminal, AvgDistinctBySlotTerminal, CountDistinctBySlotTerminal,
22 CountRowsTerminal, DistinctValuesBySlotTerminal, ExistsRowsTerminal,
23 FirstIdTerminal, FirstValueBySlotTerminal, LastIdTerminal, LastValueBySlotTerminal,
24 MaxIdBySlotTerminal, MaxIdTerminal, MedianIdBySlotTerminal, MinIdBySlotTerminal,
25 MinIdTerminal, MinMaxIdBySlotTerminal, NthIdBySlotTerminal, SumBySlotTerminal,
26 SumDistinctBySlotTerminal, ValueProjectionExpr, ValuesBySlotTerminal,
27 ValuesBySlotWithIdsTerminal,
28 },
29 explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
30 fluent::load::{FluentLoadQuery, LoadQueryResult},
31 intent::QueryError,
32 },
33 response::EntityResponse,
34 },
35 traits::EntityValue,
36 types::{Decimal, Id},
37 value::{OutputValue, Value},
38};
39
/// Optional pair of entity ids; used as the output of the
/// `MinMaxIdBySlotTerminal` driver and as the return payload of `min_max_by`.
// NOTE(review): presumably ordered as (min, max) — confirm against the session executor.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42trait TerminalStrategyDriver<E: PersistedRow + EntityValue> {
52 type Output;
53 type ExplainOutput;
54
55 fn execute(
56 self,
57 session: &DbSession<E::Canister>,
58 query: &Query<E>,
59 ) -> Result<Self::Output, QueryError>;
60
61 fn explain(
62 &self,
63 session: &DbSession<E::Canister>,
64 query: &Query<E>,
65 ) -> Result<Self::ExplainOutput, QueryError>;
66}
67
68impl<E> TerminalStrategyDriver<E> for CountRowsTerminal
69where
70 E: PersistedRow + EntityValue,
71{
72 type Output = u32;
73 type ExplainOutput = ExplainAggregateTerminalPlan;
74
75 fn execute(
76 self,
77 session: &DbSession<E::Canister>,
78 query: &Query<E>,
79 ) -> Result<Self::Output, QueryError> {
80 session.execute_fluent_count_rows_terminal(query, self)
81 }
82
83 fn explain(
84 &self,
85 session: &DbSession<E::Canister>,
86 query: &Query<E>,
87 ) -> Result<Self::ExplainOutput, QueryError> {
88 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
89 }
90}
91
92impl<E> TerminalStrategyDriver<E> for ExistsRowsTerminal
93where
94 E: PersistedRow + EntityValue,
95{
96 type Output = bool;
97 type ExplainOutput = ExplainAggregateTerminalPlan;
98
99 fn execute(
100 self,
101 session: &DbSession<E::Canister>,
102 query: &Query<E>,
103 ) -> Result<Self::Output, QueryError> {
104 session.execute_fluent_exists_rows_terminal(query, self)
105 }
106
107 fn explain(
108 &self,
109 session: &DbSession<E::Canister>,
110 query: &Query<E>,
111 ) -> Result<Self::ExplainOutput, QueryError> {
112 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
113 }
114}
115
116impl<E> TerminalStrategyDriver<E> for MinIdTerminal
117where
118 E: PersistedRow + EntityValue,
119{
120 type Output = Option<Id<E>>;
121 type ExplainOutput = ExplainAggregateTerminalPlan;
122
123 fn execute(
124 self,
125 session: &DbSession<E::Canister>,
126 query: &Query<E>,
127 ) -> Result<Self::Output, QueryError> {
128 session.execute_fluent_min_id_terminal(query, self)
129 }
130
131 fn explain(
132 &self,
133 session: &DbSession<E::Canister>,
134 query: &Query<E>,
135 ) -> Result<Self::ExplainOutput, QueryError> {
136 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
137 }
138}
139
140impl<E> TerminalStrategyDriver<E> for MaxIdTerminal
141where
142 E: PersistedRow + EntityValue,
143{
144 type Output = Option<Id<E>>;
145 type ExplainOutput = ExplainAggregateTerminalPlan;
146
147 fn execute(
148 self,
149 session: &DbSession<E::Canister>,
150 query: &Query<E>,
151 ) -> Result<Self::Output, QueryError> {
152 session.execute_fluent_max_id_terminal(query, self)
153 }
154
155 fn explain(
156 &self,
157 session: &DbSession<E::Canister>,
158 query: &Query<E>,
159 ) -> Result<Self::ExplainOutput, QueryError> {
160 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
161 }
162}
163
164impl<E> TerminalStrategyDriver<E> for MinIdBySlotTerminal
165where
166 E: PersistedRow + EntityValue,
167{
168 type Output = Option<Id<E>>;
169 type ExplainOutput = ExplainAggregateTerminalPlan;
170
171 fn execute(
172 self,
173 session: &DbSession<E::Canister>,
174 query: &Query<E>,
175 ) -> Result<Self::Output, QueryError> {
176 session.execute_fluent_min_id_by_slot(query, self)
177 }
178
179 fn explain(
180 &self,
181 session: &DbSession<E::Canister>,
182 query: &Query<E>,
183 ) -> Result<Self::ExplainOutput, QueryError> {
184 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
185 }
186}
187
188impl<E> TerminalStrategyDriver<E> for MaxIdBySlotTerminal
189where
190 E: PersistedRow + EntityValue,
191{
192 type Output = Option<Id<E>>;
193 type ExplainOutput = ExplainAggregateTerminalPlan;
194
195 fn execute(
196 self,
197 session: &DbSession<E::Canister>,
198 query: &Query<E>,
199 ) -> Result<Self::Output, QueryError> {
200 session.execute_fluent_max_id_by_slot(query, self)
201 }
202
203 fn explain(
204 &self,
205 session: &DbSession<E::Canister>,
206 query: &Query<E>,
207 ) -> Result<Self::ExplainOutput, QueryError> {
208 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
209 }
210}
211
212impl<E> TerminalStrategyDriver<E> for SumBySlotTerminal
213where
214 E: PersistedRow + EntityValue,
215{
216 type Output = Option<Decimal>;
217 type ExplainOutput = ExplainAggregateTerminalPlan;
218
219 fn execute(
220 self,
221 session: &DbSession<E::Canister>,
222 query: &Query<E>,
223 ) -> Result<Self::Output, QueryError> {
224 session.execute_fluent_sum_by_slot(query, self)
225 }
226
227 fn explain(
228 &self,
229 session: &DbSession<E::Canister>,
230 query: &Query<E>,
231 ) -> Result<Self::ExplainOutput, QueryError> {
232 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
233 }
234}
235
236impl<E> TerminalStrategyDriver<E> for SumDistinctBySlotTerminal
237where
238 E: PersistedRow + EntityValue,
239{
240 type Output = Option<Decimal>;
241 type ExplainOutput = ExplainAggregateTerminalPlan;
242
243 fn execute(
244 self,
245 session: &DbSession<E::Canister>,
246 query: &Query<E>,
247 ) -> Result<Self::Output, QueryError> {
248 session.execute_fluent_sum_distinct_by_slot(query, self)
249 }
250
251 fn explain(
252 &self,
253 session: &DbSession<E::Canister>,
254 query: &Query<E>,
255 ) -> Result<Self::ExplainOutput, QueryError> {
256 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
257 }
258}
259
260impl<E> TerminalStrategyDriver<E> for AvgBySlotTerminal
261where
262 E: PersistedRow + EntityValue,
263{
264 type Output = Option<Decimal>;
265 type ExplainOutput = ExplainAggregateTerminalPlan;
266
267 fn execute(
268 self,
269 session: &DbSession<E::Canister>,
270 query: &Query<E>,
271 ) -> Result<Self::Output, QueryError> {
272 session.execute_fluent_avg_by_slot(query, self)
273 }
274
275 fn explain(
276 &self,
277 session: &DbSession<E::Canister>,
278 query: &Query<E>,
279 ) -> Result<Self::ExplainOutput, QueryError> {
280 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
281 }
282}
283
284impl<E> TerminalStrategyDriver<E> for AvgDistinctBySlotTerminal
285where
286 E: PersistedRow + EntityValue,
287{
288 type Output = Option<Decimal>;
289 type ExplainOutput = ExplainAggregateTerminalPlan;
290
291 fn execute(
292 self,
293 session: &DbSession<E::Canister>,
294 query: &Query<E>,
295 ) -> Result<Self::Output, QueryError> {
296 session.execute_fluent_avg_distinct_by_slot(query, self)
297 }
298
299 fn explain(
300 &self,
301 session: &DbSession<E::Canister>,
302 query: &Query<E>,
303 ) -> Result<Self::ExplainOutput, QueryError> {
304 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
305 }
306}
307
308impl<E> TerminalStrategyDriver<E> for FirstIdTerminal
309where
310 E: PersistedRow + EntityValue,
311{
312 type Output = Option<Id<E>>;
313 type ExplainOutput = ExplainAggregateTerminalPlan;
314
315 fn execute(
316 self,
317 session: &DbSession<E::Canister>,
318 query: &Query<E>,
319 ) -> Result<Self::Output, QueryError> {
320 session.execute_fluent_first_id_terminal(query, self)
321 }
322
323 fn explain(
324 &self,
325 session: &DbSession<E::Canister>,
326 query: &Query<E>,
327 ) -> Result<Self::ExplainOutput, QueryError> {
328 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
329 }
330}
331
332impl<E> TerminalStrategyDriver<E> for LastIdTerminal
333where
334 E: PersistedRow + EntityValue,
335{
336 type Output = Option<Id<E>>;
337 type ExplainOutput = ExplainAggregateTerminalPlan;
338
339 fn execute(
340 self,
341 session: &DbSession<E::Canister>,
342 query: &Query<E>,
343 ) -> Result<Self::Output, QueryError> {
344 session.execute_fluent_last_id_terminal(query, self)
345 }
346
347 fn explain(
348 &self,
349 session: &DbSession<E::Canister>,
350 query: &Query<E>,
351 ) -> Result<Self::ExplainOutput, QueryError> {
352 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
353 }
354}
355
356impl<E> TerminalStrategyDriver<E> for NthIdBySlotTerminal
357where
358 E: PersistedRow + EntityValue,
359{
360 type Output = Option<Id<E>>;
361 type ExplainOutput = ExplainAggregateTerminalPlan;
362
363 fn execute(
364 self,
365 session: &DbSession<E::Canister>,
366 query: &Query<E>,
367 ) -> Result<Self::Output, QueryError> {
368 session.execute_fluent_nth_id_by_slot(query, self)
369 }
370
371 fn explain(
372 &self,
373 session: &DbSession<E::Canister>,
374 query: &Query<E>,
375 ) -> Result<Self::ExplainOutput, QueryError> {
376 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
377 }
378}
379
380impl<E> TerminalStrategyDriver<E> for MedianIdBySlotTerminal
381where
382 E: PersistedRow + EntityValue,
383{
384 type Output = Option<Id<E>>;
385 type ExplainOutput = ExplainAggregateTerminalPlan;
386
387 fn execute(
388 self,
389 session: &DbSession<E::Canister>,
390 query: &Query<E>,
391 ) -> Result<Self::Output, QueryError> {
392 session.execute_fluent_median_id_by_slot(query, self)
393 }
394
395 fn explain(
396 &self,
397 session: &DbSession<E::Canister>,
398 query: &Query<E>,
399 ) -> Result<Self::ExplainOutput, QueryError> {
400 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
401 }
402}
403
404impl<E> TerminalStrategyDriver<E> for MinMaxIdBySlotTerminal
405where
406 E: PersistedRow + EntityValue,
407{
408 type Output = MinMaxByIds<E>;
409 type ExplainOutput = ExplainAggregateTerminalPlan;
410
411 fn execute(
412 self,
413 session: &DbSession<E::Canister>,
414 query: &Query<E>,
415 ) -> Result<Self::Output, QueryError> {
416 session.execute_fluent_min_max_id_by_slot(query, self)
417 }
418
419 fn explain(
420 &self,
421 session: &DbSession<E::Canister>,
422 query: &Query<E>,
423 ) -> Result<Self::ExplainOutput, QueryError> {
424 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
425 }
426}
427
428impl<E> TerminalStrategyDriver<E> for ValuesBySlotTerminal
429where
430 E: PersistedRow + EntityValue,
431{
432 type Output = Vec<Value>;
433 type ExplainOutput = ExplainExecutionNodeDescriptor;
434
435 fn execute(
436 self,
437 session: &DbSession<E::Canister>,
438 query: &Query<E>,
439 ) -> Result<Self::Output, QueryError> {
440 session.execute_fluent_values_by_slot(query, self)
441 }
442
443 fn explain(
444 &self,
445 session: &DbSession<E::Canister>,
446 query: &Query<E>,
447 ) -> Result<Self::ExplainOutput, QueryError> {
448 session.explain_query_prepared_projection_terminal_with_visible_indexes(query, self)
449 }
450}
451
452impl<E> TerminalStrategyDriver<E> for DistinctValuesBySlotTerminal
453where
454 E: PersistedRow + EntityValue,
455{
456 type Output = Vec<Value>;
457 type ExplainOutput = ExplainExecutionNodeDescriptor;
458
459 fn execute(
460 self,
461 session: &DbSession<E::Canister>,
462 query: &Query<E>,
463 ) -> Result<Self::Output, QueryError> {
464 session.execute_fluent_distinct_values_by_slot(query, self)
465 }
466
467 fn explain(
468 &self,
469 session: &DbSession<E::Canister>,
470 query: &Query<E>,
471 ) -> Result<Self::ExplainOutput, QueryError> {
472 session.explain_query_prepared_projection_terminal_with_visible_indexes(query, self)
473 }
474}
475
476impl<E> TerminalStrategyDriver<E> for CountDistinctBySlotTerminal
477where
478 E: PersistedRow + EntityValue,
479{
480 type Output = u32;
481 type ExplainOutput = ExplainExecutionNodeDescriptor;
482
483 fn execute(
484 self,
485 session: &DbSession<E::Canister>,
486 query: &Query<E>,
487 ) -> Result<Self::Output, QueryError> {
488 session.execute_fluent_count_distinct_by_slot(query, self)
489 }
490
491 fn explain(
492 &self,
493 session: &DbSession<E::Canister>,
494 query: &Query<E>,
495 ) -> Result<Self::ExplainOutput, QueryError> {
496 session.explain_query_prepared_projection_terminal_with_visible_indexes(query, self)
497 }
498}
499
500impl<E> TerminalStrategyDriver<E> for ValuesBySlotWithIdsTerminal
501where
502 E: PersistedRow + EntityValue,
503{
504 type Output = Vec<(Id<E>, Value)>;
505 type ExplainOutput = ExplainExecutionNodeDescriptor;
506
507 fn execute(
508 self,
509 session: &DbSession<E::Canister>,
510 query: &Query<E>,
511 ) -> Result<Self::Output, QueryError> {
512 session.execute_fluent_values_by_with_ids_slot(query, self)
513 }
514
515 fn explain(
516 &self,
517 session: &DbSession<E::Canister>,
518 query: &Query<E>,
519 ) -> Result<Self::ExplainOutput, QueryError> {
520 session.explain_query_prepared_projection_terminal_with_visible_indexes(query, self)
521 }
522}
523
524impl<E> TerminalStrategyDriver<E> for FirstValueBySlotTerminal
525where
526 E: PersistedRow + EntityValue,
527{
528 type Output = Option<Value>;
529 type ExplainOutput = ExplainExecutionNodeDescriptor;
530
531 fn execute(
532 self,
533 session: &DbSession<E::Canister>,
534 query: &Query<E>,
535 ) -> Result<Self::Output, QueryError> {
536 session.execute_fluent_first_value_by_slot(query, self)
537 }
538
539 fn explain(
540 &self,
541 session: &DbSession<E::Canister>,
542 query: &Query<E>,
543 ) -> Result<Self::ExplainOutput, QueryError> {
544 session.explain_query_prepared_projection_terminal_with_visible_indexes(query, self)
545 }
546}
547
548impl<E> TerminalStrategyDriver<E> for LastValueBySlotTerminal
549where
550 E: PersistedRow + EntityValue,
551{
552 type Output = Option<Value>;
553 type ExplainOutput = ExplainExecutionNodeDescriptor;
554
555 fn execute(
556 self,
557 session: &DbSession<E::Canister>,
558 query: &Query<E>,
559 ) -> Result<Self::Output, QueryError> {
560 session.execute_fluent_last_value_by_slot(query, self)
561 }
562
563 fn explain(
564 &self,
565 session: &DbSession<E::Canister>,
566 query: &Query<E>,
567 ) -> Result<Self::ExplainOutput, QueryError> {
568 session.explain_query_prepared_projection_terminal_with_visible_indexes(query, self)
569 }
570}
571
572fn output(value: Value) -> OutputValue {
574 OutputValue::from(value)
575}
576
577fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
579 values.into_iter().map(output).collect()
580}
581
582fn output_values_with_ids<E: PersistedRow>(
584 values: Vec<(Id<E>, Value)>,
585) -> Vec<(Id<E>, OutputValue)> {
586 values
587 .into_iter()
588 .map(|(id, value)| (id, output(value)))
589 .collect()
590}
591
592impl<E> FluentLoadQuery<'_, E>
593where
594 E: PersistedRow,
595{
596 pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
602 where
603 E: EntityValue,
604 {
605 self.with_non_paged(DbSession::execute_query_result)
606 }
607
608 fn with_non_paged<T>(
611 &self,
612 map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
613 ) -> Result<T, QueryError>
614 where
615 E: EntityValue,
616 {
617 self.ensure_non_paged_mode_ready()?;
618 map(self.session, self.query())
619 }
620
621 fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
624 where
625 E: EntityValue,
626 {
627 self.with_non_paged(DbSession::explain_query_execution_with_visible_indexes)
628 }
629
630 fn render_execution_descriptor(
633 &self,
634 render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
635 ) -> Result<String, QueryError>
636 where
637 E: EntityValue,
638 {
639 let descriptor = self.explain_execution_descriptor()?;
640
641 Ok(render(descriptor))
642 }
643
644 fn execute_terminal<S>(&self, strategy: S) -> Result<S::Output, QueryError>
647 where
648 E: EntityValue,
649 S: TerminalStrategyDriver<E>,
650 {
651 self.with_non_paged(|session, query| strategy.execute(session, query))
652 }
653
654 fn explain_terminal<S>(&self, strategy: &S) -> Result<S::ExplainOutput, QueryError>
657 where
658 E: EntityValue,
659 S: TerminalStrategyDriver<E>,
660 {
661 self.with_non_paged(|session, query| strategy.explain(session, query))
662 }
663
664 fn project_terminal_items<P, T, U>(
667 projection: &P,
668 values: impl IntoIterator<Item = T>,
669 mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
670 ) -> Result<Vec<U>, QueryError>
671 where
672 P: ValueProjectionExpr,
673 {
674 values
675 .into_iter()
676 .map(|value| map(projection, value))
677 .collect()
678 }
679
680 pub fn is_empty(&self) -> Result<bool, QueryError>
686 where
687 E: EntityValue,
688 {
689 self.not_exists()
690 }
691
692 pub fn not_exists(&self) -> Result<bool, QueryError>
694 where
695 E: EntityValue,
696 {
697 Ok(!self.exists()?)
698 }
699
700 pub fn exists(&self) -> Result<bool, QueryError>
702 where
703 E: EntityValue,
704 {
705 self.execute_terminal(ExistsRowsTerminal::new())
706 }
707
708 pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
710 where
711 E: EntityValue,
712 {
713 self.explain_terminal(&ExistsRowsTerminal::new())
714 }
715
716 pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
720 where
721 E: EntityValue,
722 {
723 self.explain_exists()
724 }
725
726 pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
728 where
729 E: EntityValue,
730 {
731 self.explain_execution_descriptor()
732 }
733
734 pub fn explain_execution_text(&self) -> Result<String, QueryError>
736 where
737 E: EntityValue,
738 {
739 self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
740 }
741
742 pub fn explain_execution_json(&self) -> Result<String, QueryError>
744 where
745 E: EntityValue,
746 {
747 self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
748 }
749
750 pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
752 where
753 E: EntityValue,
754 {
755 self.with_non_paged(DbSession::explain_query_execution_verbose_with_visible_indexes)
756 }
757
758 pub fn count(&self) -> Result<u32, QueryError>
760 where
761 E: EntityValue,
762 {
763 self.execute_terminal(CountRowsTerminal::new())
764 }
765
766 pub fn bytes(&self) -> Result<u64, QueryError>
769 where
770 E: EntityValue,
771 {
772 self.with_non_paged(DbSession::execute_fluent_bytes)
773 }
774
775 pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
778 where
779 E: EntityValue,
780 {
781 let target_slot = self.resolve_non_paged_slot(field)?;
782
783 self.with_non_paged(|session, query| {
784 session.execute_fluent_bytes_by_slot(query, target_slot)
785 })
786 }
787
788 pub fn explain_bytes_by(
790 &self,
791 field: impl AsRef<str>,
792 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
793 where
794 E: EntityValue,
795 {
796 let target_slot = self.resolve_non_paged_slot(field)?;
797
798 self.with_non_paged(|session, query| {
799 session.explain_query_bytes_by_with_visible_indexes(query, target_slot.field())
800 })
801 }
802
803 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
805 where
806 E: EntityValue,
807 {
808 self.execute_terminal(MinIdTerminal::new())
809 }
810
811 pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
813 where
814 E: EntityValue,
815 {
816 self.explain_terminal(&MinIdTerminal::new())
817 }
818
819 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
823 where
824 E: EntityValue,
825 {
826 let target_slot = self.resolve_non_paged_slot(field)?;
827
828 self.execute_terminal(MinIdBySlotTerminal::new(target_slot))
829 }
830
831 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
833 where
834 E: EntityValue,
835 {
836 self.execute_terminal(MaxIdTerminal::new())
837 }
838
839 pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
841 where
842 E: EntityValue,
843 {
844 self.explain_terminal(&MaxIdTerminal::new())
845 }
846
847 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
851 where
852 E: EntityValue,
853 {
854 let target_slot = self.resolve_non_paged_slot(field)?;
855
856 self.execute_terminal(MaxIdBySlotTerminal::new(target_slot))
857 }
858
859 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
862 where
863 E: EntityValue,
864 {
865 let target_slot = self.resolve_non_paged_slot(field)?;
866
867 self.execute_terminal(NthIdBySlotTerminal::new(target_slot, nth))
868 }
869
870 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
872 where
873 E: EntityValue,
874 {
875 let target_slot = self.resolve_non_paged_slot(field)?;
876
877 self.execute_terminal(SumBySlotTerminal::new(target_slot))
878 }
879
880 pub fn explain_sum_by(
882 &self,
883 field: impl AsRef<str>,
884 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
885 where
886 E: EntityValue,
887 {
888 let target_slot = self.resolve_non_paged_slot(field)?;
889
890 self.explain_terminal(&SumBySlotTerminal::new(target_slot))
891 }
892
893 pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
895 where
896 E: EntityValue,
897 {
898 let target_slot = self.resolve_non_paged_slot(field)?;
899
900 self.execute_terminal(SumDistinctBySlotTerminal::new(target_slot))
901 }
902
903 pub fn explain_sum_distinct_by(
905 &self,
906 field: impl AsRef<str>,
907 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
908 where
909 E: EntityValue,
910 {
911 let target_slot = self.resolve_non_paged_slot(field)?;
912
913 self.explain_terminal(&SumDistinctBySlotTerminal::new(target_slot))
914 }
915
916 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
918 where
919 E: EntityValue,
920 {
921 let target_slot = self.resolve_non_paged_slot(field)?;
922
923 self.execute_terminal(AvgBySlotTerminal::new(target_slot))
924 }
925
926 pub fn explain_avg_by(
928 &self,
929 field: impl AsRef<str>,
930 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
931 where
932 E: EntityValue,
933 {
934 let target_slot = self.resolve_non_paged_slot(field)?;
935
936 self.explain_terminal(&AvgBySlotTerminal::new(target_slot))
937 }
938
939 pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
941 where
942 E: EntityValue,
943 {
944 let target_slot = self.resolve_non_paged_slot(field)?;
945
946 self.execute_terminal(AvgDistinctBySlotTerminal::new(target_slot))
947 }
948
949 pub fn explain_avg_distinct_by(
951 &self,
952 field: impl AsRef<str>,
953 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
954 where
955 E: EntityValue,
956 {
957 let target_slot = self.resolve_non_paged_slot(field)?;
958
959 self.explain_terminal(&AvgDistinctBySlotTerminal::new(target_slot))
960 }
961
962 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
967 where
968 E: EntityValue,
969 {
970 let target_slot = self.resolve_non_paged_slot(field)?;
971
972 self.execute_terminal(MedianIdBySlotTerminal::new(target_slot))
973 }
974
975 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
978 where
979 E: EntityValue,
980 {
981 let target_slot = self.resolve_non_paged_slot(field)?;
982
983 self.execute_terminal(CountDistinctBySlotTerminal::new(target_slot))
984 }
985
986 pub fn explain_count_distinct_by(
988 &self,
989 field: impl AsRef<str>,
990 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
991 where
992 E: EntityValue,
993 {
994 let target_slot = self.resolve_non_paged_slot(field)?;
995
996 self.explain_terminal(&CountDistinctBySlotTerminal::new(target_slot))
997 }
998
999 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
1003 where
1004 E: EntityValue,
1005 {
1006 let target_slot = self.resolve_non_paged_slot(field)?;
1007
1008 self.execute_terminal(MinMaxIdBySlotTerminal::new(target_slot))
1009 }
1010
1011 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
1013 where
1014 E: EntityValue,
1015 {
1016 let target_slot = self.resolve_non_paged_slot(field)?;
1017
1018 self.execute_terminal(ValuesBySlotTerminal::new(target_slot))
1019 .map(output_values)
1020 }
1021
1022 pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
1025 where
1026 E: EntityValue,
1027 P: ValueProjectionExpr,
1028 {
1029 let target_slot = self.resolve_non_paged_slot(projection.field())?;
1030 let values = self.execute_terminal(ValuesBySlotTerminal::new(target_slot))?;
1031
1032 Self::project_terminal_items(projection, values, |projection, value| {
1033 projection.apply_value(value)
1034 })
1035 .map(output_values)
1036 }
1037
1038 pub fn explain_project_values<P>(
1040 &self,
1041 projection: &P,
1042 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1043 where
1044 E: EntityValue,
1045 P: ValueProjectionExpr,
1046 {
1047 let target_slot = self.resolve_non_paged_slot(projection.field())?;
1048
1049 self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
1050 }
1051
1052 pub fn explain_values_by(
1054 &self,
1055 field: impl AsRef<str>,
1056 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1057 where
1058 E: EntityValue,
1059 {
1060 let target_slot = self.resolve_non_paged_slot(field)?;
1061
1062 self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
1063 }
1064
1065 pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
1067 where
1068 E: EntityValue,
1069 {
1070 self.with_non_paged(|session, query| session.execute_fluent_take(query, take_count))
1071 }
1072
1073 pub fn top_k_by(
1081 &self,
1082 field: impl AsRef<str>,
1083 take_count: u32,
1084 ) -> Result<EntityResponse<E>, QueryError>
1085 where
1086 E: EntityValue,
1087 {
1088 let target_slot = self.resolve_non_paged_slot(field)?;
1089
1090 self.with_non_paged(|session, query| {
1091 session.execute_fluent_ranked_rows_by_slot(query, target_slot, take_count, true)
1092 })
1093 }
1094
1095 pub fn bottom_k_by(
1103 &self,
1104 field: impl AsRef<str>,
1105 take_count: u32,
1106 ) -> Result<EntityResponse<E>, QueryError>
1107 where
1108 E: EntityValue,
1109 {
1110 let target_slot = self.resolve_non_paged_slot(field)?;
1111
1112 self.with_non_paged(|session, query| {
1113 session.execute_fluent_ranked_rows_by_slot(query, target_slot, take_count, false)
1114 })
1115 }
1116
1117 pub fn top_k_by_values(
1125 &self,
1126 field: impl AsRef<str>,
1127 take_count: u32,
1128 ) -> Result<Vec<OutputValue>, QueryError>
1129 where
1130 E: EntityValue,
1131 {
1132 let target_slot = self.resolve_non_paged_slot(field)?;
1133
1134 self.with_non_paged(|session, query| {
1135 session
1136 .execute_fluent_ranked_values_by_slot(query, target_slot, take_count, true)
1137 .map(output_values)
1138 })
1139 }
1140
1141 pub fn bottom_k_by_values(
1149 &self,
1150 field: impl AsRef<str>,
1151 take_count: u32,
1152 ) -> Result<Vec<OutputValue>, QueryError>
1153 where
1154 E: EntityValue,
1155 {
1156 let target_slot = self.resolve_non_paged_slot(field)?;
1157
1158 self.with_non_paged(|session, query| {
1159 session
1160 .execute_fluent_ranked_values_by_slot(query, target_slot, take_count, false)
1161 .map(output_values)
1162 })
1163 }
1164
1165 pub fn top_k_by_with_ids(
1173 &self,
1174 field: impl AsRef<str>,
1175 take_count: u32,
1176 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
1177 where
1178 E: EntityValue,
1179 {
1180 let target_slot = self.resolve_non_paged_slot(field)?;
1181
1182 self.with_non_paged(|session, query| {
1183 session
1184 .execute_fluent_ranked_values_with_ids_by_slot(query, target_slot, take_count, true)
1185 .map(output_values_with_ids)
1186 })
1187 }
1188
1189 pub fn bottom_k_by_with_ids(
1197 &self,
1198 field: impl AsRef<str>,
1199 take_count: u32,
1200 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
1201 where
1202 E: EntityValue,
1203 {
1204 let target_slot = self.resolve_non_paged_slot(field)?;
1205
1206 self.with_non_paged(|session, query| {
1207 session
1208 .execute_fluent_ranked_values_with_ids_by_slot(
1209 query,
1210 target_slot,
1211 take_count,
1212 false,
1213 )
1214 .map(output_values_with_ids)
1215 })
1216 }
1217
1218 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
1221 where
1222 E: EntityValue,
1223 {
1224 let target_slot = self.resolve_non_paged_slot(field)?;
1225
1226 self.execute_terminal(DistinctValuesBySlotTerminal::new(target_slot))
1227 .map(output_values)
1228 }
1229
1230 pub fn explain_distinct_values_by(
1232 &self,
1233 field: impl AsRef<str>,
1234 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1235 where
1236 E: EntityValue,
1237 {
1238 let target_slot = self.resolve_non_paged_slot(field)?;
1239
1240 self.explain_terminal(&DistinctValuesBySlotTerminal::new(target_slot))
1241 }
1242
1243 pub fn values_by_with_ids(
1246 &self,
1247 field: impl AsRef<str>,
1248 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
1249 where
1250 E: EntityValue,
1251 {
1252 let target_slot = self.resolve_non_paged_slot(field)?;
1253
1254 self.execute_terminal(ValuesBySlotWithIdsTerminal::new(target_slot))
1255 .map(output_values_with_ids)
1256 }
1257
1258 pub fn project_values_with_ids<P>(
1261 &self,
1262 projection: &P,
1263 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
1264 where
1265 E: EntityValue,
1266 P: ValueProjectionExpr,
1267 {
1268 let target_slot = self.resolve_non_paged_slot(projection.field())?;
1269 let values = self.execute_terminal(ValuesBySlotWithIdsTerminal::new(target_slot))?;
1270
1271 Self::project_terminal_items(projection, values, |projection, (id, value)| {
1272 Ok((id, projection.apply_value(value)?))
1273 })
1274 .map(output_values_with_ids)
1275 }
1276
1277 pub fn explain_values_by_with_ids(
1279 &self,
1280 field: impl AsRef<str>,
1281 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1282 where
1283 E: EntityValue,
1284 {
1285 let target_slot = self.resolve_non_paged_slot(field)?;
1286
1287 self.explain_terminal(&ValuesBySlotWithIdsTerminal::new(target_slot))
1288 }
1289
1290 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1293 where
1294 E: EntityValue,
1295 {
1296 let target_slot = self.resolve_non_paged_slot(field)?;
1297
1298 self.execute_terminal(FirstValueBySlotTerminal::new(target_slot))
1299 .map(|value| value.map(output))
1300 }
1301
1302 pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1305 where
1306 E: EntityValue,
1307 P: ValueProjectionExpr,
1308 {
1309 let target_slot = self.resolve_non_paged_slot(projection.field())?;
1310 let value = self.execute_terminal(FirstValueBySlotTerminal::new(target_slot))?;
1311
1312 let mut projected =
1313 Self::project_terminal_items(projection, value, |projection, value| {
1314 projection.apply_value(value)
1315 })?;
1316
1317 Ok(projected.pop().map(output))
1318 }
1319
1320 pub fn explain_first_value_by(
1322 &self,
1323 field: impl AsRef<str>,
1324 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1325 where
1326 E: EntityValue,
1327 {
1328 let target_slot = self.resolve_non_paged_slot(field)?;
1329
1330 self.explain_terminal(&FirstValueBySlotTerminal::new(target_slot))
1331 }
1332
1333 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1336 where
1337 E: EntityValue,
1338 {
1339 let target_slot = self.resolve_non_paged_slot(field)?;
1340
1341 self.execute_terminal(LastValueBySlotTerminal::new(target_slot))
1342 .map(|value| value.map(output))
1343 }
1344
1345 pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1348 where
1349 E: EntityValue,
1350 P: ValueProjectionExpr,
1351 {
1352 let target_slot = self.resolve_non_paged_slot(projection.field())?;
1353 let value = self.execute_terminal(LastValueBySlotTerminal::new(target_slot))?;
1354
1355 let mut projected =
1356 Self::project_terminal_items(projection, value, |projection, value| {
1357 projection.apply_value(value)
1358 })?;
1359
1360 Ok(projected.pop().map(output))
1361 }
1362
1363 pub fn explain_last_value_by(
1365 &self,
1366 field: impl AsRef<str>,
1367 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1368 where
1369 E: EntityValue,
1370 {
1371 let target_slot = self.resolve_non_paged_slot(field)?;
1372
1373 self.explain_terminal(&LastValueBySlotTerminal::new(target_slot))
1374 }
1375
1376 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1378 where
1379 E: EntityValue,
1380 {
1381 self.execute_terminal(FirstIdTerminal::new())
1382 }
1383
1384 pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1386 where
1387 E: EntityValue,
1388 {
1389 self.explain_terminal(&FirstIdTerminal::new())
1390 }
1391
1392 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1394 where
1395 E: EntityValue,
1396 {
1397 self.execute_terminal(LastIdTerminal::new())
1398 }
1399
1400 pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1402 where
1403 E: EntityValue,
1404 {
1405 self.explain_terminal(&LastIdTerminal::new())
1406 }
1407
1408 pub fn require_one(&self) -> Result<(), QueryError>
1410 where
1411 E: EntityValue,
1412 {
1413 self.execute()?.into_rows()?.require_one()?;
1414 Ok(())
1415 }
1416
1417 pub fn require_some(&self) -> Result<(), QueryError>
1419 where
1420 E: EntityValue,
1421 {
1422 self.execute()?.into_rows()?.require_some()?;
1423 Ok(())
1424 }
1425}