Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        PersistedRow,
9        executor::{
10            ExecutablePlan, LoadExecutor, ScalarNumericFieldBoundaryRequest,
11            ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12            ScalarTerminalBoundaryRequest,
13        },
14        query::{
15            api::ResponseCardinalityExt,
16            builder::{
17                PreparedFluentAggregateExplainStrategy,
18                PreparedFluentExistingRowsTerminalRuntimeRequest,
19                PreparedFluentExistingRowsTerminalStrategy,
20                PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21                PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22                PreparedFluentOrderSensitiveTerminalStrategy,
23                PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24                PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25            },
26            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
27            fluent::load::FluentLoadQuery,
28            intent::QueryError,
29            plan::AggregateKind,
30        },
31        response::EntityResponse,
32    },
33    error::InternalError,
34    traits::EntityValue,
35    types::{Decimal, Id},
36    value::Value,
37};
38
39type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
40
41impl<E> FluentLoadQuery<'_, E>
42where
43    E: PersistedRow,
44{
45    // ------------------------------------------------------------------
46    // Execution (single semantic boundary)
47    // ------------------------------------------------------------------
48
49    /// Execute this query using the session's policy settings.
50    pub fn execute(&self) -> Result<EntityResponse<E>, QueryError>
51    where
52        E: EntityValue,
53    {
54        self.ensure_non_paged_mode_ready()?;
55
56        self.session.execute_query(self.query())
57    }
58
59    // Run one scalar terminal through the canonical non-paged fluent policy
60    // gate before handing execution to the session load-query adapter.
61    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
62    where
63        E: EntityValue,
64        F: FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
65    {
66        self.ensure_non_paged_mode_ready()?;
67
68        self.session.execute_load_query_with(self.query(), execute)
69    }
70
71    // Run one explain-visible aggregate terminal through the canonical
72    // non-paged fluent policy gate using the prepared aggregate strategy as
73    // the single explain projection source.
74    fn explain_prepared_aggregate_non_paged_terminal<S>(
75        &self,
76        strategy: &S,
77    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
78    where
79        E: EntityValue,
80        S: PreparedFluentAggregateExplainStrategy,
81    {
82        self.ensure_non_paged_mode_ready()?;
83
84        self.session
85            .explain_query_prepared_aggregate_terminal_with_visible_indexes(self.query(), strategy)
86    }
87
88    // Run one prepared projection/distinct explain terminal through the
89    // canonical non-paged fluent policy gate using the prepared projection
90    // strategy as the single explain projection source.
91    fn explain_prepared_projection_non_paged_terminal(
92        &self,
93        strategy: &PreparedFluentProjectionStrategy,
94    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
95    where
96        E: EntityValue,
97    {
98        self.ensure_non_paged_mode_ready()?;
99
100        self.session
101            .explain_query_prepared_projection_terminal_with_visible_indexes(self.query(), strategy)
102    }
103
104    // Execute one prepared fluent scalar terminal through the canonical
105    // non-paged fluent policy gate using the prepared runtime request as the
106    // single execution source.
107    fn execute_prepared_scalar_terminal_output(
108        &self,
109        strategy: PreparedFluentScalarTerminalStrategy,
110    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
111    where
112        E: EntityValue,
113    {
114        self.execute_scalar_non_paged_terminal(move |load, plan| {
115            load.execute_scalar_terminal_request(
116                plan,
117                scalar_terminal_boundary_request_from_prepared(strategy.runtime_request().clone()),
118            )
119        })
120    }
121
122    // Execute one prepared fluent existing-rows terminal through the
123    // canonical non-paged fluent policy gate using the prepared existing-rows
124    // strategy as the single runtime source.
125    fn execute_prepared_existing_rows_terminal_output(
126        &self,
127        strategy: PreparedFluentExistingRowsTerminalStrategy,
128    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
129    where
130        E: EntityValue,
131    {
132        self.execute_scalar_non_paged_terminal(move |load, plan| {
133            load.execute_scalar_terminal_request(
134                plan,
135                existing_rows_terminal_boundary_request_from_prepared(
136                    strategy.runtime_request().clone(),
137                ),
138            )
139        })
140    }
141
142    // Execute one prepared fluent numeric-field terminal through the canonical
143    // non-paged fluent policy gate using the prepared numeric strategy as the
144    // single runtime source.
145    fn execute_prepared_numeric_field_terminal(
146        &self,
147        strategy: PreparedFluentNumericFieldStrategy,
148    ) -> Result<Option<Decimal>, QueryError>
149    where
150        E: EntityValue,
151    {
152        self.execute_scalar_non_paged_terminal(move |load, plan| {
153            load.execute_numeric_field_boundary(
154                plan,
155                strategy.target_field().clone(),
156                numeric_field_boundary_request_from_prepared(strategy.runtime_request()),
157            )
158        })
159    }
160
161    // Execute one prepared fluent order-sensitive terminal through the
162    // canonical non-paged fluent policy gate using the prepared order-sensitive
163    // strategy as the single runtime source.
164    fn execute_prepared_order_sensitive_terminal_output(
165        &self,
166        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
167    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
168    where
169        E: EntityValue,
170    {
171        self.execute_scalar_non_paged_terminal(move |load, plan| {
172            load.execute_scalar_terminal_request(
173                plan,
174                order_sensitive_terminal_boundary_request_from_prepared(
175                    strategy.runtime_request().clone(),
176                ),
177            )
178        })
179    }
180
181    // Execute one prepared fluent projection/distinct terminal through the
182    // canonical non-paged fluent policy gate using the prepared projection
183    // strategy as the single runtime source.
184    fn execute_prepared_projection_terminal_output(
185        &self,
186        strategy: PreparedFluentProjectionStrategy,
187    ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
188    where
189        E: EntityValue,
190    {
191        self.execute_scalar_non_paged_terminal(move |load, plan| {
192            load.execute_scalar_projection_boundary(
193                plan,
194                strategy.target_field().clone(),
195                projection_boundary_request_from_prepared(strategy.runtime_request()),
196            )
197        })
198    }
199
200    // ------------------------------------------------------------------
201    // Execution terminals — semantic only
202    // ------------------------------------------------------------------
203
204    /// Execute and return whether the result set is empty.
205    pub fn is_empty(&self) -> Result<bool, QueryError>
206    where
207        E: EntityValue,
208    {
209        self.not_exists()
210    }
211
212    /// Execute and return whether no matching row exists.
213    pub fn not_exists(&self) -> Result<bool, QueryError>
214    where
215        E: EntityValue,
216    {
217        Ok(!self.exists()?)
218    }
219
220    /// Execute and return whether at least one matching row exists.
221    pub fn exists(&self) -> Result<bool, QueryError>
222    where
223        E: EntityValue,
224    {
225        self.execute_prepared_existing_rows_terminal_output(
226            PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
227        )?
228        .into_exists()
229        .map_err(QueryError::execute)
230    }
231
232    /// Explain scalar `exists()` routing without executing the terminal.
233    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
234    where
235        E: EntityValue,
236    {
237        self.explain_prepared_aggregate_non_paged_terminal(
238            &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
239        )
240    }
241
242    /// Explain scalar `not_exists()` routing without executing the terminal.
243    ///
244    /// This remains an `exists()` execution plan with negated boolean semantics.
245    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
246    where
247        E: EntityValue,
248    {
249        self.explain_exists()
250    }
251
252    /// Explain scalar load execution shape without executing the query.
253    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
254    where
255        E: EntityValue,
256    {
257        self.session
258            .explain_query_execution_with_visible_indexes(self.query())
259    }
260
261    /// Explain scalar load execution shape as deterministic text.
262    pub fn explain_execution_text(&self) -> Result<String, QueryError>
263    where
264        E: EntityValue,
265    {
266        self.session
267            .explain_query_execution_text_with_visible_indexes(self.query())
268    }
269
270    /// Explain scalar load execution shape as canonical JSON.
271    pub fn explain_execution_json(&self) -> Result<String, QueryError>
272    where
273        E: EntityValue,
274    {
275        self.session
276            .explain_query_execution_json_with_visible_indexes(self.query())
277    }
278
279    /// Explain scalar load execution shape as verbose text with diagnostics.
280    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
281    where
282        E: EntityValue,
283    {
284        self.session
285            .explain_query_execution_verbose_with_visible_indexes(self.query())
286    }
287
288    /// Execute and return the number of matching rows.
289    pub fn count(&self) -> Result<u32, QueryError>
290    where
291        E: EntityValue,
292    {
293        self.execute_prepared_existing_rows_terminal_output(
294            PreparedFluentExistingRowsTerminalStrategy::count_rows(),
295        )?
296        .into_count()
297        .map_err(QueryError::execute)
298    }
299
300    /// Execute and return the total persisted payload bytes for the effective
301    /// result window.
302    pub fn bytes(&self) -> Result<u64, QueryError>
303    where
304        E: EntityValue,
305    {
306        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
307    }
308
309    /// Execute and return the total serialized bytes for `field` over the
310    /// effective result window.
311    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
312    where
313        E: EntityValue,
314    {
315        self.ensure_non_paged_mode_ready()?;
316
317        Self::with_slot(field, |target_slot| {
318            self.session
319                .execute_load_query_with(self.query(), move |load, plan| {
320                    load.bytes_by_slot(plan, target_slot)
321                })
322        })
323    }
324
325    /// Explain `bytes_by(field)` routing without executing the terminal.
326    pub fn explain_bytes_by(
327        &self,
328        field: impl AsRef<str>,
329    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
330    where
331        E: EntityValue,
332    {
333        self.ensure_non_paged_mode_ready()?;
334
335        Self::with_slot(field, |target_slot| {
336            self.session
337                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
338        })
339    }
340
341    /// Execute and return the smallest matching identifier, if any.
342    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
343    where
344        E: EntityValue,
345    {
346        self.execute_prepared_scalar_terminal_output(
347            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
348        )?
349        .into_id()
350        .map_err(QueryError::execute)
351    }
352
353    /// Explain scalar `min()` routing without executing the terminal.
354    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
355    where
356        E: EntityValue,
357    {
358        self.explain_prepared_aggregate_non_paged_terminal(
359            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
360        )
361    }
362
363    /// Execute and return the id of the row with the smallest value for `field`.
364    ///
365    /// Ties are deterministic: equal field values resolve by primary key ascending.
366    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
367    where
368        E: EntityValue,
369    {
370        self.ensure_non_paged_mode_ready()?;
371
372        Self::with_slot(field, |target_slot| {
373            self.execute_prepared_scalar_terminal_output(
374                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
375            )?
376            .into_id()
377            .map_err(QueryError::execute)
378        })
379    }
380
381    /// Execute and return the largest matching identifier, if any.
382    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
383    where
384        E: EntityValue,
385    {
386        self.execute_prepared_scalar_terminal_output(
387            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
388        )?
389        .into_id()
390        .map_err(QueryError::execute)
391    }
392
393    /// Explain scalar `max()` routing without executing the terminal.
394    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
395    where
396        E: EntityValue,
397    {
398        self.explain_prepared_aggregate_non_paged_terminal(
399            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
400        )
401    }
402
403    /// Execute and return the id of the row with the largest value for `field`.
404    ///
405    /// Ties are deterministic: equal field values resolve by primary key ascending.
406    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
407    where
408        E: EntityValue,
409    {
410        self.ensure_non_paged_mode_ready()?;
411
412        Self::with_slot(field, |target_slot| {
413            self.execute_prepared_scalar_terminal_output(
414                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
415            )?
416            .into_id()
417            .map_err(QueryError::execute)
418        })
419    }
420
421    /// Execute and return the id at zero-based ordinal `nth` when rows are
422    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
423    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
424    where
425        E: EntityValue,
426    {
427        self.ensure_non_paged_mode_ready()?;
428
429        Self::with_slot(field, |target_slot| {
430            self.execute_prepared_order_sensitive_terminal_output(
431                PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
432            )?
433            .into_id()
434            .map_err(QueryError::execute)
435        })
436    }
437
438    /// Execute and return the sum of `field` over matching rows.
439    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
440    where
441        E: EntityValue,
442    {
443        self.ensure_non_paged_mode_ready()?;
444
445        Self::with_slot(field, |target_slot| {
446            self.execute_prepared_numeric_field_terminal(
447                PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
448            )
449        })
450    }
451
452    /// Explain scalar `sum_by(field)` routing without executing the terminal.
453    pub fn explain_sum_by(
454        &self,
455        field: impl AsRef<str>,
456    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
457    where
458        E: EntityValue,
459    {
460        self.ensure_non_paged_mode_ready()?;
461
462        Self::with_slot(field, |target_slot| {
463            self.explain_prepared_aggregate_non_paged_terminal(
464                &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
465            )
466        })
467    }
468
469    /// Execute and return the sum of distinct `field` values.
470    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
471    where
472        E: EntityValue,
473    {
474        self.ensure_non_paged_mode_ready()?;
475
476        Self::with_slot(field, |target_slot| {
477            self.execute_prepared_numeric_field_terminal(
478                PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
479            )
480        })
481    }
482
483    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
484    pub fn explain_sum_distinct_by(
485        &self,
486        field: impl AsRef<str>,
487    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
488    where
489        E: EntityValue,
490    {
491        self.ensure_non_paged_mode_ready()?;
492
493        Self::with_slot(field, |target_slot| {
494            self.explain_prepared_aggregate_non_paged_terminal(
495                &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
496            )
497        })
498    }
499
500    /// Execute and return the average of `field` over matching rows.
501    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
502    where
503        E: EntityValue,
504    {
505        self.ensure_non_paged_mode_ready()?;
506
507        Self::with_slot(field, |target_slot| {
508            self.execute_prepared_numeric_field_terminal(
509                PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
510            )
511        })
512    }
513
514    /// Explain scalar `avg_by(field)` routing without executing the terminal.
515    pub fn explain_avg_by(
516        &self,
517        field: impl AsRef<str>,
518    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
519    where
520        E: EntityValue,
521    {
522        self.ensure_non_paged_mode_ready()?;
523
524        Self::with_slot(field, |target_slot| {
525            self.explain_prepared_aggregate_non_paged_terminal(
526                &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
527            )
528        })
529    }
530
531    /// Execute and return the average of distinct `field` values.
532    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
533    where
534        E: EntityValue,
535    {
536        self.ensure_non_paged_mode_ready()?;
537
538        Self::with_slot(field, |target_slot| {
539            self.execute_prepared_numeric_field_terminal(
540                PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
541            )
542        })
543    }
544
545    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
546    pub fn explain_avg_distinct_by(
547        &self,
548        field: impl AsRef<str>,
549    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
550    where
551        E: EntityValue,
552    {
553        self.ensure_non_paged_mode_ready()?;
554
555        Self::with_slot(field, |target_slot| {
556            self.explain_prepared_aggregate_non_paged_terminal(
557                &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
558            )
559        })
560    }
561
562    /// Execute and return the median id by `field` using deterministic ordering
563    /// `(field asc, primary key asc)`.
564    ///
565    /// Even-length windows select the lower median.
566    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
567    where
568        E: EntityValue,
569    {
570        self.ensure_non_paged_mode_ready()?;
571
572        Self::with_slot(field, |target_slot| {
573            self.execute_prepared_order_sensitive_terminal_output(
574                PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
575            )?
576            .into_id()
577            .map_err(QueryError::execute)
578        })
579    }
580
581    /// Execute and return the number of distinct values for `field` over the
582    /// effective result window.
583    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
584    where
585        E: EntityValue,
586    {
587        self.ensure_non_paged_mode_ready()?;
588
589        Self::with_slot(field, |target_slot| {
590            self.execute_prepared_projection_terminal_output(
591                PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
592            )?
593            .into_count()
594            .map_err(QueryError::execute)
595        })
596    }
597
598    /// Explain `count_distinct_by(field)` routing without executing the terminal.
599    pub fn explain_count_distinct_by(
600        &self,
601        field: impl AsRef<str>,
602    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
603    where
604        E: EntityValue,
605    {
606        self.ensure_non_paged_mode_ready()?;
607
608        Self::with_slot(field, |target_slot| {
609            self.explain_prepared_projection_non_paged_terminal(
610                &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
611            )
612        })
613    }
614
615    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
616    ///
617    /// Tie handling is deterministic for both extrema: primary key ascending.
618    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
619    where
620        E: EntityValue,
621    {
622        self.ensure_non_paged_mode_ready()?;
623
624        Self::with_slot(field, |target_slot| {
625            self.execute_prepared_order_sensitive_terminal_output(
626                PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
627            )?
628            .into_id_pair()
629            .map_err(QueryError::execute)
630        })
631    }
632
633    /// Execute and return projected field values for the effective result window.
634    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
635    where
636        E: EntityValue,
637    {
638        self.ensure_non_paged_mode_ready()?;
639
640        Self::with_slot(field, |target_slot| {
641            self.execute_prepared_projection_terminal_output(
642                PreparedFluentProjectionStrategy::values_by_slot(target_slot),
643            )?
644            .into_values()
645            .map_err(QueryError::execute)
646        })
647    }
648
649    /// Explain `values_by(field)` routing without executing the terminal.
650    pub fn explain_values_by(
651        &self,
652        field: impl AsRef<str>,
653    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
654    where
655        E: EntityValue,
656    {
657        self.ensure_non_paged_mode_ready()?;
658
659        Self::with_slot(field, |target_slot| {
660            self.explain_prepared_projection_non_paged_terminal(
661                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
662            )
663        })
664    }
665
666    /// Execute and return the first `k` rows from the effective response window.
667    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
668    where
669        E: EntityValue,
670    {
671        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
672    }
673
674    /// Execute and return the top `k` rows by `field` under deterministic
675    /// ordering `(field desc, primary_key asc)` over the effective response
676    /// window.
677    ///
678    /// This terminal applies its own ordering and does not preserve query
679    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
680    /// matches `max_by(field)` selection semantics.
681    pub fn top_k_by(
682        &self,
683        field: impl AsRef<str>,
684        take_count: u32,
685    ) -> Result<EntityResponse<E>, QueryError>
686    where
687        E: EntityValue,
688    {
689        self.ensure_non_paged_mode_ready()?;
690
691        Self::with_slot(field, |target_slot| {
692            self.session
693                .execute_load_query_with(self.query(), move |load, plan| {
694                    load.top_k_by_slot(plan, target_slot, take_count)
695                })
696        })
697    }
698
699    /// Execute and return the bottom `k` rows by `field` under deterministic
700    /// ordering `(field asc, primary_key asc)` over the effective response
701    /// window.
702    ///
703    /// This terminal applies its own ordering and does not preserve query
704    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
705    /// matches `min_by(field)` selection semantics.
706    pub fn bottom_k_by(
707        &self,
708        field: impl AsRef<str>,
709        take_count: u32,
710    ) -> Result<EntityResponse<E>, QueryError>
711    where
712        E: EntityValue,
713    {
714        self.ensure_non_paged_mode_ready()?;
715
716        Self::with_slot(field, |target_slot| {
717            self.session
718                .execute_load_query_with(self.query(), move |load, plan| {
719                    load.bottom_k_by_slot(plan, target_slot, take_count)
720                })
721        })
722    }
723
724    /// Execute and return projected values for the top `k` rows by `field`
725    /// under deterministic ordering `(field desc, primary_key asc)` over the
726    /// effective response window.
727    ///
728    /// Ranking is applied before projection and does not preserve query
729    /// `order_by(...)` row order in the returned values. For `k = 1`, this
730    /// matches `max_by(field)` projected to one value.
731    pub fn top_k_by_values(
732        &self,
733        field: impl AsRef<str>,
734        take_count: u32,
735    ) -> Result<Vec<Value>, QueryError>
736    where
737        E: EntityValue,
738    {
739        self.ensure_non_paged_mode_ready()?;
740
741        Self::with_slot(field, |target_slot| {
742            self.session
743                .execute_load_query_with(self.query(), move |load, plan| {
744                    load.top_k_by_values_slot(plan, target_slot, take_count)
745                })
746        })
747    }
748
749    /// Execute and return projected values for the bottom `k` rows by `field`
750    /// under deterministic ordering `(field asc, primary_key asc)` over the
751    /// effective response window.
752    ///
753    /// Ranking is applied before projection and does not preserve query
754    /// `order_by(...)` row order in the returned values. For `k = 1`, this
755    /// matches `min_by(field)` projected to one value.
756    pub fn bottom_k_by_values(
757        &self,
758        field: impl AsRef<str>,
759        take_count: u32,
760    ) -> Result<Vec<Value>, QueryError>
761    where
762        E: EntityValue,
763    {
764        self.ensure_non_paged_mode_ready()?;
765
766        Self::with_slot(field, |target_slot| {
767            self.session
768                .execute_load_query_with(self.query(), move |load, plan| {
769                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
770                })
771        })
772    }
773
774    /// Execute and return projected id/value pairs for the top `k` rows by
775    /// `field` under deterministic ordering `(field desc, primary_key asc)`
776    /// over the effective response window.
777    ///
778    /// Ranking is applied before projection and does not preserve query
779    /// `order_by(...)` row order in the returned values. For `k = 1`, this
780    /// matches `max_by(field)` projected to one `(id, value)` pair.
781    pub fn top_k_by_with_ids(
782        &self,
783        field: impl AsRef<str>,
784        take_count: u32,
785    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
786    where
787        E: EntityValue,
788    {
789        self.ensure_non_paged_mode_ready()?;
790
791        Self::with_slot(field, |target_slot| {
792            self.session
793                .execute_load_query_with(self.query(), move |load, plan| {
794                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
795                })
796        })
797    }
798
799    /// Execute and return projected id/value pairs for the bottom `k` rows by
800    /// `field` under deterministic ordering `(field asc, primary_key asc)`
801    /// over the effective response window.
802    ///
803    /// Ranking is applied before projection and does not preserve query
804    /// `order_by(...)` row order in the returned values. For `k = 1`, this
805    /// matches `min_by(field)` projected to one `(id, value)` pair.
806    pub fn bottom_k_by_with_ids(
807        &self,
808        field: impl AsRef<str>,
809        take_count: u32,
810    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
811    where
812        E: EntityValue,
813    {
814        self.ensure_non_paged_mode_ready()?;
815
816        Self::with_slot(field, |target_slot| {
817            self.session
818                .execute_load_query_with(self.query(), move |load, plan| {
819                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
820                })
821        })
822    }
823
824    /// Execute and return distinct projected field values for the effective
825    /// result window, preserving first-observed value order.
826    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
827    where
828        E: EntityValue,
829    {
830        self.ensure_non_paged_mode_ready()?;
831
832        Self::with_slot(field, |target_slot| {
833            self.execute_prepared_projection_terminal_output(
834                PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
835            )?
836            .into_values()
837            .map_err(QueryError::execute)
838        })
839    }
840
841    /// Explain `distinct_values_by(field)` routing without executing the terminal.
842    pub fn explain_distinct_values_by(
843        &self,
844        field: impl AsRef<str>,
845    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
846    where
847        E: EntityValue,
848    {
849        self.ensure_non_paged_mode_ready()?;
850
851        Self::with_slot(field, |target_slot| {
852            self.explain_prepared_projection_non_paged_terminal(
853                &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
854            )
855        })
856    }
857
858    /// Execute and return projected field values paired with row ids for the
859    /// effective result window.
860    pub fn values_by_with_ids(
861        &self,
862        field: impl AsRef<str>,
863    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
864    where
865        E: EntityValue,
866    {
867        self.ensure_non_paged_mode_ready()?;
868
869        Self::with_slot(field, |target_slot| {
870            self.execute_prepared_projection_terminal_output(
871                PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
872            )?
873            .into_values_with_ids()
874            .map_err(QueryError::execute)
875        })
876    }
877
878    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
879    pub fn explain_values_by_with_ids(
880        &self,
881        field: impl AsRef<str>,
882    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
883    where
884        E: EntityValue,
885    {
886        self.ensure_non_paged_mode_ready()?;
887
888        Self::with_slot(field, |target_slot| {
889            self.explain_prepared_projection_non_paged_terminal(
890                &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
891            )
892        })
893    }
894
895    /// Execute and return the first projected field value in effective response
896    /// order, if any.
897    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
898    where
899        E: EntityValue,
900    {
901        self.ensure_non_paged_mode_ready()?;
902
903        Self::with_slot(field, |target_slot| {
904            self.execute_prepared_projection_terminal_output(
905                PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
906            )?
907            .into_terminal_value()
908            .map_err(QueryError::execute)
909        })
910    }
911
912    /// Explain `first_value_by(field)` routing without executing the terminal.
913    pub fn explain_first_value_by(
914        &self,
915        field: impl AsRef<str>,
916    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
917    where
918        E: EntityValue,
919    {
920        self.ensure_non_paged_mode_ready()?;
921
922        Self::with_slot(field, |target_slot| {
923            self.explain_prepared_projection_non_paged_terminal(
924                &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
925            )
926        })
927    }
928
929    /// Execute and return the last projected field value in effective response
930    /// order, if any.
931    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
932    where
933        E: EntityValue,
934    {
935        self.ensure_non_paged_mode_ready()?;
936
937        Self::with_slot(field, |target_slot| {
938            self.execute_prepared_projection_terminal_output(
939                PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
940            )?
941            .into_terminal_value()
942            .map_err(QueryError::execute)
943        })
944    }
945
946    /// Explain `last_value_by(field)` routing without executing the terminal.
947    pub fn explain_last_value_by(
948        &self,
949        field: impl AsRef<str>,
950    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
951    where
952        E: EntityValue,
953    {
954        self.ensure_non_paged_mode_ready()?;
955
956        Self::with_slot(field, |target_slot| {
957            self.explain_prepared_projection_non_paged_terminal(
958                &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
959            )
960        })
961    }
962
963    /// Execute and return the first matching identifier in response order, if any.
964    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
965    where
966        E: EntityValue,
967    {
968        self.execute_prepared_order_sensitive_terminal_output(
969            PreparedFluentOrderSensitiveTerminalStrategy::first(),
970        )?
971        .into_id()
972        .map_err(QueryError::execute)
973    }
974
975    /// Explain scalar `first()` routing without executing the terminal.
976    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
977    where
978        E: EntityValue,
979    {
980        self.explain_prepared_aggregate_non_paged_terminal(
981            &PreparedFluentOrderSensitiveTerminalStrategy::first(),
982        )
983    }
984
985    /// Execute and return the last matching identifier in response order, if any.
986    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
987    where
988        E: EntityValue,
989    {
990        self.execute_prepared_order_sensitive_terminal_output(
991            PreparedFluentOrderSensitiveTerminalStrategy::last(),
992        )?
993        .into_id()
994        .map_err(QueryError::execute)
995    }
996
997    /// Explain scalar `last()` routing without executing the terminal.
998    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
999    where
1000        E: EntityValue,
1001    {
1002        self.explain_prepared_aggregate_non_paged_terminal(
1003            &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1004        )
1005    }
1006
1007    /// Execute and require exactly one matching row.
1008    pub fn require_one(&self) -> Result<(), QueryError>
1009    where
1010        E: EntityValue,
1011    {
1012        self.execute()?.require_one()?;
1013        Ok(())
1014    }
1015
1016    /// Execute and require at least one matching row.
1017    pub fn require_some(&self) -> Result<(), QueryError>
1018    where
1019        E: EntityValue,
1020    {
1021        self.execute()?.require_some()?;
1022        Ok(())
1023    }
1024}
1025
1026fn scalar_terminal_boundary_request_from_prepared(
1027    request: PreparedFluentScalarTerminalRuntimeRequest,
1028) -> ScalarTerminalBoundaryRequest {
1029    match request {
1030        PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1031            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1032        }
1033        PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1034            ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
1035        }
1036    }
1037}
1038
1039const fn existing_rows_terminal_boundary_request_from_prepared(
1040    request: PreparedFluentExistingRowsTerminalRuntimeRequest,
1041) -> ScalarTerminalBoundaryRequest {
1042    match request {
1043        PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => {
1044            ScalarTerminalBoundaryRequest::Count
1045        }
1046        PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => {
1047            ScalarTerminalBoundaryRequest::Exists
1048        }
1049    }
1050}
1051
1052const fn numeric_field_boundary_request_from_prepared(
1053    request: PreparedFluentNumericFieldRuntimeRequest,
1054) -> ScalarNumericFieldBoundaryRequest {
1055    match request {
1056        PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
1057        PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
1058            ScalarNumericFieldBoundaryRequest::SumDistinct
1059        }
1060        PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
1061        PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
1062            ScalarNumericFieldBoundaryRequest::AvgDistinct
1063        }
1064    }
1065}
1066
1067fn order_sensitive_terminal_boundary_request_from_prepared(
1068    request: PreparedFluentOrderSensitiveTerminalRuntimeRequest,
1069) -> ScalarTerminalBoundaryRequest {
1070    match request {
1071        PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1072            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1073        }
1074        PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1075            ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth }
1076        }
1077        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1078            ScalarTerminalBoundaryRequest::MedianBySlot { target_field }
1079        }
1080        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1081            ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field }
1082        }
1083    }
1084}
1085
1086const fn projection_boundary_request_from_prepared(
1087    request: PreparedFluentProjectionRuntimeRequest,
1088) -> ScalarProjectionBoundaryRequest {
1089    match request {
1090        PreparedFluentProjectionRuntimeRequest::Values => ScalarProjectionBoundaryRequest::Values,
1091        PreparedFluentProjectionRuntimeRequest::DistinctValues => {
1092            ScalarProjectionBoundaryRequest::DistinctValues
1093        }
1094        PreparedFluentProjectionRuntimeRequest::CountDistinct => {
1095            ScalarProjectionBoundaryRequest::CountDistinct
1096        }
1097        PreparedFluentProjectionRuntimeRequest::ValuesWithIds => {
1098            ScalarProjectionBoundaryRequest::ValuesWithIds
1099        }
1100        PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1101            ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind }
1102        }
1103    }
1104}