1#[cfg(feature = "diagnostics")]
16use crate::db::FluentTerminalExecutionAttribution;
17use crate::{
18 db::{
19 DbSession, PersistedRow, Query,
20 query::{
21 api::ResponseCardinalityExt,
22 builder::{
23 AvgBySlotTerminal, AvgDistinctBySlotTerminal, CountDistinctBySlotTerminal,
24 CountRowsTerminal, DistinctValuesBySlotTerminal, ExistsRowsTerminal,
25 FirstIdTerminal, FirstValueBySlotTerminal, LastIdTerminal, LastValueBySlotTerminal,
26 MaxIdBySlotTerminal, MaxIdTerminal, MedianIdBySlotTerminal, MinIdBySlotTerminal,
27 MinIdTerminal, MinMaxIdBySlotTerminal, NthIdBySlotTerminal, SumBySlotTerminal,
28 SumDistinctBySlotTerminal, ValueProjectionExpr, ValuesBySlotTerminal,
29 ValuesBySlotWithIdsTerminal,
30 },
31 explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
32 fluent::load::{FluentLoadQuery, LoadQueryResult},
33 intent::QueryError,
34 plan::AggregateKind,
35 },
36 response::EntityResponse,
37 },
38 traits::EntityValue,
39 types::{Decimal, Id},
40 value::{OutputValue, Value},
41};
42
43type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
44
45trait TerminalStrategyDriver<E: PersistedRow + EntityValue> {
55 type Output;
56 type ExplainOutput;
57
58 fn execute(
59 self,
60 session: &DbSession<E::Canister>,
61 query: &Query<E>,
62 ) -> Result<Self::Output, QueryError>;
63
64 fn explain(
65 &self,
66 session: &DbSession<E::Canister>,
67 query: &Query<E>,
68 ) -> Result<Self::ExplainOutput, QueryError>;
69}
70
71macro_rules! impl_aggregate_terminal_driver {
75 ($terminal:ty, $output:ty, $execute:ident) => {
76 impl<E> TerminalStrategyDriver<E> for $terminal
77 where
78 E: PersistedRow + EntityValue,
79 {
80 type Output = $output;
81 type ExplainOutput = ExplainAggregateTerminalPlan;
82
83 fn execute(
84 self,
85 session: &DbSession<E::Canister>,
86 query: &Query<E>,
87 ) -> Result<Self::Output, QueryError> {
88 session.$execute(query, self)
89 }
90
91 fn explain(
92 &self,
93 session: &DbSession<E::Canister>,
94 query: &Query<E>,
95 ) -> Result<Self::ExplainOutput, QueryError> {
96 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
97 }
98 }
99 };
100}
101
102macro_rules! impl_projection_terminal_driver {
106 ($terminal:ty, $output:ty, $execute:ident) => {
107 impl<E> TerminalStrategyDriver<E> for $terminal
108 where
109 E: PersistedRow + EntityValue,
110 {
111 type Output = $output;
112 type ExplainOutput = ExplainExecutionNodeDescriptor;
113
114 fn execute(
115 self,
116 session: &DbSession<E::Canister>,
117 query: &Query<E>,
118 ) -> Result<Self::Output, QueryError> {
119 session.$execute(query, self)
120 }
121
122 fn explain(
123 &self,
124 session: &DbSession<E::Canister>,
125 query: &Query<E>,
126 ) -> Result<Self::ExplainOutput, QueryError> {
127 session.explain_query_prepared_projection_terminal_with_visible_indexes(query, self)
128 }
129 }
130 };
131}
132
133impl_aggregate_terminal_driver!(CountRowsTerminal, u32, execute_fluent_count_rows_terminal);
134impl_aggregate_terminal_driver!(
135 ExistsRowsTerminal,
136 bool,
137 execute_fluent_exists_rows_terminal
138);
139impl_aggregate_terminal_driver!(MinIdTerminal, Option<Id<E>>, execute_fluent_min_id_terminal);
140impl_aggregate_terminal_driver!(MaxIdTerminal, Option<Id<E>>, execute_fluent_max_id_terminal);
141impl_aggregate_terminal_driver!(
142 MinIdBySlotTerminal,
143 Option<Id<E>>,
144 execute_fluent_min_id_by_slot
145);
146impl_aggregate_terminal_driver!(
147 MaxIdBySlotTerminal,
148 Option<Id<E>>,
149 execute_fluent_max_id_by_slot
150);
151impl_aggregate_terminal_driver!(
152 SumBySlotTerminal,
153 Option<Decimal>,
154 execute_fluent_sum_by_slot
155);
156impl_aggregate_terminal_driver!(
157 SumDistinctBySlotTerminal,
158 Option<Decimal>,
159 execute_fluent_sum_distinct_by_slot
160);
161impl_aggregate_terminal_driver!(
162 AvgBySlotTerminal,
163 Option<Decimal>,
164 execute_fluent_avg_by_slot
165);
166impl_aggregate_terminal_driver!(
167 AvgDistinctBySlotTerminal,
168 Option<Decimal>,
169 execute_fluent_avg_distinct_by_slot
170);
171impl_aggregate_terminal_driver!(
172 FirstIdTerminal,
173 Option<Id<E>>,
174 execute_fluent_first_id_terminal
175);
176impl_aggregate_terminal_driver!(
177 LastIdTerminal,
178 Option<Id<E>>,
179 execute_fluent_last_id_terminal
180);
181impl_aggregate_terminal_driver!(
182 NthIdBySlotTerminal,
183 Option<Id<E>>,
184 execute_fluent_nth_id_by_slot
185);
186impl_aggregate_terminal_driver!(
187 MedianIdBySlotTerminal,
188 Option<Id<E>>,
189 execute_fluent_median_id_by_slot
190);
191impl_aggregate_terminal_driver!(
192 MinMaxIdBySlotTerminal,
193 MinMaxByIds<E>,
194 execute_fluent_min_max_id_by_slot
195);
196
197impl_projection_terminal_driver!(
198 ValuesBySlotTerminal,
199 Vec<Value>,
200 execute_fluent_values_by_slot
201);
202impl_projection_terminal_driver!(
203 DistinctValuesBySlotTerminal,
204 Vec<Value>,
205 execute_fluent_distinct_values_by_slot
206);
207impl_projection_terminal_driver!(
208 CountDistinctBySlotTerminal,
209 u32,
210 execute_fluent_count_distinct_by_slot
211);
212impl_projection_terminal_driver!(
213 ValuesBySlotWithIdsTerminal,
214 Vec<(Id<E>, Value)>,
215 execute_fluent_values_by_with_ids_slot
216);
217impl_projection_terminal_driver!(
218 FirstValueBySlotTerminal,
219 Option<Value>,
220 execute_fluent_first_value_by_slot
221);
222impl_projection_terminal_driver!(
223 LastValueBySlotTerminal,
224 Option<Value>,
225 execute_fluent_last_value_by_slot
226);
227
228fn output(value: Value) -> OutputValue {
230 OutputValue::from(value)
231}
232
233fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
235 values.into_iter().map(output).collect()
236}
237
238fn output_values_with_ids<E: PersistedRow>(
240 values: Vec<(Id<E>, Value)>,
241) -> Vec<(Id<E>, OutputValue)> {
242 values
243 .into_iter()
244 .map(|(id, value)| (id, output(value)))
245 .collect()
246}
247
248impl<E> FluentLoadQuery<'_, E>
249where
250 E: PersistedRow,
251{
252 pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
258 where
259 E: EntityValue,
260 {
261 self.with_admitted_non_paged(DbSession::execute_query_result)
262 }
263
264 pub fn execute_trusted(&self) -> Result<LoadQueryResult<E>, QueryError>
270 where
271 E: EntityValue,
272 {
273 self.with_non_paged(DbSession::execute_query_result)
274 }
275
276 pub fn execute_rows(&self) -> Result<EntityResponse<E>, QueryError>
278 where
279 E: EntityValue,
280 {
281 self.with_admitted_non_paged(DbSession::execute_scalar_query_rows)
282 }
283
284 pub fn execute_rows_trusted(&self) -> Result<EntityResponse<E>, QueryError>
290 where
291 E: EntityValue,
292 {
293 self.with_non_paged(DbSession::execute_scalar_query_rows)
294 }
295
296 fn with_admitted_non_paged<T>(
297 &self,
298 map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
299 ) -> Result<T, QueryError>
300 where
301 E: EntityValue,
302 {
303 self.ensure_non_paged_mode_ready()?;
304 self.ensure_default_read_admission()?;
305 map(self.session, self.query())
306 }
307
308 fn with_non_paged<T>(
311 &self,
312 map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
313 ) -> Result<T, QueryError>
314 where
315 E: EntityValue,
316 {
317 self.ensure_non_paged_mode_ready()?;
318 map(self.session, self.query())
319 }
320
321 fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
324 where
325 E: EntityValue,
326 {
327 self.with_non_paged(DbSession::explain_query_execution_with_visible_indexes)
328 }
329
330 fn render_execution_descriptor(
333 &self,
334 render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
335 ) -> Result<String, QueryError>
336 where
337 E: EntityValue,
338 {
339 let descriptor = self.explain_execution_descriptor()?;
340
341 Ok(render(descriptor))
342 }
343
344 fn execute_terminal<S>(&self, strategy: S) -> Result<S::Output, QueryError>
347 where
348 E: EntityValue,
349 S: TerminalStrategyDriver<E>,
350 {
351 self.with_admitted_non_paged(|session, query| strategy.execute(session, query))
352 }
353
354 fn explain_terminal<S>(&self, strategy: &S) -> Result<S::ExplainOutput, QueryError>
357 where
358 E: EntityValue,
359 S: TerminalStrategyDriver<E>,
360 {
361 self.with_non_paged(|session, query| strategy.explain(session, query))
362 }
363
364 pub fn is_empty(&self) -> Result<bool, QueryError>
370 where
371 E: EntityValue,
372 {
373 self.not_exists()
374 }
375
376 pub fn not_exists(&self) -> Result<bool, QueryError>
378 where
379 E: EntityValue,
380 {
381 Ok(!self.exists()?)
382 }
383
384 pub fn exists(&self) -> Result<bool, QueryError>
386 where
387 E: EntityValue,
388 {
389 self.execute_terminal(ExistsRowsTerminal::new())
390 }
391
392 #[cfg(feature = "diagnostics")]
395 #[doc(hidden)]
396 pub fn exists_with_attribution(
397 &self,
398 ) -> Result<(bool, FluentTerminalExecutionAttribution), QueryError>
399 where
400 E: EntityValue,
401 {
402 self.with_admitted_non_paged(|session, query| {
403 session.execute_fluent_exists_rows_terminal_with_attribution(
404 query,
405 ExistsRowsTerminal::new(),
406 )
407 })
408 }
409
410 pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
412 where
413 E: EntityValue,
414 {
415 self.explain_terminal(&ExistsRowsTerminal::new())
416 }
417
418 pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
422 where
423 E: EntityValue,
424 {
425 self.explain_exists()
426 }
427
428 pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
430 where
431 E: EntityValue,
432 {
433 self.explain_execution_descriptor()
434 }
435
436 pub fn explain_execution_text(&self) -> Result<String, QueryError>
438 where
439 E: EntityValue,
440 {
441 self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
442 }
443
444 pub fn explain_execution_json(&self) -> Result<String, QueryError>
446 where
447 E: EntityValue,
448 {
449 self.with_non_paged(DbSession::explain_query_execution_json_with_visible_indexes)
450 }
451
452 pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
454 where
455 E: EntityValue,
456 {
457 self.with_non_paged(DbSession::explain_query_execution_verbose_with_visible_indexes)
458 }
459
460 pub fn count(&self) -> Result<u32, QueryError>
462 where
463 E: EntityValue,
464 {
465 self.execute_terminal(CountRowsTerminal::new())
466 }
467
468 #[cfg(feature = "diagnostics")]
470 #[doc(hidden)]
471 pub fn count_with_attribution(
472 &self,
473 ) -> Result<(u32, FluentTerminalExecutionAttribution), QueryError>
474 where
475 E: EntityValue,
476 {
477 self.with_admitted_non_paged(|session, query| {
478 session.execute_fluent_count_rows_terminal_with_attribution(
479 query,
480 CountRowsTerminal::new(),
481 )
482 })
483 }
484
485 pub fn bytes(&self) -> Result<u64, QueryError>
488 where
489 E: EntityValue,
490 {
491 self.with_admitted_non_paged(DbSession::execute_fluent_bytes)
492 }
493
494 pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
497 where
498 E: EntityValue,
499 {
500 let target_slot = self.resolve_non_paged_slot(field)?;
501
502 self.with_admitted_non_paged(|session, query| {
503 session.execute_fluent_bytes_by_slot(query, target_slot)
504 })
505 }
506
507 pub fn explain_bytes_by(
509 &self,
510 field: impl AsRef<str>,
511 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
512 where
513 E: EntityValue,
514 {
515 let target_slot = self.resolve_non_paged_slot(field)?;
516
517 self.with_non_paged(|session, query| {
518 session.explain_query_bytes_by_with_visible_indexes(query, target_slot.field())
519 })
520 }
521
522 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
524 where
525 E: EntityValue,
526 {
527 self.execute_terminal(MinIdTerminal::new())
528 }
529
530 pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
532 where
533 E: EntityValue,
534 {
535 self.explain_terminal(&MinIdTerminal::new())
536 }
537
538 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
542 where
543 E: EntityValue,
544 {
545 let target_slot = self.resolve_non_paged_slot(field)?;
546
547 self.execute_terminal(MinIdBySlotTerminal::new(target_slot))
548 }
549
550 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
552 where
553 E: EntityValue,
554 {
555 self.execute_terminal(MaxIdTerminal::new())
556 }
557
558 pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
560 where
561 E: EntityValue,
562 {
563 self.explain_terminal(&MaxIdTerminal::new())
564 }
565
566 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
570 where
571 E: EntityValue,
572 {
573 let target_slot = self.resolve_non_paged_slot(field)?;
574
575 self.execute_terminal(MaxIdBySlotTerminal::new(target_slot))
576 }
577
578 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
581 where
582 E: EntityValue,
583 {
584 let target_slot = self.resolve_non_paged_slot(field)?;
585
586 self.execute_terminal(NthIdBySlotTerminal::new(target_slot, nth))
587 }
588
589 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
591 where
592 E: EntityValue,
593 {
594 let target_slot = self.resolve_non_paged_slot(field)?;
595
596 self.execute_terminal(SumBySlotTerminal::new(target_slot))
597 }
598
599 pub fn explain_sum_by(
601 &self,
602 field: impl AsRef<str>,
603 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
604 where
605 E: EntityValue,
606 {
607 let target_slot = self.resolve_non_paged_slot(field)?;
608
609 self.explain_terminal(&SumBySlotTerminal::new(target_slot))
610 }
611
612 pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
614 where
615 E: EntityValue,
616 {
617 let target_slot = self.resolve_non_paged_slot(field)?;
618
619 self.execute_terminal(SumDistinctBySlotTerminal::new(target_slot))
620 }
621
622 pub fn explain_sum_distinct_by(
624 &self,
625 field: impl AsRef<str>,
626 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
627 where
628 E: EntityValue,
629 {
630 let target_slot = self.resolve_non_paged_slot(field)?;
631
632 self.explain_terminal(&SumDistinctBySlotTerminal::new(target_slot))
633 }
634
635 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
637 where
638 E: EntityValue,
639 {
640 let target_slot = self.resolve_non_paged_slot(field)?;
641
642 self.execute_terminal(AvgBySlotTerminal::new(target_slot))
643 }
644
645 pub fn explain_avg_by(
647 &self,
648 field: impl AsRef<str>,
649 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
650 where
651 E: EntityValue,
652 {
653 let target_slot = self.resolve_non_paged_slot(field)?;
654
655 self.explain_terminal(&AvgBySlotTerminal::new(target_slot))
656 }
657
658 pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
660 where
661 E: EntityValue,
662 {
663 let target_slot = self.resolve_non_paged_slot(field)?;
664
665 self.execute_terminal(AvgDistinctBySlotTerminal::new(target_slot))
666 }
667
668 pub fn explain_avg_distinct_by(
670 &self,
671 field: impl AsRef<str>,
672 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
673 where
674 E: EntityValue,
675 {
676 let target_slot = self.resolve_non_paged_slot(field)?;
677
678 self.explain_terminal(&AvgDistinctBySlotTerminal::new(target_slot))
679 }
680
681 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
686 where
687 E: EntityValue,
688 {
689 let target_slot = self.resolve_non_paged_slot(field)?;
690
691 self.execute_terminal(MedianIdBySlotTerminal::new(target_slot))
692 }
693
694 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
697 where
698 E: EntityValue,
699 {
700 let target_slot = self.resolve_non_paged_slot(field)?;
701
702 self.execute_terminal(CountDistinctBySlotTerminal::new(target_slot))
703 }
704
705 pub fn explain_count_distinct_by(
707 &self,
708 field: impl AsRef<str>,
709 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
710 where
711 E: EntityValue,
712 {
713 let target_slot = self.resolve_non_paged_slot(field)?;
714
715 self.explain_terminal(&CountDistinctBySlotTerminal::new(target_slot))
716 }
717
718 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
722 where
723 E: EntityValue,
724 {
725 let target_slot = self.resolve_non_paged_slot(field)?;
726
727 self.execute_terminal(MinMaxIdBySlotTerminal::new(target_slot))
728 }
729
730 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
732 where
733 E: EntityValue,
734 {
735 let target_slot = self.resolve_non_paged_slot(field)?;
736
737 self.execute_terminal(ValuesBySlotTerminal::new(target_slot))
738 .map(output_values)
739 }
740
741 pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
744 where
745 E: EntityValue,
746 P: ValueProjectionExpr,
747 {
748 let target_slot = self.resolve_non_paged_slot(projection.field())?;
749
750 self.with_admitted_non_paged(|session, query| {
751 session.execute_fluent_project_values_by_slot(
752 query,
753 target_slot,
754 projection.projection_plan().into_expr(),
755 )
756 })
757 .map(output_values)
758 }
759
760 pub fn explain_project_values<P>(
762 &self,
763 projection: &P,
764 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
765 where
766 E: EntityValue,
767 P: ValueProjectionExpr,
768 {
769 let target_slot = self.resolve_non_paged_slot(projection.field())?;
770
771 self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
772 }
773
774 pub fn explain_values_by(
776 &self,
777 field: impl AsRef<str>,
778 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
779 where
780 E: EntityValue,
781 {
782 let target_slot = self.resolve_non_paged_slot(field)?;
783
784 self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
785 }
786
787 pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
789 where
790 E: EntityValue,
791 {
792 self.with_admitted_non_paged(|session, query| {
793 session.execute_fluent_take(query, take_count)
794 })
795 }
796
797 pub fn top_k_by(
805 &self,
806 field: impl AsRef<str>,
807 take_count: u32,
808 ) -> Result<EntityResponse<E>, QueryError>
809 where
810 E: EntityValue,
811 {
812 let target_slot = self.resolve_non_paged_slot(field)?;
813
814 self.with_admitted_non_paged(|session, query| {
815 session.execute_fluent_top_k_rows_by_slot(query, target_slot, take_count)
816 })
817 }
818
819 pub fn bottom_k_by(
827 &self,
828 field: impl AsRef<str>,
829 take_count: u32,
830 ) -> Result<EntityResponse<E>, QueryError>
831 where
832 E: EntityValue,
833 {
834 let target_slot = self.resolve_non_paged_slot(field)?;
835
836 self.with_admitted_non_paged(|session, query| {
837 session.execute_fluent_bottom_k_rows_by_slot(query, target_slot, take_count)
838 })
839 }
840
841 pub fn top_k_by_values(
849 &self,
850 field: impl AsRef<str>,
851 take_count: u32,
852 ) -> Result<Vec<OutputValue>, QueryError>
853 where
854 E: EntityValue,
855 {
856 let target_slot = self.resolve_non_paged_slot(field)?;
857
858 self.with_admitted_non_paged(|session, query| {
859 session
860 .execute_fluent_top_k_values_by_slot(query, target_slot, take_count)
861 .map(output_values)
862 })
863 }
864
865 pub fn bottom_k_by_values(
873 &self,
874 field: impl AsRef<str>,
875 take_count: u32,
876 ) -> Result<Vec<OutputValue>, QueryError>
877 where
878 E: EntityValue,
879 {
880 let target_slot = self.resolve_non_paged_slot(field)?;
881
882 self.with_admitted_non_paged(|session, query| {
883 session
884 .execute_fluent_bottom_k_values_by_slot(query, target_slot, take_count)
885 .map(output_values)
886 })
887 }
888
889 pub fn top_k_by_with_ids(
897 &self,
898 field: impl AsRef<str>,
899 take_count: u32,
900 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
901 where
902 E: EntityValue,
903 {
904 let target_slot = self.resolve_non_paged_slot(field)?;
905
906 self.with_admitted_non_paged(|session, query| {
907 session
908 .execute_fluent_top_k_values_with_ids_by_slot(query, target_slot, take_count)
909 .map(output_values_with_ids)
910 })
911 }
912
913 pub fn bottom_k_by_with_ids(
921 &self,
922 field: impl AsRef<str>,
923 take_count: u32,
924 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
925 where
926 E: EntityValue,
927 {
928 let target_slot = self.resolve_non_paged_slot(field)?;
929
930 self.with_admitted_non_paged(|session, query| {
931 session
932 .execute_fluent_bottom_k_values_with_ids_by_slot(query, target_slot, take_count)
933 .map(output_values_with_ids)
934 })
935 }
936
937 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
940 where
941 E: EntityValue,
942 {
943 let target_slot = self.resolve_non_paged_slot(field)?;
944
945 self.execute_terminal(DistinctValuesBySlotTerminal::new(target_slot))
946 .map(output_values)
947 }
948
949 pub fn explain_distinct_values_by(
951 &self,
952 field: impl AsRef<str>,
953 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
954 where
955 E: EntityValue,
956 {
957 let target_slot = self.resolve_non_paged_slot(field)?;
958
959 self.explain_terminal(&DistinctValuesBySlotTerminal::new(target_slot))
960 }
961
962 pub fn values_by_with_ids(
965 &self,
966 field: impl AsRef<str>,
967 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
968 where
969 E: EntityValue,
970 {
971 let target_slot = self.resolve_non_paged_slot(field)?;
972
973 self.execute_terminal(ValuesBySlotWithIdsTerminal::new(target_slot))
974 .map(output_values_with_ids)
975 }
976
977 pub fn project_values_with_ids<P>(
980 &self,
981 projection: &P,
982 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
983 where
984 E: EntityValue,
985 P: ValueProjectionExpr,
986 {
987 let target_slot = self.resolve_non_paged_slot(projection.field())?;
988
989 self.with_admitted_non_paged(|session, query| {
990 session.execute_fluent_project_values_with_ids_by_slot(
991 query,
992 target_slot,
993 projection.projection_plan().into_expr(),
994 )
995 })
996 .map(output_values_with_ids)
997 }
998
999 pub fn explain_values_by_with_ids(
1001 &self,
1002 field: impl AsRef<str>,
1003 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1004 where
1005 E: EntityValue,
1006 {
1007 let target_slot = self.resolve_non_paged_slot(field)?;
1008
1009 self.explain_terminal(&ValuesBySlotWithIdsTerminal::new(target_slot))
1010 }
1011
1012 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1015 where
1016 E: EntityValue,
1017 {
1018 let target_slot = self.resolve_non_paged_slot(field)?;
1019
1020 self.execute_terminal(FirstValueBySlotTerminal::new(target_slot))
1021 .map(|value| value.map(output))
1022 }
1023
1024 pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1027 where
1028 E: EntityValue,
1029 P: ValueProjectionExpr,
1030 {
1031 let target_slot = self.resolve_non_paged_slot(projection.field())?;
1032
1033 self.with_admitted_non_paged(|session, query| {
1034 session.execute_fluent_project_terminal_value_by_slot(
1035 query,
1036 target_slot,
1037 AggregateKind::First,
1038 projection.projection_plan().into_expr(),
1039 )
1040 })
1041 .map(|value| value.map(output))
1042 }
1043
1044 pub fn explain_first_value_by(
1046 &self,
1047 field: impl AsRef<str>,
1048 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1049 where
1050 E: EntityValue,
1051 {
1052 let target_slot = self.resolve_non_paged_slot(field)?;
1053
1054 self.explain_terminal(&FirstValueBySlotTerminal::new(target_slot))
1055 }
1056
1057 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1060 where
1061 E: EntityValue,
1062 {
1063 let target_slot = self.resolve_non_paged_slot(field)?;
1064
1065 self.execute_terminal(LastValueBySlotTerminal::new(target_slot))
1066 .map(|value| value.map(output))
1067 }
1068
1069 pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1072 where
1073 E: EntityValue,
1074 P: ValueProjectionExpr,
1075 {
1076 let target_slot = self.resolve_non_paged_slot(projection.field())?;
1077
1078 self.with_admitted_non_paged(|session, query| {
1079 session.execute_fluent_project_terminal_value_by_slot(
1080 query,
1081 target_slot,
1082 AggregateKind::Last,
1083 projection.projection_plan().into_expr(),
1084 )
1085 })
1086 .map(|value| value.map(output))
1087 }
1088
1089 pub fn explain_last_value_by(
1091 &self,
1092 field: impl AsRef<str>,
1093 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1094 where
1095 E: EntityValue,
1096 {
1097 let target_slot = self.resolve_non_paged_slot(field)?;
1098
1099 self.explain_terminal(&LastValueBySlotTerminal::new(target_slot))
1100 }
1101
1102 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1104 where
1105 E: EntityValue,
1106 {
1107 self.execute_terminal(FirstIdTerminal::new())
1108 }
1109
1110 pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1112 where
1113 E: EntityValue,
1114 {
1115 self.explain_terminal(&FirstIdTerminal::new())
1116 }
1117
1118 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1120 where
1121 E: EntityValue,
1122 {
1123 self.execute_terminal(LastIdTerminal::new())
1124 }
1125
1126 pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1128 where
1129 E: EntityValue,
1130 {
1131 self.explain_terminal(&LastIdTerminal::new())
1132 }
1133
1134 pub fn require_one(&self) -> Result<(), QueryError>
1136 where
1137 E: EntityValue,
1138 {
1139 self.execute_rows()?.require_one()?;
1140 Ok(())
1141 }
1142
1143 pub fn require_some(&self) -> Result<(), QueryError>
1145 where
1146 E: EntityValue,
1147 {
1148 self.execute_rows()?.require_some()?;
1149 Ok(())
1150 }
1151}