1use crate::{
7 db::{
8 PersistedRow,
9 executor::{
10 LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11 ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12 ScalarTerminalBoundaryRequest,
13 },
14 query::{
15 api::ResponseCardinalityExt,
16 builder::{
17 PreparedFluentAggregateExplainStrategy,
18 PreparedFluentExistingRowsTerminalRuntimeRequest,
19 PreparedFluentExistingRowsTerminalStrategy,
20 PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21 PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22 PreparedFluentOrderSensitiveTerminalStrategy,
23 PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24 PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25 TextProjectionExpr,
26 },
27 explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
28 fluent::load::{FluentLoadQuery, LoadQueryResult},
29 intent::QueryError,
30 plan::AggregateKind,
31 },
32 response::EntityResponse,
33 },
34 error::InternalError,
35 traits::EntityValue,
36 types::{Decimal, Id},
37 value::Value,
38};
39
/// Optional pair of entity ids; this is the success payload of `min_max_by`.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42impl<E> FluentLoadQuery<'_, E>
43where
44 E: PersistedRow,
45{
46 pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
52 where
53 E: EntityValue,
54 {
55 self.ensure_non_paged_mode_ready()?;
56
57 if self.query().has_grouping() {
58 return self
59 .session
60 .execute_grouped(self.query(), self.cursor_token.as_deref())
61 .map(LoadQueryResult::Grouped);
62 }
63
64 self.session
65 .execute_query(self.query())
66 .map(LoadQueryResult::Rows)
67 }
68
69 fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
72 where
73 E: EntityValue,
74 F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
75 {
76 self.ensure_non_paged_mode_ready()?;
77
78 self.session.execute_load_query_with(self.query(), execute)
79 }
80
81 fn explain_prepared_aggregate_non_paged_terminal<S>(
85 &self,
86 strategy: &S,
87 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
88 where
89 E: EntityValue,
90 S: PreparedFluentAggregateExplainStrategy,
91 {
92 self.ensure_non_paged_mode_ready()?;
93
94 self.session
95 .explain_query_prepared_aggregate_terminal_with_visible_indexes(self.query(), strategy)
96 }
97
98 fn explain_prepared_projection_non_paged_terminal(
102 &self,
103 strategy: &PreparedFluentProjectionStrategy,
104 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
105 where
106 E: EntityValue,
107 {
108 self.ensure_non_paged_mode_ready()?;
109
110 self.session
111 .explain_query_prepared_projection_terminal_with_visible_indexes(self.query(), strategy)
112 }
113
114 fn execute_prepared_scalar_terminal_output(
118 &self,
119 strategy: PreparedFluentScalarTerminalStrategy,
120 ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
121 where
122 E: EntityValue,
123 {
124 let runtime_request = strategy.into_runtime_request();
125
126 self.execute_scalar_non_paged_terminal(move |load, plan| {
127 load.execute_scalar_terminal_request(
128 plan,
129 scalar_terminal_boundary_request_from_prepared(runtime_request),
130 )
131 })
132 }
133
134 fn execute_prepared_existing_rows_terminal_output(
138 &self,
139 strategy: PreparedFluentExistingRowsTerminalStrategy,
140 ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
141 where
142 E: EntityValue,
143 {
144 let runtime_request = strategy.into_runtime_request();
145
146 self.execute_scalar_non_paged_terminal(move |load, plan| {
147 load.execute_scalar_terminal_request(
148 plan,
149 existing_rows_terminal_boundary_request_from_prepared(runtime_request),
150 )
151 })
152 }
153
154 fn execute_prepared_numeric_field_terminal(
158 &self,
159 strategy: PreparedFluentNumericFieldStrategy,
160 ) -> Result<Option<Decimal>, QueryError>
161 where
162 E: EntityValue,
163 {
164 let (target_field, runtime_request) = strategy.into_runtime_parts();
165
166 self.execute_scalar_non_paged_terminal(move |load, plan| {
167 load.execute_numeric_field_boundary(
168 plan,
169 target_field,
170 numeric_field_boundary_request_from_prepared(runtime_request),
171 )
172 })
173 }
174
175 fn execute_prepared_order_sensitive_terminal_output(
179 &self,
180 strategy: PreparedFluentOrderSensitiveTerminalStrategy,
181 ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
182 where
183 E: EntityValue,
184 {
185 let runtime_request = strategy.into_runtime_request();
186
187 self.execute_scalar_non_paged_terminal(move |load, plan| {
188 load.execute_scalar_terminal_request(
189 plan,
190 order_sensitive_terminal_boundary_request_from_prepared(runtime_request),
191 )
192 })
193 }
194
195 fn execute_prepared_projection_terminal_output(
199 &self,
200 strategy: PreparedFluentProjectionStrategy,
201 ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
202 where
203 E: EntityValue,
204 {
205 let (target_field, runtime_request) = strategy.into_runtime_parts();
206
207 self.execute_scalar_non_paged_terminal(move |load, plan| {
208 load.execute_scalar_projection_boundary(
209 plan,
210 target_field,
211 projection_boundary_request_from_prepared(runtime_request),
212 )
213 })
214 }
215
216 fn project_terminal_items<T, U>(
219 projection: &TextProjectionExpr,
220 values: impl IntoIterator<Item = T>,
221 mut map: impl FnMut(&TextProjectionExpr, T) -> Result<U, QueryError>,
222 ) -> Result<Vec<U>, QueryError> {
223 values
224 .into_iter()
225 .map(|value| map(projection, value))
226 .collect()
227 }
228
229 pub fn is_empty(&self) -> Result<bool, QueryError>
235 where
236 E: EntityValue,
237 {
238 self.not_exists()
239 }
240
241 pub fn not_exists(&self) -> Result<bool, QueryError>
243 where
244 E: EntityValue,
245 {
246 Ok(!self.exists()?)
247 }
248
249 pub fn exists(&self) -> Result<bool, QueryError>
251 where
252 E: EntityValue,
253 {
254 self.execute_prepared_existing_rows_terminal_output(
255 PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
256 )?
257 .into_exists()
258 .map_err(QueryError::execute)
259 }
260
261 pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
263 where
264 E: EntityValue,
265 {
266 self.explain_prepared_aggregate_non_paged_terminal(
267 &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
268 )
269 }
270
271 pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
275 where
276 E: EntityValue,
277 {
278 self.explain_exists()
279 }
280
281 pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
283 where
284 E: EntityValue,
285 {
286 self.session
287 .explain_query_execution_with_visible_indexes(self.query())
288 }
289
290 pub fn explain_execution_text(&self) -> Result<String, QueryError>
292 where
293 E: EntityValue,
294 {
295 self.session
296 .explain_query_execution_text_with_visible_indexes(self.query())
297 }
298
299 pub fn explain_execution_json(&self) -> Result<String, QueryError>
301 where
302 E: EntityValue,
303 {
304 self.session
305 .explain_query_execution_json_with_visible_indexes(self.query())
306 }
307
308 pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
310 where
311 E: EntityValue,
312 {
313 self.session
314 .explain_query_execution_verbose_with_visible_indexes(self.query())
315 }
316
317 pub fn count(&self) -> Result<u32, QueryError>
319 where
320 E: EntityValue,
321 {
322 self.execute_prepared_existing_rows_terminal_output(
323 PreparedFluentExistingRowsTerminalStrategy::count_rows(),
324 )?
325 .into_count()
326 .map_err(QueryError::execute)
327 }
328
329 pub fn bytes(&self) -> Result<u64, QueryError>
332 where
333 E: EntityValue,
334 {
335 self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
336 }
337
338 pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
341 where
342 E: EntityValue,
343 {
344 self.ensure_non_paged_mode_ready()?;
345
346 Self::with_slot(field, |target_slot| {
347 self.session
348 .execute_load_query_with(self.query(), move |load, plan| {
349 load.bytes_by_slot(plan, target_slot)
350 })
351 })
352 }
353
354 pub fn explain_bytes_by(
356 &self,
357 field: impl AsRef<str>,
358 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
359 where
360 E: EntityValue,
361 {
362 self.ensure_non_paged_mode_ready()?;
363
364 Self::with_slot(field, |target_slot| {
365 self.session
366 .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
367 })
368 }
369
370 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
372 where
373 E: EntityValue,
374 {
375 self.execute_prepared_scalar_terminal_output(
376 PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
377 )?
378 .into_id()
379 .map_err(QueryError::execute)
380 }
381
382 pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
384 where
385 E: EntityValue,
386 {
387 self.explain_prepared_aggregate_non_paged_terminal(
388 &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
389 )
390 }
391
392 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
396 where
397 E: EntityValue,
398 {
399 self.ensure_non_paged_mode_ready()?;
400
401 Self::with_slot(field, |target_slot| {
402 self.execute_prepared_scalar_terminal_output(
403 PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
404 )?
405 .into_id()
406 .map_err(QueryError::execute)
407 })
408 }
409
410 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
412 where
413 E: EntityValue,
414 {
415 self.execute_prepared_scalar_terminal_output(
416 PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
417 )?
418 .into_id()
419 .map_err(QueryError::execute)
420 }
421
422 pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
424 where
425 E: EntityValue,
426 {
427 self.explain_prepared_aggregate_non_paged_terminal(
428 &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
429 )
430 }
431
432 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
436 where
437 E: EntityValue,
438 {
439 self.ensure_non_paged_mode_ready()?;
440
441 Self::with_slot(field, |target_slot| {
442 self.execute_prepared_scalar_terminal_output(
443 PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
444 )?
445 .into_id()
446 .map_err(QueryError::execute)
447 })
448 }
449
450 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
453 where
454 E: EntityValue,
455 {
456 self.ensure_non_paged_mode_ready()?;
457
458 Self::with_slot(field, |target_slot| {
459 self.execute_prepared_order_sensitive_terminal_output(
460 PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
461 )?
462 .into_id()
463 .map_err(QueryError::execute)
464 })
465 }
466
467 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
469 where
470 E: EntityValue,
471 {
472 self.ensure_non_paged_mode_ready()?;
473
474 Self::with_slot(field, |target_slot| {
475 self.execute_prepared_numeric_field_terminal(
476 PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
477 )
478 })
479 }
480
481 pub fn explain_sum_by(
483 &self,
484 field: impl AsRef<str>,
485 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
486 where
487 E: EntityValue,
488 {
489 self.ensure_non_paged_mode_ready()?;
490
491 Self::with_slot(field, |target_slot| {
492 self.explain_prepared_aggregate_non_paged_terminal(
493 &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
494 )
495 })
496 }
497
498 pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
500 where
501 E: EntityValue,
502 {
503 self.ensure_non_paged_mode_ready()?;
504
505 Self::with_slot(field, |target_slot| {
506 self.execute_prepared_numeric_field_terminal(
507 PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
508 )
509 })
510 }
511
512 pub fn explain_sum_distinct_by(
514 &self,
515 field: impl AsRef<str>,
516 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
517 where
518 E: EntityValue,
519 {
520 self.ensure_non_paged_mode_ready()?;
521
522 Self::with_slot(field, |target_slot| {
523 self.explain_prepared_aggregate_non_paged_terminal(
524 &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
525 )
526 })
527 }
528
529 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
531 where
532 E: EntityValue,
533 {
534 self.ensure_non_paged_mode_ready()?;
535
536 Self::with_slot(field, |target_slot| {
537 self.execute_prepared_numeric_field_terminal(
538 PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
539 )
540 })
541 }
542
543 pub fn explain_avg_by(
545 &self,
546 field: impl AsRef<str>,
547 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
548 where
549 E: EntityValue,
550 {
551 self.ensure_non_paged_mode_ready()?;
552
553 Self::with_slot(field, |target_slot| {
554 self.explain_prepared_aggregate_non_paged_terminal(
555 &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
556 )
557 })
558 }
559
560 pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
562 where
563 E: EntityValue,
564 {
565 self.ensure_non_paged_mode_ready()?;
566
567 Self::with_slot(field, |target_slot| {
568 self.execute_prepared_numeric_field_terminal(
569 PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
570 )
571 })
572 }
573
574 pub fn explain_avg_distinct_by(
576 &self,
577 field: impl AsRef<str>,
578 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
579 where
580 E: EntityValue,
581 {
582 self.ensure_non_paged_mode_ready()?;
583
584 Self::with_slot(field, |target_slot| {
585 self.explain_prepared_aggregate_non_paged_terminal(
586 &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
587 )
588 })
589 }
590
591 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
596 where
597 E: EntityValue,
598 {
599 self.ensure_non_paged_mode_ready()?;
600
601 Self::with_slot(field, |target_slot| {
602 self.execute_prepared_order_sensitive_terminal_output(
603 PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
604 )?
605 .into_id()
606 .map_err(QueryError::execute)
607 })
608 }
609
610 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
613 where
614 E: EntityValue,
615 {
616 self.ensure_non_paged_mode_ready()?;
617
618 Self::with_slot(field, |target_slot| {
619 self.execute_prepared_projection_terminal_output(
620 PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
621 )?
622 .into_count()
623 .map_err(QueryError::execute)
624 })
625 }
626
627 pub fn explain_count_distinct_by(
629 &self,
630 field: impl AsRef<str>,
631 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
632 where
633 E: EntityValue,
634 {
635 self.ensure_non_paged_mode_ready()?;
636
637 Self::with_slot(field, |target_slot| {
638 self.explain_prepared_projection_non_paged_terminal(
639 &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
640 )
641 })
642 }
643
644 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
648 where
649 E: EntityValue,
650 {
651 self.ensure_non_paged_mode_ready()?;
652
653 Self::with_slot(field, |target_slot| {
654 self.execute_prepared_order_sensitive_terminal_output(
655 PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
656 )?
657 .into_id_pair()
658 .map_err(QueryError::execute)
659 })
660 }
661
662 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
664 where
665 E: EntityValue,
666 {
667 self.ensure_non_paged_mode_ready()?;
668
669 Self::with_slot(field, |target_slot| {
670 self.execute_prepared_projection_terminal_output(
671 PreparedFluentProjectionStrategy::values_by_slot(target_slot),
672 )?
673 .into_values()
674 .map_err(QueryError::execute)
675 })
676 }
677
678 pub fn project_values(&self, projection: &TextProjectionExpr) -> Result<Vec<Value>, QueryError>
681 where
682 E: EntityValue,
683 {
684 self.ensure_non_paged_mode_ready()?;
685
686 Self::with_slot(projection.field(), |target_slot| {
687 let values = self
688 .execute_prepared_projection_terminal_output(
689 PreparedFluentProjectionStrategy::values_by_slot(target_slot),
690 )?
691 .into_values()
692 .map_err(QueryError::execute)?;
693
694 Self::project_terminal_items(projection, values, |projection, value| {
695 projection.apply_value(value)
696 })
697 })
698 }
699
700 pub fn explain_project_values(
702 &self,
703 projection: &TextProjectionExpr,
704 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
705 where
706 E: EntityValue,
707 {
708 self.ensure_non_paged_mode_ready()?;
709
710 Self::with_slot(projection.field(), |target_slot| {
711 self.explain_prepared_projection_non_paged_terminal(
712 &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
713 )
714 })
715 }
716
717 pub fn explain_values_by(
719 &self,
720 field: impl AsRef<str>,
721 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
722 where
723 E: EntityValue,
724 {
725 self.ensure_non_paged_mode_ready()?;
726
727 Self::with_slot(field, |target_slot| {
728 self.explain_prepared_projection_non_paged_terminal(
729 &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
730 )
731 })
732 }
733
734 pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
736 where
737 E: EntityValue,
738 {
739 self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
740 }
741
742 pub fn top_k_by(
750 &self,
751 field: impl AsRef<str>,
752 take_count: u32,
753 ) -> Result<EntityResponse<E>, QueryError>
754 where
755 E: EntityValue,
756 {
757 self.ensure_non_paged_mode_ready()?;
758
759 Self::with_slot(field, |target_slot| {
760 self.session
761 .execute_load_query_with(self.query(), move |load, plan| {
762 load.top_k_by_slot(plan, target_slot, take_count)
763 })
764 })
765 }
766
767 pub fn bottom_k_by(
775 &self,
776 field: impl AsRef<str>,
777 take_count: u32,
778 ) -> Result<EntityResponse<E>, QueryError>
779 where
780 E: EntityValue,
781 {
782 self.ensure_non_paged_mode_ready()?;
783
784 Self::with_slot(field, |target_slot| {
785 self.session
786 .execute_load_query_with(self.query(), move |load, plan| {
787 load.bottom_k_by_slot(plan, target_slot, take_count)
788 })
789 })
790 }
791
792 pub fn top_k_by_values(
800 &self,
801 field: impl AsRef<str>,
802 take_count: u32,
803 ) -> Result<Vec<Value>, QueryError>
804 where
805 E: EntityValue,
806 {
807 self.ensure_non_paged_mode_ready()?;
808
809 Self::with_slot(field, |target_slot| {
810 self.session
811 .execute_load_query_with(self.query(), move |load, plan| {
812 load.top_k_by_values_slot(plan, target_slot, take_count)
813 })
814 })
815 }
816
817 pub fn bottom_k_by_values(
825 &self,
826 field: impl AsRef<str>,
827 take_count: u32,
828 ) -> Result<Vec<Value>, QueryError>
829 where
830 E: EntityValue,
831 {
832 self.ensure_non_paged_mode_ready()?;
833
834 Self::with_slot(field, |target_slot| {
835 self.session
836 .execute_load_query_with(self.query(), move |load, plan| {
837 load.bottom_k_by_values_slot(plan, target_slot, take_count)
838 })
839 })
840 }
841
842 pub fn top_k_by_with_ids(
850 &self,
851 field: impl AsRef<str>,
852 take_count: u32,
853 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
854 where
855 E: EntityValue,
856 {
857 self.ensure_non_paged_mode_ready()?;
858
859 Self::with_slot(field, |target_slot| {
860 self.session
861 .execute_load_query_with(self.query(), move |load, plan| {
862 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
863 })
864 })
865 }
866
867 pub fn bottom_k_by_with_ids(
875 &self,
876 field: impl AsRef<str>,
877 take_count: u32,
878 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
879 where
880 E: EntityValue,
881 {
882 self.ensure_non_paged_mode_ready()?;
883
884 Self::with_slot(field, |target_slot| {
885 self.session
886 .execute_load_query_with(self.query(), move |load, plan| {
887 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
888 })
889 })
890 }
891
892 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
895 where
896 E: EntityValue,
897 {
898 self.ensure_non_paged_mode_ready()?;
899
900 Self::with_slot(field, |target_slot| {
901 self.execute_prepared_projection_terminal_output(
902 PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
903 )?
904 .into_values()
905 .map_err(QueryError::execute)
906 })
907 }
908
909 pub fn explain_distinct_values_by(
911 &self,
912 field: impl AsRef<str>,
913 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
914 where
915 E: EntityValue,
916 {
917 self.ensure_non_paged_mode_ready()?;
918
919 Self::with_slot(field, |target_slot| {
920 self.explain_prepared_projection_non_paged_terminal(
921 &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
922 )
923 })
924 }
925
926 pub fn values_by_with_ids(
929 &self,
930 field: impl AsRef<str>,
931 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
932 where
933 E: EntityValue,
934 {
935 self.ensure_non_paged_mode_ready()?;
936
937 Self::with_slot(field, |target_slot| {
938 self.execute_prepared_projection_terminal_output(
939 PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
940 )?
941 .into_values_with_ids()
942 .map_err(QueryError::execute)
943 })
944 }
945
946 pub fn project_values_with_ids(
949 &self,
950 projection: &TextProjectionExpr,
951 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
952 where
953 E: EntityValue,
954 {
955 self.ensure_non_paged_mode_ready()?;
956
957 Self::with_slot(projection.field(), |target_slot| {
958 let values = self
959 .execute_prepared_projection_terminal_output(
960 PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
961 )?
962 .into_values_with_ids::<E>()
963 .map_err(QueryError::execute)?;
964
965 Self::project_terminal_items(projection, values, |projection, (id, value)| {
966 Ok((id, projection.apply_value(value)?))
967 })
968 })
969 }
970
971 pub fn explain_values_by_with_ids(
973 &self,
974 field: impl AsRef<str>,
975 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
976 where
977 E: EntityValue,
978 {
979 self.ensure_non_paged_mode_ready()?;
980
981 Self::with_slot(field, |target_slot| {
982 self.explain_prepared_projection_non_paged_terminal(
983 &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
984 )
985 })
986 }
987
988 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
991 where
992 E: EntityValue,
993 {
994 self.ensure_non_paged_mode_ready()?;
995
996 Self::with_slot(field, |target_slot| {
997 self.execute_prepared_projection_terminal_output(
998 PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
999 )?
1000 .into_terminal_value()
1001 .map_err(QueryError::execute)
1002 })
1003 }
1004
1005 pub fn project_first_value(
1008 &self,
1009 projection: &TextProjectionExpr,
1010 ) -> Result<Option<Value>, QueryError>
1011 where
1012 E: EntityValue,
1013 {
1014 self.ensure_non_paged_mode_ready()?;
1015
1016 Self::with_slot(projection.field(), |target_slot| {
1017 let value = self
1018 .execute_prepared_projection_terminal_output(
1019 PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1020 )?
1021 .into_terminal_value()
1022 .map_err(QueryError::execute)?;
1023
1024 let mut projected =
1025 Self::project_terminal_items(projection, value, |projection, value| {
1026 projection.apply_value(value)
1027 })?;
1028
1029 Ok(projected.pop())
1030 })
1031 }
1032
1033 pub fn explain_first_value_by(
1035 &self,
1036 field: impl AsRef<str>,
1037 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1038 where
1039 E: EntityValue,
1040 {
1041 self.ensure_non_paged_mode_ready()?;
1042
1043 Self::with_slot(field, |target_slot| {
1044 self.explain_prepared_projection_non_paged_terminal(
1045 &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1046 )
1047 })
1048 }
1049
1050 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
1053 where
1054 E: EntityValue,
1055 {
1056 self.ensure_non_paged_mode_ready()?;
1057
1058 Self::with_slot(field, |target_slot| {
1059 self.execute_prepared_projection_terminal_output(
1060 PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1061 )?
1062 .into_terminal_value()
1063 .map_err(QueryError::execute)
1064 })
1065 }
1066
1067 pub fn project_last_value(
1070 &self,
1071 projection: &TextProjectionExpr,
1072 ) -> Result<Option<Value>, QueryError>
1073 where
1074 E: EntityValue,
1075 {
1076 self.ensure_non_paged_mode_ready()?;
1077
1078 Self::with_slot(projection.field(), |target_slot| {
1079 let value = self
1080 .execute_prepared_projection_terminal_output(
1081 PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1082 )?
1083 .into_terminal_value()
1084 .map_err(QueryError::execute)?;
1085
1086 let mut projected =
1087 Self::project_terminal_items(projection, value, |projection, value| {
1088 projection.apply_value(value)
1089 })?;
1090
1091 Ok(projected.pop())
1092 })
1093 }
1094
1095 pub fn explain_last_value_by(
1097 &self,
1098 field: impl AsRef<str>,
1099 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1100 where
1101 E: EntityValue,
1102 {
1103 self.ensure_non_paged_mode_ready()?;
1104
1105 Self::with_slot(field, |target_slot| {
1106 self.explain_prepared_projection_non_paged_terminal(
1107 &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1108 )
1109 })
1110 }
1111
1112 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1114 where
1115 E: EntityValue,
1116 {
1117 self.execute_prepared_order_sensitive_terminal_output(
1118 PreparedFluentOrderSensitiveTerminalStrategy::first(),
1119 )?
1120 .into_id()
1121 .map_err(QueryError::execute)
1122 }
1123
1124 pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1126 where
1127 E: EntityValue,
1128 {
1129 self.explain_prepared_aggregate_non_paged_terminal(
1130 &PreparedFluentOrderSensitiveTerminalStrategy::first(),
1131 )
1132 }
1133
1134 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1136 where
1137 E: EntityValue,
1138 {
1139 self.execute_prepared_order_sensitive_terminal_output(
1140 PreparedFluentOrderSensitiveTerminalStrategy::last(),
1141 )?
1142 .into_id()
1143 .map_err(QueryError::execute)
1144 }
1145
1146 pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1148 where
1149 E: EntityValue,
1150 {
1151 self.explain_prepared_aggregate_non_paged_terminal(
1152 &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1153 )
1154 }
1155
1156 pub fn require_one(&self) -> Result<(), QueryError>
1158 where
1159 E: EntityValue,
1160 {
1161 self.execute()?.into_rows()?.require_one()?;
1162 Ok(())
1163 }
1164
1165 pub fn require_some(&self) -> Result<(), QueryError>
1167 where
1168 E: EntityValue,
1169 {
1170 self.execute()?.into_rows()?.require_some()?;
1171 Ok(())
1172 }
1173}
1174
1175fn scalar_terminal_boundary_request_from_prepared(
1176 request: PreparedFluentScalarTerminalRuntimeRequest,
1177) -> ScalarTerminalBoundaryRequest {
1178 match request {
1179 PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1180 ScalarTerminalBoundaryRequest::IdTerminal { kind }
1181 }
1182 PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1183 ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
1184 }
1185 }
1186}
1187
1188const fn existing_rows_terminal_boundary_request_from_prepared(
1189 request: PreparedFluentExistingRowsTerminalRuntimeRequest,
1190) -> ScalarTerminalBoundaryRequest {
1191 match request {
1192 PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => {
1193 ScalarTerminalBoundaryRequest::Count
1194 }
1195 PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => {
1196 ScalarTerminalBoundaryRequest::Exists
1197 }
1198 }
1199}
1200
1201const fn numeric_field_boundary_request_from_prepared(
1202 request: PreparedFluentNumericFieldRuntimeRequest,
1203) -> ScalarNumericFieldBoundaryRequest {
1204 match request {
1205 PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
1206 PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
1207 ScalarNumericFieldBoundaryRequest::SumDistinct
1208 }
1209 PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
1210 PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
1211 ScalarNumericFieldBoundaryRequest::AvgDistinct
1212 }
1213 }
1214}
1215
1216fn order_sensitive_terminal_boundary_request_from_prepared(
1217 request: PreparedFluentOrderSensitiveTerminalRuntimeRequest,
1218) -> ScalarTerminalBoundaryRequest {
1219 match request {
1220 PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1221 ScalarTerminalBoundaryRequest::IdTerminal { kind }
1222 }
1223 PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1224 ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth }
1225 }
1226 PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1227 ScalarTerminalBoundaryRequest::MedianBySlot { target_field }
1228 }
1229 PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1230 ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field }
1231 }
1232 }
1233}
1234
1235const fn projection_boundary_request_from_prepared(
1236 request: PreparedFluentProjectionRuntimeRequest,
1237) -> ScalarProjectionBoundaryRequest {
1238 match request {
1239 PreparedFluentProjectionRuntimeRequest::Values => ScalarProjectionBoundaryRequest::Values,
1240 PreparedFluentProjectionRuntimeRequest::DistinctValues => {
1241 ScalarProjectionBoundaryRequest::DistinctValues
1242 }
1243 PreparedFluentProjectionRuntimeRequest::CountDistinct => {
1244 ScalarProjectionBoundaryRequest::CountDistinct
1245 }
1246 PreparedFluentProjectionRuntimeRequest::ValuesWithIds => {
1247 ScalarProjectionBoundaryRequest::ValuesWithIds
1248 }
1249 PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1250 ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind }
1251 }
1252 }
1253}