Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        PersistedRow,
9        executor::{
10            LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11            ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12            ScalarTerminalBoundaryRequest,
13        },
14        query::{
15            api::ResponseCardinalityExt,
16            builder::{
17                PreparedFluentAggregateExplainStrategy,
18                PreparedFluentExistingRowsTerminalRuntimeRequest,
19                PreparedFluentExistingRowsTerminalStrategy,
20                PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21                PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22                PreparedFluentOrderSensitiveTerminalStrategy,
23                PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24                PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25            },
26            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
27            fluent::load::FluentLoadQuery,
28            intent::QueryError,
29            plan::AggregateKind,
30        },
31        response::EntityResponse,
32    },
33    error::InternalError,
34    traits::EntityValue,
35    types::{Decimal, Id},
36    value::Value,
37};
38
// Success payload of `min_max_by(field)`: the `(min_by, max_by)` id pair.
// NOTE(review): presumably `None` when no row matches — confirm against the
// executor's `into_id_pair()` contract.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
40
41impl<E> FluentLoadQuery<'_, E>
42where
43    E: PersistedRow,
44{
45    // ------------------------------------------------------------------
46    // Execution (single semantic boundary)
47    // ------------------------------------------------------------------
48
49    /// Execute this query using the session's policy settings.
50    pub fn execute(&self) -> Result<EntityResponse<E>, QueryError>
51    where
52        E: EntityValue,
53    {
54        self.ensure_non_paged_mode_ready()?;
55
56        self.session.execute_query(self.query())
57    }
58
59    // Run one scalar terminal through the canonical non-paged fluent policy
60    // gate before handing execution to the session load-query adapter.
61    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
62    where
63        E: EntityValue,
64        F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
65    {
66        self.ensure_non_paged_mode_ready()?;
67
68        self.session.execute_load_query_with(self.query(), execute)
69    }
70
71    // Run one explain-visible aggregate terminal through the canonical
72    // non-paged fluent policy gate using the prepared aggregate strategy as
73    // the single explain projection source.
74    fn explain_prepared_aggregate_non_paged_terminal<S>(
75        &self,
76        strategy: &S,
77    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
78    where
79        E: EntityValue,
80        S: PreparedFluentAggregateExplainStrategy,
81    {
82        self.ensure_non_paged_mode_ready()?;
83
84        self.session
85            .explain_query_prepared_aggregate_terminal_with_visible_indexes(self.query(), strategy)
86    }
87
88    // Run one prepared projection/distinct explain terminal through the
89    // canonical non-paged fluent policy gate using the prepared projection
90    // strategy as the single explain projection source.
91    fn explain_prepared_projection_non_paged_terminal(
92        &self,
93        strategy: &PreparedFluentProjectionStrategy,
94    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
95    where
96        E: EntityValue,
97    {
98        self.ensure_non_paged_mode_ready()?;
99
100        self.session
101            .explain_query_prepared_projection_terminal_with_visible_indexes(self.query(), strategy)
102    }
103
104    // Execute one prepared fluent scalar terminal through the canonical
105    // non-paged fluent policy gate using the prepared runtime request as the
106    // single execution source.
107    fn execute_prepared_scalar_terminal_output(
108        &self,
109        strategy: PreparedFluentScalarTerminalStrategy,
110    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
111    where
112        E: EntityValue,
113    {
114        let runtime_request = strategy.into_runtime_request();
115
116        self.execute_scalar_non_paged_terminal(move |load, plan| {
117            load.execute_scalar_terminal_request(
118                plan,
119                scalar_terminal_boundary_request_from_prepared(runtime_request),
120            )
121        })
122    }
123
124    // Execute one prepared fluent existing-rows terminal through the
125    // canonical non-paged fluent policy gate using the prepared existing-rows
126    // strategy as the single runtime source.
127    fn execute_prepared_existing_rows_terminal_output(
128        &self,
129        strategy: PreparedFluentExistingRowsTerminalStrategy,
130    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
131    where
132        E: EntityValue,
133    {
134        let runtime_request = strategy.into_runtime_request();
135
136        self.execute_scalar_non_paged_terminal(move |load, plan| {
137            load.execute_scalar_terminal_request(
138                plan,
139                existing_rows_terminal_boundary_request_from_prepared(runtime_request),
140            )
141        })
142    }
143
144    // Execute one prepared fluent numeric-field terminal through the canonical
145    // non-paged fluent policy gate using the prepared numeric strategy as the
146    // single runtime source.
147    fn execute_prepared_numeric_field_terminal(
148        &self,
149        strategy: PreparedFluentNumericFieldStrategy,
150    ) -> Result<Option<Decimal>, QueryError>
151    where
152        E: EntityValue,
153    {
154        let (target_field, runtime_request) = strategy.into_runtime_parts();
155
156        self.execute_scalar_non_paged_terminal(move |load, plan| {
157            load.execute_numeric_field_boundary(
158                plan,
159                target_field,
160                numeric_field_boundary_request_from_prepared(runtime_request),
161            )
162        })
163    }
164
165    // Execute one prepared fluent order-sensitive terminal through the
166    // canonical non-paged fluent policy gate using the prepared order-sensitive
167    // strategy as the single runtime source.
168    fn execute_prepared_order_sensitive_terminal_output(
169        &self,
170        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
171    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
172    where
173        E: EntityValue,
174    {
175        let runtime_request = strategy.into_runtime_request();
176
177        self.execute_scalar_non_paged_terminal(move |load, plan| {
178            load.execute_scalar_terminal_request(
179                plan,
180                order_sensitive_terminal_boundary_request_from_prepared(runtime_request),
181            )
182        })
183    }
184
185    // Execute one prepared fluent projection/distinct terminal through the
186    // canonical non-paged fluent policy gate using the prepared projection
187    // strategy as the single runtime source.
188    fn execute_prepared_projection_terminal_output(
189        &self,
190        strategy: PreparedFluentProjectionStrategy,
191    ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
192    where
193        E: EntityValue,
194    {
195        let (target_field, runtime_request) = strategy.into_runtime_parts();
196
197        self.execute_scalar_non_paged_terminal(move |load, plan| {
198            load.execute_scalar_projection_boundary(
199                plan,
200                target_field,
201                projection_boundary_request_from_prepared(runtime_request),
202            )
203        })
204    }
205
206    // ------------------------------------------------------------------
207    // Execution terminals — semantic only
208    // ------------------------------------------------------------------
209
210    /// Execute and return whether the result set is empty.
211    pub fn is_empty(&self) -> Result<bool, QueryError>
212    where
213        E: EntityValue,
214    {
215        self.not_exists()
216    }
217
218    /// Execute and return whether no matching row exists.
219    pub fn not_exists(&self) -> Result<bool, QueryError>
220    where
221        E: EntityValue,
222    {
223        Ok(!self.exists()?)
224    }
225
226    /// Execute and return whether at least one matching row exists.
227    pub fn exists(&self) -> Result<bool, QueryError>
228    where
229        E: EntityValue,
230    {
231        self.execute_prepared_existing_rows_terminal_output(
232            PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
233        )?
234        .into_exists()
235        .map_err(QueryError::execute)
236    }
237
238    /// Explain scalar `exists()` routing without executing the terminal.
239    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
240    where
241        E: EntityValue,
242    {
243        self.explain_prepared_aggregate_non_paged_terminal(
244            &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
245        )
246    }
247
248    /// Explain scalar `not_exists()` routing without executing the terminal.
249    ///
250    /// This remains an `exists()` execution plan with negated boolean semantics.
251    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
252    where
253        E: EntityValue,
254    {
255        self.explain_exists()
256    }
257
258    /// Explain scalar load execution shape without executing the query.
259    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
260    where
261        E: EntityValue,
262    {
263        self.session
264            .explain_query_execution_with_visible_indexes(self.query())
265    }
266
267    /// Explain scalar load execution shape as deterministic text.
268    pub fn explain_execution_text(&self) -> Result<String, QueryError>
269    where
270        E: EntityValue,
271    {
272        self.session
273            .explain_query_execution_text_with_visible_indexes(self.query())
274    }
275
276    /// Explain scalar load execution shape as canonical JSON.
277    pub fn explain_execution_json(&self) -> Result<String, QueryError>
278    where
279        E: EntityValue,
280    {
281        self.session
282            .explain_query_execution_json_with_visible_indexes(self.query())
283    }
284
285    /// Explain scalar load execution shape as verbose text with diagnostics.
286    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
287    where
288        E: EntityValue,
289    {
290        self.session
291            .explain_query_execution_verbose_with_visible_indexes(self.query())
292    }
293
294    /// Execute and return the number of matching rows.
295    pub fn count(&self) -> Result<u32, QueryError>
296    where
297        E: EntityValue,
298    {
299        self.execute_prepared_existing_rows_terminal_output(
300            PreparedFluentExistingRowsTerminalStrategy::count_rows(),
301        )?
302        .into_count()
303        .map_err(QueryError::execute)
304    }
305
306    /// Execute and return the total persisted payload bytes for the effective
307    /// result window.
308    pub fn bytes(&self) -> Result<u64, QueryError>
309    where
310        E: EntityValue,
311    {
312        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
313    }
314
315    /// Execute and return the total serialized bytes for `field` over the
316    /// effective result window.
317    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
318    where
319        E: EntityValue,
320    {
321        self.ensure_non_paged_mode_ready()?;
322
323        Self::with_slot(field, |target_slot| {
324            self.session
325                .execute_load_query_with(self.query(), move |load, plan| {
326                    load.bytes_by_slot(plan, target_slot)
327                })
328        })
329    }
330
331    /// Explain `bytes_by(field)` routing without executing the terminal.
332    pub fn explain_bytes_by(
333        &self,
334        field: impl AsRef<str>,
335    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
336    where
337        E: EntityValue,
338    {
339        self.ensure_non_paged_mode_ready()?;
340
341        Self::with_slot(field, |target_slot| {
342            self.session
343                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
344        })
345    }
346
347    /// Execute and return the smallest matching identifier, if any.
348    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
349    where
350        E: EntityValue,
351    {
352        self.execute_prepared_scalar_terminal_output(
353            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
354        )?
355        .into_id()
356        .map_err(QueryError::execute)
357    }
358
359    /// Explain scalar `min()` routing without executing the terminal.
360    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
361    where
362        E: EntityValue,
363    {
364        self.explain_prepared_aggregate_non_paged_terminal(
365            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
366        )
367    }
368
369    /// Execute and return the id of the row with the smallest value for `field`.
370    ///
371    /// Ties are deterministic: equal field values resolve by primary key ascending.
372    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
373    where
374        E: EntityValue,
375    {
376        self.ensure_non_paged_mode_ready()?;
377
378        Self::with_slot(field, |target_slot| {
379            self.execute_prepared_scalar_terminal_output(
380                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
381            )?
382            .into_id()
383            .map_err(QueryError::execute)
384        })
385    }
386
387    /// Execute and return the largest matching identifier, if any.
388    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
389    where
390        E: EntityValue,
391    {
392        self.execute_prepared_scalar_terminal_output(
393            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
394        )?
395        .into_id()
396        .map_err(QueryError::execute)
397    }
398
399    /// Explain scalar `max()` routing without executing the terminal.
400    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
401    where
402        E: EntityValue,
403    {
404        self.explain_prepared_aggregate_non_paged_terminal(
405            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
406        )
407    }
408
409    /// Execute and return the id of the row with the largest value for `field`.
410    ///
411    /// Ties are deterministic: equal field values resolve by primary key ascending.
412    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
413    where
414        E: EntityValue,
415    {
416        self.ensure_non_paged_mode_ready()?;
417
418        Self::with_slot(field, |target_slot| {
419            self.execute_prepared_scalar_terminal_output(
420                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
421            )?
422            .into_id()
423            .map_err(QueryError::execute)
424        })
425    }
426
427    /// Execute and return the id at zero-based ordinal `nth` when rows are
428    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
429    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
430    where
431        E: EntityValue,
432    {
433        self.ensure_non_paged_mode_ready()?;
434
435        Self::with_slot(field, |target_slot| {
436            self.execute_prepared_order_sensitive_terminal_output(
437                PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
438            )?
439            .into_id()
440            .map_err(QueryError::execute)
441        })
442    }
443
444    /// Execute and return the sum of `field` over matching rows.
445    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
446    where
447        E: EntityValue,
448    {
449        self.ensure_non_paged_mode_ready()?;
450
451        Self::with_slot(field, |target_slot| {
452            self.execute_prepared_numeric_field_terminal(
453                PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
454            )
455        })
456    }
457
458    /// Explain scalar `sum_by(field)` routing without executing the terminal.
459    pub fn explain_sum_by(
460        &self,
461        field: impl AsRef<str>,
462    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
463    where
464        E: EntityValue,
465    {
466        self.ensure_non_paged_mode_ready()?;
467
468        Self::with_slot(field, |target_slot| {
469            self.explain_prepared_aggregate_non_paged_terminal(
470                &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
471            )
472        })
473    }
474
475    /// Execute and return the sum of distinct `field` values.
476    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
477    where
478        E: EntityValue,
479    {
480        self.ensure_non_paged_mode_ready()?;
481
482        Self::with_slot(field, |target_slot| {
483            self.execute_prepared_numeric_field_terminal(
484                PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
485            )
486        })
487    }
488
489    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
490    pub fn explain_sum_distinct_by(
491        &self,
492        field: impl AsRef<str>,
493    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
494    where
495        E: EntityValue,
496    {
497        self.ensure_non_paged_mode_ready()?;
498
499        Self::with_slot(field, |target_slot| {
500            self.explain_prepared_aggregate_non_paged_terminal(
501                &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
502            )
503        })
504    }
505
506    /// Execute and return the average of `field` over matching rows.
507    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
508    where
509        E: EntityValue,
510    {
511        self.ensure_non_paged_mode_ready()?;
512
513        Self::with_slot(field, |target_slot| {
514            self.execute_prepared_numeric_field_terminal(
515                PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
516            )
517        })
518    }
519
520    /// Explain scalar `avg_by(field)` routing without executing the terminal.
521    pub fn explain_avg_by(
522        &self,
523        field: impl AsRef<str>,
524    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
525    where
526        E: EntityValue,
527    {
528        self.ensure_non_paged_mode_ready()?;
529
530        Self::with_slot(field, |target_slot| {
531            self.explain_prepared_aggregate_non_paged_terminal(
532                &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
533            )
534        })
535    }
536
537    /// Execute and return the average of distinct `field` values.
538    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
539    where
540        E: EntityValue,
541    {
542        self.ensure_non_paged_mode_ready()?;
543
544        Self::with_slot(field, |target_slot| {
545            self.execute_prepared_numeric_field_terminal(
546                PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
547            )
548        })
549    }
550
551    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
552    pub fn explain_avg_distinct_by(
553        &self,
554        field: impl AsRef<str>,
555    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
556    where
557        E: EntityValue,
558    {
559        self.ensure_non_paged_mode_ready()?;
560
561        Self::with_slot(field, |target_slot| {
562            self.explain_prepared_aggregate_non_paged_terminal(
563                &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
564            )
565        })
566    }
567
568    /// Execute and return the median id by `field` using deterministic ordering
569    /// `(field asc, primary key asc)`.
570    ///
571    /// Even-length windows select the lower median.
572    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
573    where
574        E: EntityValue,
575    {
576        self.ensure_non_paged_mode_ready()?;
577
578        Self::with_slot(field, |target_slot| {
579            self.execute_prepared_order_sensitive_terminal_output(
580                PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
581            )?
582            .into_id()
583            .map_err(QueryError::execute)
584        })
585    }
586
587    /// Execute and return the number of distinct values for `field` over the
588    /// effective result window.
589    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
590    where
591        E: EntityValue,
592    {
593        self.ensure_non_paged_mode_ready()?;
594
595        Self::with_slot(field, |target_slot| {
596            self.execute_prepared_projection_terminal_output(
597                PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
598            )?
599            .into_count()
600            .map_err(QueryError::execute)
601        })
602    }
603
604    /// Explain `count_distinct_by(field)` routing without executing the terminal.
605    pub fn explain_count_distinct_by(
606        &self,
607        field: impl AsRef<str>,
608    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
609    where
610        E: EntityValue,
611    {
612        self.ensure_non_paged_mode_ready()?;
613
614        Self::with_slot(field, |target_slot| {
615            self.explain_prepared_projection_non_paged_terminal(
616                &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
617            )
618        })
619    }
620
621    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
622    ///
623    /// Tie handling is deterministic for both extrema: primary key ascending.
624    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
625    where
626        E: EntityValue,
627    {
628        self.ensure_non_paged_mode_ready()?;
629
630        Self::with_slot(field, |target_slot| {
631            self.execute_prepared_order_sensitive_terminal_output(
632                PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
633            )?
634            .into_id_pair()
635            .map_err(QueryError::execute)
636        })
637    }
638
639    /// Execute and return projected field values for the effective result window.
640    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
641    where
642        E: EntityValue,
643    {
644        self.ensure_non_paged_mode_ready()?;
645
646        Self::with_slot(field, |target_slot| {
647            self.execute_prepared_projection_terminal_output(
648                PreparedFluentProjectionStrategy::values_by_slot(target_slot),
649            )?
650            .into_values()
651            .map_err(QueryError::execute)
652        })
653    }
654
655    /// Explain `values_by(field)` routing without executing the terminal.
656    pub fn explain_values_by(
657        &self,
658        field: impl AsRef<str>,
659    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
660    where
661        E: EntityValue,
662    {
663        self.ensure_non_paged_mode_ready()?;
664
665        Self::with_slot(field, |target_slot| {
666            self.explain_prepared_projection_non_paged_terminal(
667                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
668            )
669        })
670    }
671
672    /// Execute and return the first `k` rows from the effective response window.
673    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
674    where
675        E: EntityValue,
676    {
677        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
678    }
679
680    /// Execute and return the top `k` rows by `field` under deterministic
681    /// ordering `(field desc, primary_key asc)` over the effective response
682    /// window.
683    ///
684    /// This terminal applies its own ordering and does not preserve query
685    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
686    /// matches `max_by(field)` selection semantics.
687    pub fn top_k_by(
688        &self,
689        field: impl AsRef<str>,
690        take_count: u32,
691    ) -> Result<EntityResponse<E>, QueryError>
692    where
693        E: EntityValue,
694    {
695        self.ensure_non_paged_mode_ready()?;
696
697        Self::with_slot(field, |target_slot| {
698            self.session
699                .execute_load_query_with(self.query(), move |load, plan| {
700                    load.top_k_by_slot(plan, target_slot, take_count)
701                })
702        })
703    }
704
705    /// Execute and return the bottom `k` rows by `field` under deterministic
706    /// ordering `(field asc, primary_key asc)` over the effective response
707    /// window.
708    ///
709    /// This terminal applies its own ordering and does not preserve query
710    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
711    /// matches `min_by(field)` selection semantics.
712    pub fn bottom_k_by(
713        &self,
714        field: impl AsRef<str>,
715        take_count: u32,
716    ) -> Result<EntityResponse<E>, QueryError>
717    where
718        E: EntityValue,
719    {
720        self.ensure_non_paged_mode_ready()?;
721
722        Self::with_slot(field, |target_slot| {
723            self.session
724                .execute_load_query_with(self.query(), move |load, plan| {
725                    load.bottom_k_by_slot(plan, target_slot, take_count)
726                })
727        })
728    }
729
730    /// Execute and return projected values for the top `k` rows by `field`
731    /// under deterministic ordering `(field desc, primary_key asc)` over the
732    /// effective response window.
733    ///
734    /// Ranking is applied before projection and does not preserve query
735    /// `order_by(...)` row order in the returned values. For `k = 1`, this
736    /// matches `max_by(field)` projected to one value.
737    pub fn top_k_by_values(
738        &self,
739        field: impl AsRef<str>,
740        take_count: u32,
741    ) -> Result<Vec<Value>, QueryError>
742    where
743        E: EntityValue,
744    {
745        self.ensure_non_paged_mode_ready()?;
746
747        Self::with_slot(field, |target_slot| {
748            self.session
749                .execute_load_query_with(self.query(), move |load, plan| {
750                    load.top_k_by_values_slot(plan, target_slot, take_count)
751                })
752        })
753    }
754
755    /// Execute and return projected values for the bottom `k` rows by `field`
756    /// under deterministic ordering `(field asc, primary_key asc)` over the
757    /// effective response window.
758    ///
759    /// Ranking is applied before projection and does not preserve query
760    /// `order_by(...)` row order in the returned values. For `k = 1`, this
761    /// matches `min_by(field)` projected to one value.
762    pub fn bottom_k_by_values(
763        &self,
764        field: impl AsRef<str>,
765        take_count: u32,
766    ) -> Result<Vec<Value>, QueryError>
767    where
768        E: EntityValue,
769    {
770        self.ensure_non_paged_mode_ready()?;
771
772        Self::with_slot(field, |target_slot| {
773            self.session
774                .execute_load_query_with(self.query(), move |load, plan| {
775                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
776                })
777        })
778    }
779
780    /// Execute and return projected id/value pairs for the top `k` rows by
781    /// `field` under deterministic ordering `(field desc, primary_key asc)`
782    /// over the effective response window.
783    ///
784    /// Ranking is applied before projection and does not preserve query
785    /// `order_by(...)` row order in the returned values. For `k = 1`, this
786    /// matches `max_by(field)` projected to one `(id, value)` pair.
787    pub fn top_k_by_with_ids(
788        &self,
789        field: impl AsRef<str>,
790        take_count: u32,
791    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
792    where
793        E: EntityValue,
794    {
795        self.ensure_non_paged_mode_ready()?;
796
797        Self::with_slot(field, |target_slot| {
798            self.session
799                .execute_load_query_with(self.query(), move |load, plan| {
800                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
801                })
802        })
803    }
804
805    /// Execute and return projected id/value pairs for the bottom `k` rows by
806    /// `field` under deterministic ordering `(field asc, primary_key asc)`
807    /// over the effective response window.
808    ///
809    /// Ranking is applied before projection and does not preserve query
810    /// `order_by(...)` row order in the returned values. For `k = 1`, this
811    /// matches `min_by(field)` projected to one `(id, value)` pair.
812    pub fn bottom_k_by_with_ids(
813        &self,
814        field: impl AsRef<str>,
815        take_count: u32,
816    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
817    where
818        E: EntityValue,
819    {
820        self.ensure_non_paged_mode_ready()?;
821
822        Self::with_slot(field, |target_slot| {
823            self.session
824                .execute_load_query_with(self.query(), move |load, plan| {
825                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
826                })
827        })
828    }
829
830    /// Execute and return distinct projected field values for the effective
831    /// result window, preserving first-observed value order.
832    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
833    where
834        E: EntityValue,
835    {
836        self.ensure_non_paged_mode_ready()?;
837
838        Self::with_slot(field, |target_slot| {
839            self.execute_prepared_projection_terminal_output(
840                PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
841            )?
842            .into_values()
843            .map_err(QueryError::execute)
844        })
845    }
846
847    /// Explain `distinct_values_by(field)` routing without executing the terminal.
848    pub fn explain_distinct_values_by(
849        &self,
850        field: impl AsRef<str>,
851    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
852    where
853        E: EntityValue,
854    {
855        self.ensure_non_paged_mode_ready()?;
856
857        Self::with_slot(field, |target_slot| {
858            self.explain_prepared_projection_non_paged_terminal(
859                &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
860            )
861        })
862    }
863
864    /// Execute and return projected field values paired with row ids for the
865    /// effective result window.
866    pub fn values_by_with_ids(
867        &self,
868        field: impl AsRef<str>,
869    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
870    where
871        E: EntityValue,
872    {
873        self.ensure_non_paged_mode_ready()?;
874
875        Self::with_slot(field, |target_slot| {
876            self.execute_prepared_projection_terminal_output(
877                PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
878            )?
879            .into_values_with_ids()
880            .map_err(QueryError::execute)
881        })
882    }
883
884    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
885    pub fn explain_values_by_with_ids(
886        &self,
887        field: impl AsRef<str>,
888    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
889    where
890        E: EntityValue,
891    {
892        self.ensure_non_paged_mode_ready()?;
893
894        Self::with_slot(field, |target_slot| {
895            self.explain_prepared_projection_non_paged_terminal(
896                &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
897            )
898        })
899    }
900
901    /// Execute and return the first projected field value in effective response
902    /// order, if any.
903    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
904    where
905        E: EntityValue,
906    {
907        self.ensure_non_paged_mode_ready()?;
908
909        Self::with_slot(field, |target_slot| {
910            self.execute_prepared_projection_terminal_output(
911                PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
912            )?
913            .into_terminal_value()
914            .map_err(QueryError::execute)
915        })
916    }
917
918    /// Explain `first_value_by(field)` routing without executing the terminal.
919    pub fn explain_first_value_by(
920        &self,
921        field: impl AsRef<str>,
922    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
923    where
924        E: EntityValue,
925    {
926        self.ensure_non_paged_mode_ready()?;
927
928        Self::with_slot(field, |target_slot| {
929            self.explain_prepared_projection_non_paged_terminal(
930                &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
931            )
932        })
933    }
934
935    /// Execute and return the last projected field value in effective response
936    /// order, if any.
937    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
938    where
939        E: EntityValue,
940    {
941        self.ensure_non_paged_mode_ready()?;
942
943        Self::with_slot(field, |target_slot| {
944            self.execute_prepared_projection_terminal_output(
945                PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
946            )?
947            .into_terminal_value()
948            .map_err(QueryError::execute)
949        })
950    }
951
952    /// Explain `last_value_by(field)` routing without executing the terminal.
953    pub fn explain_last_value_by(
954        &self,
955        field: impl AsRef<str>,
956    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
957    where
958        E: EntityValue,
959    {
960        self.ensure_non_paged_mode_ready()?;
961
962        Self::with_slot(field, |target_slot| {
963            self.explain_prepared_projection_non_paged_terminal(
964                &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
965            )
966        })
967    }
968
969    /// Execute and return the first matching identifier in response order, if any.
970    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
971    where
972        E: EntityValue,
973    {
974        self.execute_prepared_order_sensitive_terminal_output(
975            PreparedFluentOrderSensitiveTerminalStrategy::first(),
976        )?
977        .into_id()
978        .map_err(QueryError::execute)
979    }
980
981    /// Explain scalar `first()` routing without executing the terminal.
982    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
983    where
984        E: EntityValue,
985    {
986        self.explain_prepared_aggregate_non_paged_terminal(
987            &PreparedFluentOrderSensitiveTerminalStrategy::first(),
988        )
989    }
990
991    /// Execute and return the last matching identifier in response order, if any.
992    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
993    where
994        E: EntityValue,
995    {
996        self.execute_prepared_order_sensitive_terminal_output(
997            PreparedFluentOrderSensitiveTerminalStrategy::last(),
998        )?
999        .into_id()
1000        .map_err(QueryError::execute)
1001    }
1002
1003    /// Explain scalar `last()` routing without executing the terminal.
1004    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1005    where
1006        E: EntityValue,
1007    {
1008        self.explain_prepared_aggregate_non_paged_terminal(
1009            &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1010        )
1011    }
1012
1013    /// Execute and require exactly one matching row.
1014    pub fn require_one(&self) -> Result<(), QueryError>
1015    where
1016        E: EntityValue,
1017    {
1018        self.execute()?.require_one()?;
1019        Ok(())
1020    }
1021
1022    /// Execute and require at least one matching row.
1023    pub fn require_some(&self) -> Result<(), QueryError>
1024    where
1025        E: EntityValue,
1026    {
1027        self.execute()?.require_some()?;
1028        Ok(())
1029    }
1030}
1031
1032fn scalar_terminal_boundary_request_from_prepared(
1033    request: PreparedFluentScalarTerminalRuntimeRequest,
1034) -> ScalarTerminalBoundaryRequest {
1035    match request {
1036        PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1037            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1038        }
1039        PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1040            ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
1041        }
1042    }
1043}
1044
1045const fn existing_rows_terminal_boundary_request_from_prepared(
1046    request: PreparedFluentExistingRowsTerminalRuntimeRequest,
1047) -> ScalarTerminalBoundaryRequest {
1048    match request {
1049        PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => {
1050            ScalarTerminalBoundaryRequest::Count
1051        }
1052        PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => {
1053            ScalarTerminalBoundaryRequest::Exists
1054        }
1055    }
1056}
1057
1058const fn numeric_field_boundary_request_from_prepared(
1059    request: PreparedFluentNumericFieldRuntimeRequest,
1060) -> ScalarNumericFieldBoundaryRequest {
1061    match request {
1062        PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
1063        PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
1064            ScalarNumericFieldBoundaryRequest::SumDistinct
1065        }
1066        PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
1067        PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
1068            ScalarNumericFieldBoundaryRequest::AvgDistinct
1069        }
1070    }
1071}
1072
1073fn order_sensitive_terminal_boundary_request_from_prepared(
1074    request: PreparedFluentOrderSensitiveTerminalRuntimeRequest,
1075) -> ScalarTerminalBoundaryRequest {
1076    match request {
1077        PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1078            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1079        }
1080        PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1081            ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth }
1082        }
1083        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1084            ScalarTerminalBoundaryRequest::MedianBySlot { target_field }
1085        }
1086        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1087            ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field }
1088        }
1089    }
1090}
1091
1092const fn projection_boundary_request_from_prepared(
1093    request: PreparedFluentProjectionRuntimeRequest,
1094) -> ScalarProjectionBoundaryRequest {
1095    match request {
1096        PreparedFluentProjectionRuntimeRequest::Values => ScalarProjectionBoundaryRequest::Values,
1097        PreparedFluentProjectionRuntimeRequest::DistinctValues => {
1098            ScalarProjectionBoundaryRequest::DistinctValues
1099        }
1100        PreparedFluentProjectionRuntimeRequest::CountDistinct => {
1101            ScalarProjectionBoundaryRequest::CountDistinct
1102        }
1103        PreparedFluentProjectionRuntimeRequest::ValuesWithIds => {
1104            ScalarProjectionBoundaryRequest::ValuesWithIds
1105        }
1106        PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1107            ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind }
1108        }
1109    }
1110}