1use crate::{
7 db::{
8 DbSession, PersistedRow, Query,
9 executor::{
10 LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11 ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12 ScalarTerminalBoundaryRequest,
13 },
14 query::{
15 api::ResponseCardinalityExt,
16 builder::{
17 PreparedFluentAggregateExplainStrategy,
18 PreparedFluentExistingRowsTerminalRuntimeRequest,
19 PreparedFluentExistingRowsTerminalStrategy,
20 PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21 PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22 PreparedFluentOrderSensitiveTerminalStrategy,
23 PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24 PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25 ValueProjectionExpr,
26 },
27 explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
28 fluent::load::{FluentLoadQuery, LoadQueryResult},
29 intent::QueryError,
30 plan::AggregateKind,
31 },
32 response::EntityResponse,
33 },
34 error::InternalError,
35 traits::EntityValue,
36 types::{Decimal, Id},
37 value::{OutputValue, Value},
38};
39
/// Optional pair of entity ids, used as the success payload of `min_max_by`.
///
/// NOTE(review): the pair is presumably ordered `(min, max)` — confirm against
/// `ScalarTerminalBoundaryOutput::into_id_pair`.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42fn output(value: Value) -> OutputValue {
44 OutputValue::from(value)
45}
46
47fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
49 values.into_iter().map(output).collect()
50}
51
52fn output_values_with_ids<E: PersistedRow>(
54 values: Vec<(Id<E>, Value)>,
55) -> Vec<(Id<E>, OutputValue)> {
56 values
57 .into_iter()
58 .map(|(id, value)| (id, output(value)))
59 .collect()
60}
61
62impl<E> FluentLoadQuery<'_, E>
63where
64 E: PersistedRow,
65{
66 pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
72 where
73 E: EntityValue,
74 {
75 self.ensure_non_paged_mode_ready()?;
76 self.session.execute_query_result(self.query())
77 }
78
79 fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
82 where
83 E: EntityValue,
84 F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
85 {
86 self.ensure_non_paged_mode_ready()?;
87
88 self.session.execute_load_query_with(self.query(), execute)
89 }
90
91 fn map_non_paged_query_output<T>(
95 &self,
96 map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
97 ) -> Result<T, QueryError>
98 where
99 E: EntityValue,
100 {
101 self.ensure_non_paged_mode_ready()?;
102 map(self.session, self.query())
103 }
104
105 fn explain_prepared_aggregate_non_paged_terminal<S>(
109 &self,
110 strategy: &S,
111 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
112 where
113 E: EntityValue,
114 S: PreparedFluentAggregateExplainStrategy,
115 {
116 self.map_non_paged_query_output(|session, query| {
117 session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, strategy)
118 })
119 }
120
121 fn explain_prepared_projection_non_paged_terminal(
125 &self,
126 strategy: &PreparedFluentProjectionStrategy,
127 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
128 where
129 E: EntityValue,
130 {
131 self.map_non_paged_query_output(|session, query| {
132 session.explain_query_prepared_projection_terminal_with_visible_indexes(query, strategy)
133 })
134 }
135
136 fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
139 where
140 E: EntityValue,
141 {
142 self.map_non_paged_query_output(DbSession::explain_query_execution_with_visible_indexes)
143 }
144
145 fn render_execution_descriptor(
148 &self,
149 render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
150 ) -> Result<String, QueryError>
151 where
152 E: EntityValue,
153 {
154 let descriptor = self.explain_execution_descriptor()?;
155
156 Ok(render(descriptor))
157 }
158
159 fn map_prepared_existing_rows_terminal_output<T>(
162 &self,
163 strategy: PreparedFluentExistingRowsTerminalStrategy,
164 map: impl FnOnce(ScalarTerminalBoundaryOutput) -> Result<T, InternalError>,
165 ) -> Result<T, QueryError>
166 where
167 E: EntityValue,
168 {
169 let output =
170 self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())?;
171
172 map(output).map_err(QueryError::execute)
173 }
174
175 fn execute_prepared_numeric_field_terminal(
179 &self,
180 strategy: PreparedFluentNumericFieldStrategy,
181 ) -> Result<Option<Decimal>, QueryError>
182 where
183 E: EntityValue,
184 {
185 let (target_field, runtime_request) = strategy.into_runtime_parts();
186
187 self.execute_scalar_non_paged_terminal(move |load, plan| {
188 load.execute_numeric_field_boundary(plan, target_field, runtime_request.into())
189 })
190 }
191
192 fn map_prepared_order_sensitive_terminal_output<T>(
195 &self,
196 strategy: PreparedFluentOrderSensitiveTerminalStrategy,
197 map: impl FnOnce(ScalarTerminalBoundaryOutput) -> Result<T, InternalError>,
198 ) -> Result<T, QueryError>
199 where
200 E: EntityValue,
201 {
202 let output =
203 self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())?;
204
205 map(output).map_err(QueryError::execute)
206 }
207
208 fn execute_prepared_projection_terminal_output(
212 &self,
213 strategy: PreparedFluentProjectionStrategy,
214 ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
215 where
216 E: EntityValue,
217 {
218 let (target_field, runtime_request) = strategy.into_runtime_parts();
219
220 self.execute_scalar_non_paged_terminal(move |load, plan| {
221 load.execute_scalar_projection_boundary(plan, target_field, runtime_request.into())
222 })
223 }
224
225 fn map_prepared_projection_terminal_output<T>(
228 &self,
229 strategy: PreparedFluentProjectionStrategy,
230 map: impl FnOnce(
231 crate::db::executor::ScalarProjectionBoundaryOutput,
232 ) -> Result<T, InternalError>,
233 ) -> Result<T, QueryError>
234 where
235 E: EntityValue,
236 {
237 let output = self.execute_prepared_projection_terminal_output(strategy)?;
238
239 map(output).map_err(QueryError::execute)
240 }
241
242 fn execute_scalar_terminal_boundary_output<R>(
246 &self,
247 runtime_request: R,
248 ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
249 where
250 E: EntityValue,
251 R: Into<ScalarTerminalBoundaryRequest>,
252 {
253 self.execute_scalar_non_paged_terminal(move |load, plan| {
254 load.execute_scalar_terminal_request(plan, runtime_request.into())
255 })
256 }
257
258 fn map_prepared_scalar_terminal_output<T>(
261 &self,
262 strategy: PreparedFluentScalarTerminalStrategy,
263 map: impl FnOnce(ScalarTerminalBoundaryOutput) -> Result<T, InternalError>,
264 ) -> Result<T, QueryError>
265 where
266 E: EntityValue,
267 {
268 let output =
269 self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())?;
270
271 map(output).map_err(QueryError::execute)
272 }
273
274 fn project_terminal_items<P, T, U>(
277 projection: &P,
278 values: impl IntoIterator<Item = T>,
279 mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
280 ) -> Result<Vec<U>, QueryError>
281 where
282 P: ValueProjectionExpr,
283 {
284 values
285 .into_iter()
286 .map(|value| map(projection, value))
287 .collect()
288 }
289
290 pub fn is_empty(&self) -> Result<bool, QueryError>
296 where
297 E: EntityValue,
298 {
299 self.not_exists()
300 }
301
302 pub fn not_exists(&self) -> Result<bool, QueryError>
304 where
305 E: EntityValue,
306 {
307 Ok(!self.exists()?)
308 }
309
310 pub fn exists(&self) -> Result<bool, QueryError>
312 where
313 E: EntityValue,
314 {
315 self.map_prepared_existing_rows_terminal_output(
316 PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
317 ScalarTerminalBoundaryOutput::into_exists,
318 )
319 }
320
321 pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
323 where
324 E: EntityValue,
325 {
326 self.explain_prepared_aggregate_non_paged_terminal(
327 &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
328 )
329 }
330
331 pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
335 where
336 E: EntityValue,
337 {
338 self.explain_exists()
339 }
340
341 pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
343 where
344 E: EntityValue,
345 {
346 self.explain_execution_descriptor()
347 }
348
349 pub fn explain_execution_text(&self) -> Result<String, QueryError>
351 where
352 E: EntityValue,
353 {
354 self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
355 }
356
357 pub fn explain_execution_json(&self) -> Result<String, QueryError>
359 where
360 E: EntityValue,
361 {
362 self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
363 }
364
365 pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
367 where
368 E: EntityValue,
369 {
370 self.map_non_paged_query_output(
371 DbSession::explain_query_execution_verbose_with_visible_indexes,
372 )
373 }
374
375 pub fn count(&self) -> Result<u32, QueryError>
377 where
378 E: EntityValue,
379 {
380 self.map_prepared_existing_rows_terminal_output(
381 PreparedFluentExistingRowsTerminalStrategy::count_rows(),
382 ScalarTerminalBoundaryOutput::into_count,
383 )
384 }
385
386 pub fn bytes(&self) -> Result<u64, QueryError>
389 where
390 E: EntityValue,
391 {
392 self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
393 }
394
395 pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
398 where
399 E: EntityValue,
400 {
401 self.with_non_paged_slot(field, |target_slot| {
402 self.session
403 .execute_load_query_with(self.query(), move |load, plan| {
404 load.bytes_by_slot(plan, target_slot)
405 })
406 })
407 }
408
409 pub fn explain_bytes_by(
411 &self,
412 field: impl AsRef<str>,
413 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
414 where
415 E: EntityValue,
416 {
417 self.with_non_paged_slot(field, |target_slot| {
418 self.session
419 .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
420 })
421 }
422
423 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
425 where
426 E: EntityValue,
427 {
428 self.map_prepared_scalar_terminal_output(
429 PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
430 ScalarTerminalBoundaryOutput::into_id,
431 )
432 }
433
434 pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
436 where
437 E: EntityValue,
438 {
439 self.explain_prepared_aggregate_non_paged_terminal(
440 &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
441 )
442 }
443
444 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
448 where
449 E: EntityValue,
450 {
451 self.with_non_paged_slot(field, |target_slot| {
452 self.map_prepared_scalar_terminal_output(
453 PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
454 ScalarTerminalBoundaryOutput::into_id,
455 )
456 })
457 }
458
459 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
461 where
462 E: EntityValue,
463 {
464 self.map_prepared_scalar_terminal_output(
465 PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
466 ScalarTerminalBoundaryOutput::into_id,
467 )
468 }
469
470 pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
472 where
473 E: EntityValue,
474 {
475 self.explain_prepared_aggregate_non_paged_terminal(
476 &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
477 )
478 }
479
480 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
484 where
485 E: EntityValue,
486 {
487 self.with_non_paged_slot(field, |target_slot| {
488 self.map_prepared_scalar_terminal_output(
489 PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
490 ScalarTerminalBoundaryOutput::into_id,
491 )
492 })
493 }
494
495 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
498 where
499 E: EntityValue,
500 {
501 self.with_non_paged_slot(field, |target_slot| {
502 self.map_prepared_order_sensitive_terminal_output(
503 PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
504 ScalarTerminalBoundaryOutput::into_id,
505 )
506 })
507 }
508
509 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
511 where
512 E: EntityValue,
513 {
514 self.with_non_paged_slot(field, |target_slot| {
515 self.execute_prepared_numeric_field_terminal(
516 PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
517 )
518 })
519 }
520
521 pub fn explain_sum_by(
523 &self,
524 field: impl AsRef<str>,
525 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
526 where
527 E: EntityValue,
528 {
529 self.with_non_paged_slot(field, |target_slot| {
530 self.explain_prepared_aggregate_non_paged_terminal(
531 &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
532 )
533 })
534 }
535
536 pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
538 where
539 E: EntityValue,
540 {
541 self.with_non_paged_slot(field, |target_slot| {
542 self.execute_prepared_numeric_field_terminal(
543 PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
544 )
545 })
546 }
547
548 pub fn explain_sum_distinct_by(
550 &self,
551 field: impl AsRef<str>,
552 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
553 where
554 E: EntityValue,
555 {
556 self.with_non_paged_slot(field, |target_slot| {
557 self.explain_prepared_aggregate_non_paged_terminal(
558 &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
559 )
560 })
561 }
562
563 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
565 where
566 E: EntityValue,
567 {
568 self.with_non_paged_slot(field, |target_slot| {
569 self.execute_prepared_numeric_field_terminal(
570 PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
571 )
572 })
573 }
574
575 pub fn explain_avg_by(
577 &self,
578 field: impl AsRef<str>,
579 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
580 where
581 E: EntityValue,
582 {
583 self.with_non_paged_slot(field, |target_slot| {
584 self.explain_prepared_aggregate_non_paged_terminal(
585 &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
586 )
587 })
588 }
589
590 pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
592 where
593 E: EntityValue,
594 {
595 self.with_non_paged_slot(field, |target_slot| {
596 self.execute_prepared_numeric_field_terminal(
597 PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
598 )
599 })
600 }
601
602 pub fn explain_avg_distinct_by(
604 &self,
605 field: impl AsRef<str>,
606 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
607 where
608 E: EntityValue,
609 {
610 self.with_non_paged_slot(field, |target_slot| {
611 self.explain_prepared_aggregate_non_paged_terminal(
612 &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
613 )
614 })
615 }
616
617 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
622 where
623 E: EntityValue,
624 {
625 self.with_non_paged_slot(field, |target_slot| {
626 self.map_prepared_order_sensitive_terminal_output(
627 PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
628 ScalarTerminalBoundaryOutput::into_id,
629 )
630 })
631 }
632
633 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
636 where
637 E: EntityValue,
638 {
639 self.with_non_paged_slot(field, |target_slot| {
640 self.map_prepared_projection_terminal_output(
641 PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
642 crate::db::executor::ScalarProjectionBoundaryOutput::into_count,
643 )
644 })
645 }
646
647 pub fn explain_count_distinct_by(
649 &self,
650 field: impl AsRef<str>,
651 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
652 where
653 E: EntityValue,
654 {
655 self.with_non_paged_slot(field, |target_slot| {
656 self.explain_prepared_projection_non_paged_terminal(
657 &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
658 )
659 })
660 }
661
662 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
666 where
667 E: EntityValue,
668 {
669 self.with_non_paged_slot(field, |target_slot| {
670 self.map_prepared_order_sensitive_terminal_output(
671 PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
672 ScalarTerminalBoundaryOutput::into_id_pair,
673 )
674 })
675 }
676
677 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
679 where
680 E: EntityValue,
681 {
682 self.with_non_paged_slot(field, |target_slot| {
683 self.map_prepared_projection_terminal_output(
684 PreparedFluentProjectionStrategy::values_by_slot(target_slot),
685 crate::db::executor::ScalarProjectionBoundaryOutput::into_values,
686 )
687 .map(output_values)
688 })
689 }
690
691 pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
694 where
695 E: EntityValue,
696 P: ValueProjectionExpr,
697 {
698 self.with_non_paged_slot(projection.field(), |target_slot| {
699 let values = self
700 .execute_prepared_projection_terminal_output(
701 PreparedFluentProjectionStrategy::values_by_slot(target_slot),
702 )?
703 .into_values()
704 .map_err(QueryError::execute)?;
705
706 Self::project_terminal_items(projection, values, |projection, value| {
707 projection.apply_value(value)
708 })
709 .map(output_values)
710 })
711 }
712
713 pub fn explain_project_values<P>(
715 &self,
716 projection: &P,
717 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
718 where
719 E: EntityValue,
720 P: ValueProjectionExpr,
721 {
722 self.with_non_paged_slot(projection.field(), |target_slot| {
723 self.explain_prepared_projection_non_paged_terminal(
724 &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
725 )
726 })
727 }
728
729 pub fn explain_values_by(
731 &self,
732 field: impl AsRef<str>,
733 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
734 where
735 E: EntityValue,
736 {
737 self.with_non_paged_slot(field, |target_slot| {
738 self.explain_prepared_projection_non_paged_terminal(
739 &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
740 )
741 })
742 }
743
744 pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
746 where
747 E: EntityValue,
748 {
749 self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
750 }
751
752 pub fn top_k_by(
760 &self,
761 field: impl AsRef<str>,
762 take_count: u32,
763 ) -> Result<EntityResponse<E>, QueryError>
764 where
765 E: EntityValue,
766 {
767 self.with_non_paged_slot(field, |target_slot| {
768 self.session
769 .execute_load_query_with(self.query(), move |load, plan| {
770 load.top_k_by_slot(plan, target_slot, take_count)
771 })
772 })
773 }
774
775 pub fn bottom_k_by(
783 &self,
784 field: impl AsRef<str>,
785 take_count: u32,
786 ) -> Result<EntityResponse<E>, QueryError>
787 where
788 E: EntityValue,
789 {
790 self.with_non_paged_slot(field, |target_slot| {
791 self.session
792 .execute_load_query_with(self.query(), move |load, plan| {
793 load.bottom_k_by_slot(plan, target_slot, take_count)
794 })
795 })
796 }
797
798 pub fn top_k_by_values(
806 &self,
807 field: impl AsRef<str>,
808 take_count: u32,
809 ) -> Result<Vec<OutputValue>, QueryError>
810 where
811 E: EntityValue,
812 {
813 self.with_non_paged_slot(field, |target_slot| {
814 self.session
815 .execute_load_query_with(self.query(), move |load, plan| {
816 load.top_k_by_values_slot(plan, target_slot, take_count)
817 })
818 .map(output_values)
819 })
820 }
821
822 pub fn bottom_k_by_values(
830 &self,
831 field: impl AsRef<str>,
832 take_count: u32,
833 ) -> Result<Vec<OutputValue>, QueryError>
834 where
835 E: EntityValue,
836 {
837 self.with_non_paged_slot(field, |target_slot| {
838 self.session
839 .execute_load_query_with(self.query(), move |load, plan| {
840 load.bottom_k_by_values_slot(plan, target_slot, take_count)
841 })
842 .map(output_values)
843 })
844 }
845
846 pub fn top_k_by_with_ids(
854 &self,
855 field: impl AsRef<str>,
856 take_count: u32,
857 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
858 where
859 E: EntityValue,
860 {
861 self.with_non_paged_slot(field, |target_slot| {
862 self.session
863 .execute_load_query_with(self.query(), move |load, plan| {
864 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
865 })
866 .map(output_values_with_ids)
867 })
868 }
869
870 pub fn bottom_k_by_with_ids(
878 &self,
879 field: impl AsRef<str>,
880 take_count: u32,
881 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
882 where
883 E: EntityValue,
884 {
885 self.with_non_paged_slot(field, |target_slot| {
886 self.session
887 .execute_load_query_with(self.query(), move |load, plan| {
888 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
889 })
890 .map(output_values_with_ids)
891 })
892 }
893
894 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
897 where
898 E: EntityValue,
899 {
900 self.with_non_paged_slot(field, |target_slot| {
901 self.map_prepared_projection_terminal_output(
902 PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
903 crate::db::executor::ScalarProjectionBoundaryOutput::into_values,
904 )
905 .map(output_values)
906 })
907 }
908
909 pub fn explain_distinct_values_by(
911 &self,
912 field: impl AsRef<str>,
913 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
914 where
915 E: EntityValue,
916 {
917 self.with_non_paged_slot(field, |target_slot| {
918 self.explain_prepared_projection_non_paged_terminal(
919 &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
920 )
921 })
922 }
923
924 pub fn values_by_with_ids(
927 &self,
928 field: impl AsRef<str>,
929 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
930 where
931 E: EntityValue,
932 {
933 self.with_non_paged_slot(field, |target_slot| {
934 self.map_prepared_projection_terminal_output(
935 PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
936 crate::db::executor::ScalarProjectionBoundaryOutput::into_values_with_ids,
937 )
938 .map(output_values_with_ids)
939 })
940 }
941
942 pub fn project_values_with_ids<P>(
945 &self,
946 projection: &P,
947 ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
948 where
949 E: EntityValue,
950 P: ValueProjectionExpr,
951 {
952 self.with_non_paged_slot(projection.field(), |target_slot| {
953 let values = self
954 .execute_prepared_projection_terminal_output(
955 PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
956 )?
957 .into_values_with_ids::<E>()
958 .map_err(QueryError::execute)?;
959
960 Self::project_terminal_items(projection, values, |projection, (id, value)| {
961 Ok((id, projection.apply_value(value)?))
962 })
963 .map(output_values_with_ids)
964 })
965 }
966
967 pub fn explain_values_by_with_ids(
969 &self,
970 field: impl AsRef<str>,
971 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
972 where
973 E: EntityValue,
974 {
975 self.with_non_paged_slot(field, |target_slot| {
976 self.explain_prepared_projection_non_paged_terminal(
977 &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
978 )
979 })
980 }
981
982 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
985 where
986 E: EntityValue,
987 {
988 self.with_non_paged_slot(field, |target_slot| {
989 self.map_prepared_projection_terminal_output(
990 PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
991 crate::db::executor::ScalarProjectionBoundaryOutput::into_terminal_value,
992 )
993 .map(|value| value.map(output))
994 })
995 }
996
997 pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1000 where
1001 E: EntityValue,
1002 P: ValueProjectionExpr,
1003 {
1004 self.with_non_paged_slot(projection.field(), |target_slot| {
1005 let value = self
1006 .execute_prepared_projection_terminal_output(
1007 PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1008 )?
1009 .into_terminal_value()
1010 .map_err(QueryError::execute)?;
1011
1012 let mut projected =
1013 Self::project_terminal_items(projection, value, |projection, value| {
1014 projection.apply_value(value)
1015 })?;
1016
1017 Ok(projected.pop().map(output))
1018 })
1019 }
1020
1021 pub fn explain_first_value_by(
1023 &self,
1024 field: impl AsRef<str>,
1025 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1026 where
1027 E: EntityValue,
1028 {
1029 self.with_non_paged_slot(field, |target_slot| {
1030 self.explain_prepared_projection_non_paged_terminal(
1031 &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1032 )
1033 })
1034 }
1035
1036 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1039 where
1040 E: EntityValue,
1041 {
1042 self.with_non_paged_slot(field, |target_slot| {
1043 self.map_prepared_projection_terminal_output(
1044 PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1045 crate::db::executor::ScalarProjectionBoundaryOutput::into_terminal_value,
1046 )
1047 .map(|value| value.map(output))
1048 })
1049 }
1050
1051 pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1054 where
1055 E: EntityValue,
1056 P: ValueProjectionExpr,
1057 {
1058 self.with_non_paged_slot(projection.field(), |target_slot| {
1059 let value = self
1060 .execute_prepared_projection_terminal_output(
1061 PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1062 )?
1063 .into_terminal_value()
1064 .map_err(QueryError::execute)?;
1065
1066 let mut projected =
1067 Self::project_terminal_items(projection, value, |projection, value| {
1068 projection.apply_value(value)
1069 })?;
1070
1071 Ok(projected.pop().map(output))
1072 })
1073 }
1074
1075 pub fn explain_last_value_by(
1077 &self,
1078 field: impl AsRef<str>,
1079 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1080 where
1081 E: EntityValue,
1082 {
1083 self.with_non_paged_slot(field, |target_slot| {
1084 self.explain_prepared_projection_non_paged_terminal(
1085 &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1086 )
1087 })
1088 }
1089
1090 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1092 where
1093 E: EntityValue,
1094 {
1095 self.map_prepared_order_sensitive_terminal_output(
1096 PreparedFluentOrderSensitiveTerminalStrategy::first(),
1097 ScalarTerminalBoundaryOutput::into_id,
1098 )
1099 }
1100
1101 pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1103 where
1104 E: EntityValue,
1105 {
1106 self.explain_prepared_aggregate_non_paged_terminal(
1107 &PreparedFluentOrderSensitiveTerminalStrategy::first(),
1108 )
1109 }
1110
1111 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1113 where
1114 E: EntityValue,
1115 {
1116 self.map_prepared_order_sensitive_terminal_output(
1117 PreparedFluentOrderSensitiveTerminalStrategy::last(),
1118 ScalarTerminalBoundaryOutput::into_id,
1119 )
1120 }
1121
1122 pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1124 where
1125 E: EntityValue,
1126 {
1127 self.explain_prepared_aggregate_non_paged_terminal(
1128 &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1129 )
1130 }
1131
1132 pub fn require_one(&self) -> Result<(), QueryError>
1134 where
1135 E: EntityValue,
1136 {
1137 self.execute()?.into_rows()?.require_one()?;
1138 Ok(())
1139 }
1140
1141 pub fn require_some(&self) -> Result<(), QueryError>
1143 where
1144 E: EntityValue,
1145 {
1146 self.execute()?.into_rows()?.require_some()?;
1147 Ok(())
1148 }
1149}
1150
1151impl From<PreparedFluentScalarTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1152 fn from(value: PreparedFluentScalarTerminalRuntimeRequest) -> Self {
1153 match value {
1154 PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1155 Self::IdTerminal { kind }
1156 }
1157 PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1158 Self::IdBySlot { kind, target_field }
1159 }
1160 }
1161 }
1162}
1163
1164impl From<PreparedFluentExistingRowsTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1165 fn from(value: PreparedFluentExistingRowsTerminalRuntimeRequest) -> Self {
1166 match value {
1167 PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => Self::Count,
1168 PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => Self::Exists,
1169 }
1170 }
1171}
1172
1173impl From<PreparedFluentNumericFieldRuntimeRequest> for ScalarNumericFieldBoundaryRequest {
1174 fn from(value: PreparedFluentNumericFieldRuntimeRequest) -> Self {
1175 match value {
1176 PreparedFluentNumericFieldRuntimeRequest::Sum => Self::Sum,
1177 PreparedFluentNumericFieldRuntimeRequest::SumDistinct => Self::SumDistinct,
1178 PreparedFluentNumericFieldRuntimeRequest::Avg => Self::Avg,
1179 PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => Self::AvgDistinct,
1180 }
1181 }
1182}
1183
1184impl From<PreparedFluentOrderSensitiveTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1185 fn from(value: PreparedFluentOrderSensitiveTerminalRuntimeRequest) -> Self {
1186 match value {
1187 PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1188 Self::IdTerminal { kind }
1189 }
1190 PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1191 Self::NthBySlot { target_field, nth }
1192 }
1193 PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1194 Self::MedianBySlot { target_field }
1195 }
1196 PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1197 Self::MinMaxBySlot { target_field }
1198 }
1199 }
1200 }
1201}
1202
1203impl From<PreparedFluentProjectionRuntimeRequest> for ScalarProjectionBoundaryRequest {
1204 fn from(value: PreparedFluentProjectionRuntimeRequest) -> Self {
1205 match value {
1206 PreparedFluentProjectionRuntimeRequest::Values => Self::Values,
1207 PreparedFluentProjectionRuntimeRequest::DistinctValues => Self::DistinctValues,
1208 PreparedFluentProjectionRuntimeRequest::CountDistinct => Self::CountDistinct,
1209 PreparedFluentProjectionRuntimeRequest::ValuesWithIds => Self::ValuesWithIds,
1210 PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1211 Self::TerminalValue { terminal_kind }
1212 }
1213 }
1214 }
1215}