1use crate::{
7 db::{
8 PersistedRow,
9 executor::{
10 LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11 ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12 ScalarTerminalBoundaryRequest,
13 },
14 query::{
15 api::ResponseCardinalityExt,
16 builder::{
17 PreparedFluentAggregateExplainStrategy,
18 PreparedFluentExistingRowsTerminalRuntimeRequest,
19 PreparedFluentExistingRowsTerminalStrategy,
20 PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21 PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22 PreparedFluentOrderSensitiveTerminalStrategy,
23 PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24 PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25 ValueProjectionExpr,
26 },
27 explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
28 fluent::load::{FluentLoadQuery, LoadQueryResult},
29 intent::QueryError,
30 plan::AggregateKind,
31 },
32 response::EntityResponse,
33 },
34 error::InternalError,
35 traits::EntityValue,
36 types::{Decimal, Id},
37 value::Value,
38};
39
40type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42impl<E> FluentLoadQuery<'_, E>
43where
44 E: PersistedRow,
45{
46 pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
52 where
53 E: EntityValue,
54 {
55 self.ensure_non_paged_mode_ready()?;
56
57 if self.query().has_grouping() {
58 return self
59 .session
60 .execute_grouped(self.query(), self.cursor_token.as_deref())
61 .map(LoadQueryResult::Grouped);
62 }
63
64 self.session
65 .execute_query(self.query())
66 .map(LoadQueryResult::Rows)
67 }
68
69 fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
72 where
73 E: EntityValue,
74 F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
75 {
76 self.ensure_non_paged_mode_ready()?;
77
78 self.session.execute_load_query_with(self.query(), execute)
79 }
80
81 fn explain_prepared_aggregate_non_paged_terminal<S>(
85 &self,
86 strategy: &S,
87 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
88 where
89 E: EntityValue,
90 S: PreparedFluentAggregateExplainStrategy,
91 {
92 self.ensure_non_paged_mode_ready()?;
93
94 self.session
95 .explain_query_prepared_aggregate_terminal_with_visible_indexes(self.query(), strategy)
96 }
97
98 fn explain_prepared_projection_non_paged_terminal(
102 &self,
103 strategy: &PreparedFluentProjectionStrategy,
104 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
105 where
106 E: EntityValue,
107 {
108 self.ensure_non_paged_mode_ready()?;
109
110 self.session
111 .explain_query_prepared_projection_terminal_with_visible_indexes(self.query(), strategy)
112 }
113
114 fn execute_prepared_scalar_terminal_output(
118 &self,
119 strategy: PreparedFluentScalarTerminalStrategy,
120 ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
121 where
122 E: EntityValue,
123 {
124 let runtime_request = strategy.into_runtime_request();
125
126 self.execute_scalar_non_paged_terminal(move |load, plan| {
127 load.execute_scalar_terminal_request(
128 plan,
129 scalar_terminal_boundary_request_from_prepared(runtime_request),
130 )
131 })
132 }
133
134 fn execute_prepared_existing_rows_terminal_output(
138 &self,
139 strategy: PreparedFluentExistingRowsTerminalStrategy,
140 ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
141 where
142 E: EntityValue,
143 {
144 let runtime_request = strategy.into_runtime_request();
145
146 self.execute_scalar_non_paged_terminal(move |load, plan| {
147 load.execute_scalar_terminal_request(
148 plan,
149 existing_rows_terminal_boundary_request_from_prepared(runtime_request),
150 )
151 })
152 }
153
154 fn execute_prepared_numeric_field_terminal(
158 &self,
159 strategy: PreparedFluentNumericFieldStrategy,
160 ) -> Result<Option<Decimal>, QueryError>
161 where
162 E: EntityValue,
163 {
164 let (target_field, runtime_request) = strategy.into_runtime_parts();
165
166 self.execute_scalar_non_paged_terminal(move |load, plan| {
167 load.execute_numeric_field_boundary(
168 plan,
169 target_field,
170 numeric_field_boundary_request_from_prepared(runtime_request),
171 )
172 })
173 }
174
175 fn execute_prepared_order_sensitive_terminal_output(
179 &self,
180 strategy: PreparedFluentOrderSensitiveTerminalStrategy,
181 ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
182 where
183 E: EntityValue,
184 {
185 let runtime_request = strategy.into_runtime_request();
186
187 self.execute_scalar_non_paged_terminal(move |load, plan| {
188 load.execute_scalar_terminal_request(
189 plan,
190 order_sensitive_terminal_boundary_request_from_prepared(runtime_request),
191 )
192 })
193 }
194
195 fn execute_prepared_projection_terminal_output(
199 &self,
200 strategy: PreparedFluentProjectionStrategy,
201 ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
202 where
203 E: EntityValue,
204 {
205 let (target_field, runtime_request) = strategy.into_runtime_parts();
206
207 self.execute_scalar_non_paged_terminal(move |load, plan| {
208 load.execute_scalar_projection_boundary(
209 plan,
210 target_field,
211 projection_boundary_request_from_prepared(runtime_request),
212 )
213 })
214 }
215
216 fn project_terminal_items<P, T, U>(
219 projection: &P,
220 values: impl IntoIterator<Item = T>,
221 mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
222 ) -> Result<Vec<U>, QueryError>
223 where
224 P: ValueProjectionExpr,
225 {
226 values
227 .into_iter()
228 .map(|value| map(projection, value))
229 .collect()
230 }
231
232 pub fn is_empty(&self) -> Result<bool, QueryError>
238 where
239 E: EntityValue,
240 {
241 self.not_exists()
242 }
243
244 pub fn not_exists(&self) -> Result<bool, QueryError>
246 where
247 E: EntityValue,
248 {
249 Ok(!self.exists()?)
250 }
251
252 pub fn exists(&self) -> Result<bool, QueryError>
254 where
255 E: EntityValue,
256 {
257 self.execute_prepared_existing_rows_terminal_output(
258 PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
259 )?
260 .into_exists()
261 .map_err(QueryError::execute)
262 }
263
264 pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
266 where
267 E: EntityValue,
268 {
269 self.explain_prepared_aggregate_non_paged_terminal(
270 &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
271 )
272 }
273
274 pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
278 where
279 E: EntityValue,
280 {
281 self.explain_exists()
282 }
283
284 pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
286 where
287 E: EntityValue,
288 {
289 self.session
290 .explain_query_execution_with_visible_indexes(self.query())
291 }
292
293 pub fn explain_execution_text(&self) -> Result<String, QueryError>
295 where
296 E: EntityValue,
297 {
298 self.session
299 .explain_query_execution_text_with_visible_indexes(self.query())
300 }
301
302 pub fn explain_execution_json(&self) -> Result<String, QueryError>
304 where
305 E: EntityValue,
306 {
307 self.session
308 .explain_query_execution_json_with_visible_indexes(self.query())
309 }
310
311 pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
313 where
314 E: EntityValue,
315 {
316 self.session
317 .explain_query_execution_verbose_with_visible_indexes(self.query())
318 }
319
320 pub fn count(&self) -> Result<u32, QueryError>
322 where
323 E: EntityValue,
324 {
325 self.execute_prepared_existing_rows_terminal_output(
326 PreparedFluentExistingRowsTerminalStrategy::count_rows(),
327 )?
328 .into_count()
329 .map_err(QueryError::execute)
330 }
331
332 pub fn bytes(&self) -> Result<u64, QueryError>
335 where
336 E: EntityValue,
337 {
338 self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
339 }
340
341 pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
344 where
345 E: EntityValue,
346 {
347 self.ensure_non_paged_mode_ready()?;
348
349 Self::with_slot(field, |target_slot| {
350 self.session
351 .execute_load_query_with(self.query(), move |load, plan| {
352 load.bytes_by_slot(plan, target_slot)
353 })
354 })
355 }
356
357 pub fn explain_bytes_by(
359 &self,
360 field: impl AsRef<str>,
361 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
362 where
363 E: EntityValue,
364 {
365 self.ensure_non_paged_mode_ready()?;
366
367 Self::with_slot(field, |target_slot| {
368 self.session
369 .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
370 })
371 }
372
373 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
375 where
376 E: EntityValue,
377 {
378 self.execute_prepared_scalar_terminal_output(
379 PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
380 )?
381 .into_id()
382 .map_err(QueryError::execute)
383 }
384
385 pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
387 where
388 E: EntityValue,
389 {
390 self.explain_prepared_aggregate_non_paged_terminal(
391 &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
392 )
393 }
394
395 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
399 where
400 E: EntityValue,
401 {
402 self.ensure_non_paged_mode_ready()?;
403
404 Self::with_slot(field, |target_slot| {
405 self.execute_prepared_scalar_terminal_output(
406 PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
407 )?
408 .into_id()
409 .map_err(QueryError::execute)
410 })
411 }
412
413 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
415 where
416 E: EntityValue,
417 {
418 self.execute_prepared_scalar_terminal_output(
419 PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
420 )?
421 .into_id()
422 .map_err(QueryError::execute)
423 }
424
425 pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
427 where
428 E: EntityValue,
429 {
430 self.explain_prepared_aggregate_non_paged_terminal(
431 &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
432 )
433 }
434
435 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
439 where
440 E: EntityValue,
441 {
442 self.ensure_non_paged_mode_ready()?;
443
444 Self::with_slot(field, |target_slot| {
445 self.execute_prepared_scalar_terminal_output(
446 PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
447 )?
448 .into_id()
449 .map_err(QueryError::execute)
450 })
451 }
452
453 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
456 where
457 E: EntityValue,
458 {
459 self.ensure_non_paged_mode_ready()?;
460
461 Self::with_slot(field, |target_slot| {
462 self.execute_prepared_order_sensitive_terminal_output(
463 PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
464 )?
465 .into_id()
466 .map_err(QueryError::execute)
467 })
468 }
469
470 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
472 where
473 E: EntityValue,
474 {
475 self.ensure_non_paged_mode_ready()?;
476
477 Self::with_slot(field, |target_slot| {
478 self.execute_prepared_numeric_field_terminal(
479 PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
480 )
481 })
482 }
483
484 pub fn explain_sum_by(
486 &self,
487 field: impl AsRef<str>,
488 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
489 where
490 E: EntityValue,
491 {
492 self.ensure_non_paged_mode_ready()?;
493
494 Self::with_slot(field, |target_slot| {
495 self.explain_prepared_aggregate_non_paged_terminal(
496 &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
497 )
498 })
499 }
500
501 pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
503 where
504 E: EntityValue,
505 {
506 self.ensure_non_paged_mode_ready()?;
507
508 Self::with_slot(field, |target_slot| {
509 self.execute_prepared_numeric_field_terminal(
510 PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
511 )
512 })
513 }
514
515 pub fn explain_sum_distinct_by(
517 &self,
518 field: impl AsRef<str>,
519 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
520 where
521 E: EntityValue,
522 {
523 self.ensure_non_paged_mode_ready()?;
524
525 Self::with_slot(field, |target_slot| {
526 self.explain_prepared_aggregate_non_paged_terminal(
527 &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
528 )
529 })
530 }
531
532 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
534 where
535 E: EntityValue,
536 {
537 self.ensure_non_paged_mode_ready()?;
538
539 Self::with_slot(field, |target_slot| {
540 self.execute_prepared_numeric_field_terminal(
541 PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
542 )
543 })
544 }
545
546 pub fn explain_avg_by(
548 &self,
549 field: impl AsRef<str>,
550 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
551 where
552 E: EntityValue,
553 {
554 self.ensure_non_paged_mode_ready()?;
555
556 Self::with_slot(field, |target_slot| {
557 self.explain_prepared_aggregate_non_paged_terminal(
558 &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
559 )
560 })
561 }
562
563 pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
565 where
566 E: EntityValue,
567 {
568 self.ensure_non_paged_mode_ready()?;
569
570 Self::with_slot(field, |target_slot| {
571 self.execute_prepared_numeric_field_terminal(
572 PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
573 )
574 })
575 }
576
577 pub fn explain_avg_distinct_by(
579 &self,
580 field: impl AsRef<str>,
581 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
582 where
583 E: EntityValue,
584 {
585 self.ensure_non_paged_mode_ready()?;
586
587 Self::with_slot(field, |target_slot| {
588 self.explain_prepared_aggregate_non_paged_terminal(
589 &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
590 )
591 })
592 }
593
594 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
599 where
600 E: EntityValue,
601 {
602 self.ensure_non_paged_mode_ready()?;
603
604 Self::with_slot(field, |target_slot| {
605 self.execute_prepared_order_sensitive_terminal_output(
606 PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
607 )?
608 .into_id()
609 .map_err(QueryError::execute)
610 })
611 }
612
613 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
616 where
617 E: EntityValue,
618 {
619 self.ensure_non_paged_mode_ready()?;
620
621 Self::with_slot(field, |target_slot| {
622 self.execute_prepared_projection_terminal_output(
623 PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
624 )?
625 .into_count()
626 .map_err(QueryError::execute)
627 })
628 }
629
630 pub fn explain_count_distinct_by(
632 &self,
633 field: impl AsRef<str>,
634 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
635 where
636 E: EntityValue,
637 {
638 self.ensure_non_paged_mode_ready()?;
639
640 Self::with_slot(field, |target_slot| {
641 self.explain_prepared_projection_non_paged_terminal(
642 &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
643 )
644 })
645 }
646
647 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
651 where
652 E: EntityValue,
653 {
654 self.ensure_non_paged_mode_ready()?;
655
656 Self::with_slot(field, |target_slot| {
657 self.execute_prepared_order_sensitive_terminal_output(
658 PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
659 )?
660 .into_id_pair()
661 .map_err(QueryError::execute)
662 })
663 }
664
665 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
667 where
668 E: EntityValue,
669 {
670 self.ensure_non_paged_mode_ready()?;
671
672 Self::with_slot(field, |target_slot| {
673 self.execute_prepared_projection_terminal_output(
674 PreparedFluentProjectionStrategy::values_by_slot(target_slot),
675 )?
676 .into_values()
677 .map_err(QueryError::execute)
678 })
679 }
680
681 pub fn project_values<P>(&self, projection: &P) -> Result<Vec<Value>, QueryError>
684 where
685 E: EntityValue,
686 P: ValueProjectionExpr,
687 {
688 self.ensure_non_paged_mode_ready()?;
689
690 Self::with_slot(projection.field(), |target_slot| {
691 let values = self
692 .execute_prepared_projection_terminal_output(
693 PreparedFluentProjectionStrategy::values_by_slot(target_slot),
694 )?
695 .into_values()
696 .map_err(QueryError::execute)?;
697
698 Self::project_terminal_items(projection, values, |projection, value| {
699 projection.apply_value(value)
700 })
701 })
702 }
703
704 pub fn explain_project_values<P>(
706 &self,
707 projection: &P,
708 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
709 where
710 E: EntityValue,
711 P: ValueProjectionExpr,
712 {
713 self.ensure_non_paged_mode_ready()?;
714
715 Self::with_slot(projection.field(), |target_slot| {
716 self.explain_prepared_projection_non_paged_terminal(
717 &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
718 )
719 })
720 }
721
722 pub fn explain_values_by(
724 &self,
725 field: impl AsRef<str>,
726 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
727 where
728 E: EntityValue,
729 {
730 self.ensure_non_paged_mode_ready()?;
731
732 Self::with_slot(field, |target_slot| {
733 self.explain_prepared_projection_non_paged_terminal(
734 &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
735 )
736 })
737 }
738
739 pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
741 where
742 E: EntityValue,
743 {
744 self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
745 }
746
747 pub fn top_k_by(
755 &self,
756 field: impl AsRef<str>,
757 take_count: u32,
758 ) -> Result<EntityResponse<E>, QueryError>
759 where
760 E: EntityValue,
761 {
762 self.ensure_non_paged_mode_ready()?;
763
764 Self::with_slot(field, |target_slot| {
765 self.session
766 .execute_load_query_with(self.query(), move |load, plan| {
767 load.top_k_by_slot(plan, target_slot, take_count)
768 })
769 })
770 }
771
772 pub fn bottom_k_by(
780 &self,
781 field: impl AsRef<str>,
782 take_count: u32,
783 ) -> Result<EntityResponse<E>, QueryError>
784 where
785 E: EntityValue,
786 {
787 self.ensure_non_paged_mode_ready()?;
788
789 Self::with_slot(field, |target_slot| {
790 self.session
791 .execute_load_query_with(self.query(), move |load, plan| {
792 load.bottom_k_by_slot(plan, target_slot, take_count)
793 })
794 })
795 }
796
797 pub fn top_k_by_values(
805 &self,
806 field: impl AsRef<str>,
807 take_count: u32,
808 ) -> Result<Vec<Value>, QueryError>
809 where
810 E: EntityValue,
811 {
812 self.ensure_non_paged_mode_ready()?;
813
814 Self::with_slot(field, |target_slot| {
815 self.session
816 .execute_load_query_with(self.query(), move |load, plan| {
817 load.top_k_by_values_slot(plan, target_slot, take_count)
818 })
819 })
820 }
821
822 pub fn bottom_k_by_values(
830 &self,
831 field: impl AsRef<str>,
832 take_count: u32,
833 ) -> Result<Vec<Value>, QueryError>
834 where
835 E: EntityValue,
836 {
837 self.ensure_non_paged_mode_ready()?;
838
839 Self::with_slot(field, |target_slot| {
840 self.session
841 .execute_load_query_with(self.query(), move |load, plan| {
842 load.bottom_k_by_values_slot(plan, target_slot, take_count)
843 })
844 })
845 }
846
847 pub fn top_k_by_with_ids(
855 &self,
856 field: impl AsRef<str>,
857 take_count: u32,
858 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
859 where
860 E: EntityValue,
861 {
862 self.ensure_non_paged_mode_ready()?;
863
864 Self::with_slot(field, |target_slot| {
865 self.session
866 .execute_load_query_with(self.query(), move |load, plan| {
867 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
868 })
869 })
870 }
871
872 pub fn bottom_k_by_with_ids(
880 &self,
881 field: impl AsRef<str>,
882 take_count: u32,
883 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
884 where
885 E: EntityValue,
886 {
887 self.ensure_non_paged_mode_ready()?;
888
889 Self::with_slot(field, |target_slot| {
890 self.session
891 .execute_load_query_with(self.query(), move |load, plan| {
892 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
893 })
894 })
895 }
896
897 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
900 where
901 E: EntityValue,
902 {
903 self.ensure_non_paged_mode_ready()?;
904
905 Self::with_slot(field, |target_slot| {
906 self.execute_prepared_projection_terminal_output(
907 PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
908 )?
909 .into_values()
910 .map_err(QueryError::execute)
911 })
912 }
913
914 pub fn explain_distinct_values_by(
916 &self,
917 field: impl AsRef<str>,
918 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
919 where
920 E: EntityValue,
921 {
922 self.ensure_non_paged_mode_ready()?;
923
924 Self::with_slot(field, |target_slot| {
925 self.explain_prepared_projection_non_paged_terminal(
926 &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
927 )
928 })
929 }
930
931 pub fn values_by_with_ids(
934 &self,
935 field: impl AsRef<str>,
936 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
937 where
938 E: EntityValue,
939 {
940 self.ensure_non_paged_mode_ready()?;
941
942 Self::with_slot(field, |target_slot| {
943 self.execute_prepared_projection_terminal_output(
944 PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
945 )?
946 .into_values_with_ids()
947 .map_err(QueryError::execute)
948 })
949 }
950
951 pub fn project_values_with_ids<P>(
954 &self,
955 projection: &P,
956 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
957 where
958 E: EntityValue,
959 P: ValueProjectionExpr,
960 {
961 self.ensure_non_paged_mode_ready()?;
962
963 Self::with_slot(projection.field(), |target_slot| {
964 let values = self
965 .execute_prepared_projection_terminal_output(
966 PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
967 )?
968 .into_values_with_ids::<E>()
969 .map_err(QueryError::execute)?;
970
971 Self::project_terminal_items(projection, values, |projection, (id, value)| {
972 Ok((id, projection.apply_value(value)?))
973 })
974 })
975 }
976
977 pub fn explain_values_by_with_ids(
979 &self,
980 field: impl AsRef<str>,
981 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
982 where
983 E: EntityValue,
984 {
985 self.ensure_non_paged_mode_ready()?;
986
987 Self::with_slot(field, |target_slot| {
988 self.explain_prepared_projection_non_paged_terminal(
989 &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
990 )
991 })
992 }
993
994 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
997 where
998 E: EntityValue,
999 {
1000 self.ensure_non_paged_mode_ready()?;
1001
1002 Self::with_slot(field, |target_slot| {
1003 self.execute_prepared_projection_terminal_output(
1004 PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1005 )?
1006 .into_terminal_value()
1007 .map_err(QueryError::execute)
1008 })
1009 }
1010
1011 pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<Value>, QueryError>
1014 where
1015 E: EntityValue,
1016 P: ValueProjectionExpr,
1017 {
1018 self.ensure_non_paged_mode_ready()?;
1019
1020 Self::with_slot(projection.field(), |target_slot| {
1021 let value = self
1022 .execute_prepared_projection_terminal_output(
1023 PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1024 )?
1025 .into_terminal_value()
1026 .map_err(QueryError::execute)?;
1027
1028 let mut projected =
1029 Self::project_terminal_items(projection, value, |projection, value| {
1030 projection.apply_value(value)
1031 })?;
1032
1033 Ok(projected.pop())
1034 })
1035 }
1036
1037 pub fn explain_first_value_by(
1039 &self,
1040 field: impl AsRef<str>,
1041 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1042 where
1043 E: EntityValue,
1044 {
1045 self.ensure_non_paged_mode_ready()?;
1046
1047 Self::with_slot(field, |target_slot| {
1048 self.explain_prepared_projection_non_paged_terminal(
1049 &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1050 )
1051 })
1052 }
1053
1054 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
1057 where
1058 E: EntityValue,
1059 {
1060 self.ensure_non_paged_mode_ready()?;
1061
1062 Self::with_slot(field, |target_slot| {
1063 self.execute_prepared_projection_terminal_output(
1064 PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1065 )?
1066 .into_terminal_value()
1067 .map_err(QueryError::execute)
1068 })
1069 }
1070
1071 pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<Value>, QueryError>
1074 where
1075 E: EntityValue,
1076 P: ValueProjectionExpr,
1077 {
1078 self.ensure_non_paged_mode_ready()?;
1079
1080 Self::with_slot(projection.field(), |target_slot| {
1081 let value = self
1082 .execute_prepared_projection_terminal_output(
1083 PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1084 )?
1085 .into_terminal_value()
1086 .map_err(QueryError::execute)?;
1087
1088 let mut projected =
1089 Self::project_terminal_items(projection, value, |projection, value| {
1090 projection.apply_value(value)
1091 })?;
1092
1093 Ok(projected.pop())
1094 })
1095 }
1096
1097 pub fn explain_last_value_by(
1099 &self,
1100 field: impl AsRef<str>,
1101 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1102 where
1103 E: EntityValue,
1104 {
1105 self.ensure_non_paged_mode_ready()?;
1106
1107 Self::with_slot(field, |target_slot| {
1108 self.explain_prepared_projection_non_paged_terminal(
1109 &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1110 )
1111 })
1112 }
1113
1114 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1116 where
1117 E: EntityValue,
1118 {
1119 self.execute_prepared_order_sensitive_terminal_output(
1120 PreparedFluentOrderSensitiveTerminalStrategy::first(),
1121 )?
1122 .into_id()
1123 .map_err(QueryError::execute)
1124 }
1125
1126 pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1128 where
1129 E: EntityValue,
1130 {
1131 self.explain_prepared_aggregate_non_paged_terminal(
1132 &PreparedFluentOrderSensitiveTerminalStrategy::first(),
1133 )
1134 }
1135
1136 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1138 where
1139 E: EntityValue,
1140 {
1141 self.execute_prepared_order_sensitive_terminal_output(
1142 PreparedFluentOrderSensitiveTerminalStrategy::last(),
1143 )?
1144 .into_id()
1145 .map_err(QueryError::execute)
1146 }
1147
1148 pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1150 where
1151 E: EntityValue,
1152 {
1153 self.explain_prepared_aggregate_non_paged_terminal(
1154 &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1155 )
1156 }
1157
1158 pub fn require_one(&self) -> Result<(), QueryError>
1160 where
1161 E: EntityValue,
1162 {
1163 self.execute()?.into_rows()?.require_one()?;
1164 Ok(())
1165 }
1166
1167 pub fn require_some(&self) -> Result<(), QueryError>
1169 where
1170 E: EntityValue,
1171 {
1172 self.execute()?.into_rows()?.require_some()?;
1173 Ok(())
1174 }
1175}
1176
1177fn scalar_terminal_boundary_request_from_prepared(
1178 request: PreparedFluentScalarTerminalRuntimeRequest,
1179) -> ScalarTerminalBoundaryRequest {
1180 match request {
1181 PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1182 ScalarTerminalBoundaryRequest::IdTerminal { kind }
1183 }
1184 PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1185 ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
1186 }
1187 }
1188}
1189
1190const fn existing_rows_terminal_boundary_request_from_prepared(
1191 request: PreparedFluentExistingRowsTerminalRuntimeRequest,
1192) -> ScalarTerminalBoundaryRequest {
1193 match request {
1194 PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => {
1195 ScalarTerminalBoundaryRequest::Count
1196 }
1197 PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => {
1198 ScalarTerminalBoundaryRequest::Exists
1199 }
1200 }
1201}
1202
1203const fn numeric_field_boundary_request_from_prepared(
1204 request: PreparedFluentNumericFieldRuntimeRequest,
1205) -> ScalarNumericFieldBoundaryRequest {
1206 match request {
1207 PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
1208 PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
1209 ScalarNumericFieldBoundaryRequest::SumDistinct
1210 }
1211 PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
1212 PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
1213 ScalarNumericFieldBoundaryRequest::AvgDistinct
1214 }
1215 }
1216}
1217
1218fn order_sensitive_terminal_boundary_request_from_prepared(
1219 request: PreparedFluentOrderSensitiveTerminalRuntimeRequest,
1220) -> ScalarTerminalBoundaryRequest {
1221 match request {
1222 PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1223 ScalarTerminalBoundaryRequest::IdTerminal { kind }
1224 }
1225 PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1226 ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth }
1227 }
1228 PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1229 ScalarTerminalBoundaryRequest::MedianBySlot { target_field }
1230 }
1231 PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1232 ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field }
1233 }
1234 }
1235}
1236
1237const fn projection_boundary_request_from_prepared(
1238 request: PreparedFluentProjectionRuntimeRequest,
1239) -> ScalarProjectionBoundaryRequest {
1240 match request {
1241 PreparedFluentProjectionRuntimeRequest::Values => ScalarProjectionBoundaryRequest::Values,
1242 PreparedFluentProjectionRuntimeRequest::DistinctValues => {
1243 ScalarProjectionBoundaryRequest::DistinctValues
1244 }
1245 PreparedFluentProjectionRuntimeRequest::CountDistinct => {
1246 ScalarProjectionBoundaryRequest::CountDistinct
1247 }
1248 PreparedFluentProjectionRuntimeRequest::ValuesWithIds => {
1249 ScalarProjectionBoundaryRequest::ValuesWithIds
1250 }
1251 PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1252 ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind }
1253 }
1254 }
1255}