Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        PersistedRow,
9        executor::{
10            LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11            ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12            ScalarTerminalBoundaryRequest,
13        },
14        query::{
15            api::ResponseCardinalityExt,
16            builder::{
17                PreparedFluentAggregateExplainStrategy,
18                PreparedFluentExistingRowsTerminalRuntimeRequest,
19                PreparedFluentExistingRowsTerminalStrategy,
20                PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21                PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22                PreparedFluentOrderSensitiveTerminalStrategy,
23                PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24                PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25            },
26            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
27            fluent::load::{FluentLoadQuery, LoadQueryResult},
28            intent::QueryError,
29            plan::AggregateKind,
30        },
31        response::EntityResponse,
32    },
33    error::InternalError,
34    traits::EntityValue,
35    types::{Decimal, Id},
36    value::Value,
37};
38
// Optional `(min, max)` id pair returned by the `min_max_by` terminal,
// in the same order as its documented `(min_by(field), max_by(field))` result.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
40
41impl<E> FluentLoadQuery<'_, E>
42where
43    E: PersistedRow,
44{
45    // ------------------------------------------------------------------
46    // Execution (single semantic boundary)
47    // ------------------------------------------------------------------
48
49    /// Execute this query using the session's policy settings.
50    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
51    where
52        E: EntityValue,
53    {
54        self.ensure_non_paged_mode_ready()?;
55
56        if self.query().has_grouping() {
57            return self
58                .session
59                .execute_grouped(self.query(), self.cursor_token.as_deref())
60                .map(LoadQueryResult::Grouped);
61        }
62
63        self.session
64            .execute_query(self.query())
65            .map(LoadQueryResult::Rows)
66    }
67
68    // Run one scalar terminal through the canonical non-paged fluent policy
69    // gate before handing execution to the session load-query adapter.
70    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
71    where
72        E: EntityValue,
73        F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
74    {
75        self.ensure_non_paged_mode_ready()?;
76
77        self.session.execute_load_query_with(self.query(), execute)
78    }
79
80    // Run one explain-visible aggregate terminal through the canonical
81    // non-paged fluent policy gate using the prepared aggregate strategy as
82    // the single explain projection source.
83    fn explain_prepared_aggregate_non_paged_terminal<S>(
84        &self,
85        strategy: &S,
86    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
87    where
88        E: EntityValue,
89        S: PreparedFluentAggregateExplainStrategy,
90    {
91        self.ensure_non_paged_mode_ready()?;
92
93        self.session
94            .explain_query_prepared_aggregate_terminal_with_visible_indexes(self.query(), strategy)
95    }
96
97    // Run one prepared projection/distinct explain terminal through the
98    // canonical non-paged fluent policy gate using the prepared projection
99    // strategy as the single explain projection source.
100    fn explain_prepared_projection_non_paged_terminal(
101        &self,
102        strategy: &PreparedFluentProjectionStrategy,
103    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
104    where
105        E: EntityValue,
106    {
107        self.ensure_non_paged_mode_ready()?;
108
109        self.session
110            .explain_query_prepared_projection_terminal_with_visible_indexes(self.query(), strategy)
111    }
112
113    // Execute one prepared fluent scalar terminal through the canonical
114    // non-paged fluent policy gate using the prepared runtime request as the
115    // single execution source.
116    fn execute_prepared_scalar_terminal_output(
117        &self,
118        strategy: PreparedFluentScalarTerminalStrategy,
119    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
120    where
121        E: EntityValue,
122    {
123        let runtime_request = strategy.into_runtime_request();
124
125        self.execute_scalar_non_paged_terminal(move |load, plan| {
126            load.execute_scalar_terminal_request(
127                plan,
128                scalar_terminal_boundary_request_from_prepared(runtime_request),
129            )
130        })
131    }
132
133    // Execute one prepared fluent existing-rows terminal through the
134    // canonical non-paged fluent policy gate using the prepared existing-rows
135    // strategy as the single runtime source.
136    fn execute_prepared_existing_rows_terminal_output(
137        &self,
138        strategy: PreparedFluentExistingRowsTerminalStrategy,
139    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
140    where
141        E: EntityValue,
142    {
143        let runtime_request = strategy.into_runtime_request();
144
145        self.execute_scalar_non_paged_terminal(move |load, plan| {
146            load.execute_scalar_terminal_request(
147                plan,
148                existing_rows_terminal_boundary_request_from_prepared(runtime_request),
149            )
150        })
151    }
152
153    // Execute one prepared fluent numeric-field terminal through the canonical
154    // non-paged fluent policy gate using the prepared numeric strategy as the
155    // single runtime source.
156    fn execute_prepared_numeric_field_terminal(
157        &self,
158        strategy: PreparedFluentNumericFieldStrategy,
159    ) -> Result<Option<Decimal>, QueryError>
160    where
161        E: EntityValue,
162    {
163        let (target_field, runtime_request) = strategy.into_runtime_parts();
164
165        self.execute_scalar_non_paged_terminal(move |load, plan| {
166            load.execute_numeric_field_boundary(
167                plan,
168                target_field,
169                numeric_field_boundary_request_from_prepared(runtime_request),
170            )
171        })
172    }
173
174    // Execute one prepared fluent order-sensitive terminal through the
175    // canonical non-paged fluent policy gate using the prepared order-sensitive
176    // strategy as the single runtime source.
177    fn execute_prepared_order_sensitive_terminal_output(
178        &self,
179        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
180    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
181    where
182        E: EntityValue,
183    {
184        let runtime_request = strategy.into_runtime_request();
185
186        self.execute_scalar_non_paged_terminal(move |load, plan| {
187            load.execute_scalar_terminal_request(
188                plan,
189                order_sensitive_terminal_boundary_request_from_prepared(runtime_request),
190            )
191        })
192    }
193
194    // Execute one prepared fluent projection/distinct terminal through the
195    // canonical non-paged fluent policy gate using the prepared projection
196    // strategy as the single runtime source.
197    fn execute_prepared_projection_terminal_output(
198        &self,
199        strategy: PreparedFluentProjectionStrategy,
200    ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
201    where
202        E: EntityValue,
203    {
204        let (target_field, runtime_request) = strategy.into_runtime_parts();
205
206        self.execute_scalar_non_paged_terminal(move |load, plan| {
207            load.execute_scalar_projection_boundary(
208                plan,
209                target_field,
210                projection_boundary_request_from_prepared(runtime_request),
211            )
212        })
213    }
214
215    // ------------------------------------------------------------------
216    // Execution terminals — semantic only
217    // ------------------------------------------------------------------
218
219    /// Execute and return whether the result set is empty.
220    pub fn is_empty(&self) -> Result<bool, QueryError>
221    where
222        E: EntityValue,
223    {
224        self.not_exists()
225    }
226
227    /// Execute and return whether no matching row exists.
228    pub fn not_exists(&self) -> Result<bool, QueryError>
229    where
230        E: EntityValue,
231    {
232        Ok(!self.exists()?)
233    }
234
235    /// Execute and return whether at least one matching row exists.
236    pub fn exists(&self) -> Result<bool, QueryError>
237    where
238        E: EntityValue,
239    {
240        self.execute_prepared_existing_rows_terminal_output(
241            PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
242        )?
243        .into_exists()
244        .map_err(QueryError::execute)
245    }
246
247    /// Explain scalar `exists()` routing without executing the terminal.
248    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
249    where
250        E: EntityValue,
251    {
252        self.explain_prepared_aggregate_non_paged_terminal(
253            &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
254        )
255    }
256
257    /// Explain scalar `not_exists()` routing without executing the terminal.
258    ///
259    /// This remains an `exists()` execution plan with negated boolean semantics.
260    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
261    where
262        E: EntityValue,
263    {
264        self.explain_exists()
265    }
266
267    /// Explain scalar load execution shape without executing the query.
268    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
269    where
270        E: EntityValue,
271    {
272        self.session
273            .explain_query_execution_with_visible_indexes(self.query())
274    }
275
276    /// Explain scalar load execution shape as deterministic text.
277    pub fn explain_execution_text(&self) -> Result<String, QueryError>
278    where
279        E: EntityValue,
280    {
281        self.session
282            .explain_query_execution_text_with_visible_indexes(self.query())
283    }
284
285    /// Explain scalar load execution shape as canonical JSON.
286    pub fn explain_execution_json(&self) -> Result<String, QueryError>
287    where
288        E: EntityValue,
289    {
290        self.session
291            .explain_query_execution_json_with_visible_indexes(self.query())
292    }
293
294    /// Explain scalar load execution shape as verbose text with diagnostics.
295    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
296    where
297        E: EntityValue,
298    {
299        self.session
300            .explain_query_execution_verbose_with_visible_indexes(self.query())
301    }
302
303    /// Execute and return the number of matching rows.
304    pub fn count(&self) -> Result<u32, QueryError>
305    where
306        E: EntityValue,
307    {
308        self.execute_prepared_existing_rows_terminal_output(
309            PreparedFluentExistingRowsTerminalStrategy::count_rows(),
310        )?
311        .into_count()
312        .map_err(QueryError::execute)
313    }
314
315    /// Execute and return the total persisted payload bytes for the effective
316    /// result window.
317    pub fn bytes(&self) -> Result<u64, QueryError>
318    where
319        E: EntityValue,
320    {
321        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
322    }
323
324    /// Execute and return the total serialized bytes for `field` over the
325    /// effective result window.
326    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
327    where
328        E: EntityValue,
329    {
330        self.ensure_non_paged_mode_ready()?;
331
332        Self::with_slot(field, |target_slot| {
333            self.session
334                .execute_load_query_with(self.query(), move |load, plan| {
335                    load.bytes_by_slot(plan, target_slot)
336                })
337        })
338    }
339
340    /// Explain `bytes_by(field)` routing without executing the terminal.
341    pub fn explain_bytes_by(
342        &self,
343        field: impl AsRef<str>,
344    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
345    where
346        E: EntityValue,
347    {
348        self.ensure_non_paged_mode_ready()?;
349
350        Self::with_slot(field, |target_slot| {
351            self.session
352                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
353        })
354    }
355
356    /// Execute and return the smallest matching identifier, if any.
357    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
358    where
359        E: EntityValue,
360    {
361        self.execute_prepared_scalar_terminal_output(
362            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
363        )?
364        .into_id()
365        .map_err(QueryError::execute)
366    }
367
368    /// Explain scalar `min()` routing without executing the terminal.
369    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
370    where
371        E: EntityValue,
372    {
373        self.explain_prepared_aggregate_non_paged_terminal(
374            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
375        )
376    }
377
378    /// Execute and return the id of the row with the smallest value for `field`.
379    ///
380    /// Ties are deterministic: equal field values resolve by primary key ascending.
381    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
382    where
383        E: EntityValue,
384    {
385        self.ensure_non_paged_mode_ready()?;
386
387        Self::with_slot(field, |target_slot| {
388            self.execute_prepared_scalar_terminal_output(
389                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
390            )?
391            .into_id()
392            .map_err(QueryError::execute)
393        })
394    }
395
396    /// Execute and return the largest matching identifier, if any.
397    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
398    where
399        E: EntityValue,
400    {
401        self.execute_prepared_scalar_terminal_output(
402            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
403        )?
404        .into_id()
405        .map_err(QueryError::execute)
406    }
407
408    /// Explain scalar `max()` routing without executing the terminal.
409    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
410    where
411        E: EntityValue,
412    {
413        self.explain_prepared_aggregate_non_paged_terminal(
414            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
415        )
416    }
417
418    /// Execute and return the id of the row with the largest value for `field`.
419    ///
420    /// Ties are deterministic: equal field values resolve by primary key ascending.
421    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
422    where
423        E: EntityValue,
424    {
425        self.ensure_non_paged_mode_ready()?;
426
427        Self::with_slot(field, |target_slot| {
428            self.execute_prepared_scalar_terminal_output(
429                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
430            )?
431            .into_id()
432            .map_err(QueryError::execute)
433        })
434    }
435
436    /// Execute and return the id at zero-based ordinal `nth` when rows are
437    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
438    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
439    where
440        E: EntityValue,
441    {
442        self.ensure_non_paged_mode_ready()?;
443
444        Self::with_slot(field, |target_slot| {
445            self.execute_prepared_order_sensitive_terminal_output(
446                PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
447            )?
448            .into_id()
449            .map_err(QueryError::execute)
450        })
451    }
452
453    /// Execute and return the sum of `field` over matching rows.
454    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
455    where
456        E: EntityValue,
457    {
458        self.ensure_non_paged_mode_ready()?;
459
460        Self::with_slot(field, |target_slot| {
461            self.execute_prepared_numeric_field_terminal(
462                PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
463            )
464        })
465    }
466
467    /// Explain scalar `sum_by(field)` routing without executing the terminal.
468    pub fn explain_sum_by(
469        &self,
470        field: impl AsRef<str>,
471    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
472    where
473        E: EntityValue,
474    {
475        self.ensure_non_paged_mode_ready()?;
476
477        Self::with_slot(field, |target_slot| {
478            self.explain_prepared_aggregate_non_paged_terminal(
479                &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
480            )
481        })
482    }
483
484    /// Execute and return the sum of distinct `field` values.
485    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
486    where
487        E: EntityValue,
488    {
489        self.ensure_non_paged_mode_ready()?;
490
491        Self::with_slot(field, |target_slot| {
492            self.execute_prepared_numeric_field_terminal(
493                PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
494            )
495        })
496    }
497
498    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
499    pub fn explain_sum_distinct_by(
500        &self,
501        field: impl AsRef<str>,
502    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
503    where
504        E: EntityValue,
505    {
506        self.ensure_non_paged_mode_ready()?;
507
508        Self::with_slot(field, |target_slot| {
509            self.explain_prepared_aggregate_non_paged_terminal(
510                &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
511            )
512        })
513    }
514
515    /// Execute and return the average of `field` over matching rows.
516    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
517    where
518        E: EntityValue,
519    {
520        self.ensure_non_paged_mode_ready()?;
521
522        Self::with_slot(field, |target_slot| {
523            self.execute_prepared_numeric_field_terminal(
524                PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
525            )
526        })
527    }
528
529    /// Explain scalar `avg_by(field)` routing without executing the terminal.
530    pub fn explain_avg_by(
531        &self,
532        field: impl AsRef<str>,
533    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
534    where
535        E: EntityValue,
536    {
537        self.ensure_non_paged_mode_ready()?;
538
539        Self::with_slot(field, |target_slot| {
540            self.explain_prepared_aggregate_non_paged_terminal(
541                &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
542            )
543        })
544    }
545
546    /// Execute and return the average of distinct `field` values.
547    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
548    where
549        E: EntityValue,
550    {
551        self.ensure_non_paged_mode_ready()?;
552
553        Self::with_slot(field, |target_slot| {
554            self.execute_prepared_numeric_field_terminal(
555                PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
556            )
557        })
558    }
559
560    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
561    pub fn explain_avg_distinct_by(
562        &self,
563        field: impl AsRef<str>,
564    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
565    where
566        E: EntityValue,
567    {
568        self.ensure_non_paged_mode_ready()?;
569
570        Self::with_slot(field, |target_slot| {
571            self.explain_prepared_aggregate_non_paged_terminal(
572                &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
573            )
574        })
575    }
576
577    /// Execute and return the median id by `field` using deterministic ordering
578    /// `(field asc, primary key asc)`.
579    ///
580    /// Even-length windows select the lower median.
581    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
582    where
583        E: EntityValue,
584    {
585        self.ensure_non_paged_mode_ready()?;
586
587        Self::with_slot(field, |target_slot| {
588            self.execute_prepared_order_sensitive_terminal_output(
589                PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
590            )?
591            .into_id()
592            .map_err(QueryError::execute)
593        })
594    }
595
596    /// Execute and return the number of distinct values for `field` over the
597    /// effective result window.
598    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
599    where
600        E: EntityValue,
601    {
602        self.ensure_non_paged_mode_ready()?;
603
604        Self::with_slot(field, |target_slot| {
605            self.execute_prepared_projection_terminal_output(
606                PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
607            )?
608            .into_count()
609            .map_err(QueryError::execute)
610        })
611    }
612
613    /// Explain `count_distinct_by(field)` routing without executing the terminal.
614    pub fn explain_count_distinct_by(
615        &self,
616        field: impl AsRef<str>,
617    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
618    where
619        E: EntityValue,
620    {
621        self.ensure_non_paged_mode_ready()?;
622
623        Self::with_slot(field, |target_slot| {
624            self.explain_prepared_projection_non_paged_terminal(
625                &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
626            )
627        })
628    }
629
630    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
631    ///
632    /// Tie handling is deterministic for both extrema: primary key ascending.
633    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
634    where
635        E: EntityValue,
636    {
637        self.ensure_non_paged_mode_ready()?;
638
639        Self::with_slot(field, |target_slot| {
640            self.execute_prepared_order_sensitive_terminal_output(
641                PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
642            )?
643            .into_id_pair()
644            .map_err(QueryError::execute)
645        })
646    }
647
648    /// Execute and return projected field values for the effective result window.
649    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
650    where
651        E: EntityValue,
652    {
653        self.ensure_non_paged_mode_ready()?;
654
655        Self::with_slot(field, |target_slot| {
656            self.execute_prepared_projection_terminal_output(
657                PreparedFluentProjectionStrategy::values_by_slot(target_slot),
658            )?
659            .into_values()
660            .map_err(QueryError::execute)
661        })
662    }
663
664    /// Explain `values_by(field)` routing without executing the terminal.
665    pub fn explain_values_by(
666        &self,
667        field: impl AsRef<str>,
668    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
669    where
670        E: EntityValue,
671    {
672        self.ensure_non_paged_mode_ready()?;
673
674        Self::with_slot(field, |target_slot| {
675            self.explain_prepared_projection_non_paged_terminal(
676                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
677            )
678        })
679    }
680
681    /// Execute and return the first `k` rows from the effective response window.
682    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
683    where
684        E: EntityValue,
685    {
686        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
687    }
688
689    /// Execute and return the top `k` rows by `field` under deterministic
690    /// ordering `(field desc, primary_key asc)` over the effective response
691    /// window.
692    ///
693    /// This terminal applies its own ordering and does not preserve query
694    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
695    /// matches `max_by(field)` selection semantics.
696    pub fn top_k_by(
697        &self,
698        field: impl AsRef<str>,
699        take_count: u32,
700    ) -> Result<EntityResponse<E>, QueryError>
701    where
702        E: EntityValue,
703    {
704        self.ensure_non_paged_mode_ready()?;
705
706        Self::with_slot(field, |target_slot| {
707            self.session
708                .execute_load_query_with(self.query(), move |load, plan| {
709                    load.top_k_by_slot(plan, target_slot, take_count)
710                })
711        })
712    }
713
714    /// Execute and return the bottom `k` rows by `field` under deterministic
715    /// ordering `(field asc, primary_key asc)` over the effective response
716    /// window.
717    ///
718    /// This terminal applies its own ordering and does not preserve query
719    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
720    /// matches `min_by(field)` selection semantics.
721    pub fn bottom_k_by(
722        &self,
723        field: impl AsRef<str>,
724        take_count: u32,
725    ) -> Result<EntityResponse<E>, QueryError>
726    where
727        E: EntityValue,
728    {
729        self.ensure_non_paged_mode_ready()?;
730
731        Self::with_slot(field, |target_slot| {
732            self.session
733                .execute_load_query_with(self.query(), move |load, plan| {
734                    load.bottom_k_by_slot(plan, target_slot, take_count)
735                })
736        })
737    }
738
739    /// Execute and return projected values for the top `k` rows by `field`
740    /// under deterministic ordering `(field desc, primary_key asc)` over the
741    /// effective response window.
742    ///
743    /// Ranking is applied before projection and does not preserve query
744    /// `order_by(...)` row order in the returned values. For `k = 1`, this
745    /// matches `max_by(field)` projected to one value.
746    pub fn top_k_by_values(
747        &self,
748        field: impl AsRef<str>,
749        take_count: u32,
750    ) -> Result<Vec<Value>, QueryError>
751    where
752        E: EntityValue,
753    {
754        self.ensure_non_paged_mode_ready()?;
755
756        Self::with_slot(field, |target_slot| {
757            self.session
758                .execute_load_query_with(self.query(), move |load, plan| {
759                    load.top_k_by_values_slot(plan, target_slot, take_count)
760                })
761        })
762    }
763
764    /// Execute and return projected values for the bottom `k` rows by `field`
765    /// under deterministic ordering `(field asc, primary_key asc)` over the
766    /// effective response window.
767    ///
768    /// Ranking is applied before projection and does not preserve query
769    /// `order_by(...)` row order in the returned values. For `k = 1`, this
770    /// matches `min_by(field)` projected to one value.
771    pub fn bottom_k_by_values(
772        &self,
773        field: impl AsRef<str>,
774        take_count: u32,
775    ) -> Result<Vec<Value>, QueryError>
776    where
777        E: EntityValue,
778    {
779        self.ensure_non_paged_mode_ready()?;
780
781        Self::with_slot(field, |target_slot| {
782            self.session
783                .execute_load_query_with(self.query(), move |load, plan| {
784                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
785                })
786        })
787    }
788
789    /// Execute and return projected id/value pairs for the top `k` rows by
790    /// `field` under deterministic ordering `(field desc, primary_key asc)`
791    /// over the effective response window.
792    ///
793    /// Ranking is applied before projection and does not preserve query
794    /// `order_by(...)` row order in the returned values. For `k = 1`, this
795    /// matches `max_by(field)` projected to one `(id, value)` pair.
796    pub fn top_k_by_with_ids(
797        &self,
798        field: impl AsRef<str>,
799        take_count: u32,
800    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
801    where
802        E: EntityValue,
803    {
804        self.ensure_non_paged_mode_ready()?;
805
806        Self::with_slot(field, |target_slot| {
807            self.session
808                .execute_load_query_with(self.query(), move |load, plan| {
809                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
810                })
811        })
812    }
813
814    /// Execute and return projected id/value pairs for the bottom `k` rows by
815    /// `field` under deterministic ordering `(field asc, primary_key asc)`
816    /// over the effective response window.
817    ///
818    /// Ranking is applied before projection and does not preserve query
819    /// `order_by(...)` row order in the returned values. For `k = 1`, this
820    /// matches `min_by(field)` projected to one `(id, value)` pair.
821    pub fn bottom_k_by_with_ids(
822        &self,
823        field: impl AsRef<str>,
824        take_count: u32,
825    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
826    where
827        E: EntityValue,
828    {
829        self.ensure_non_paged_mode_ready()?;
830
831        Self::with_slot(field, |target_slot| {
832            self.session
833                .execute_load_query_with(self.query(), move |load, plan| {
834                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
835                })
836        })
837    }
838
839    /// Execute and return distinct projected field values for the effective
840    /// result window, preserving first-observed value order.
841    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
842    where
843        E: EntityValue,
844    {
845        self.ensure_non_paged_mode_ready()?;
846
847        Self::with_slot(field, |target_slot| {
848            self.execute_prepared_projection_terminal_output(
849                PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
850            )?
851            .into_values()
852            .map_err(QueryError::execute)
853        })
854    }
855
856    /// Explain `distinct_values_by(field)` routing without executing the terminal.
857    pub fn explain_distinct_values_by(
858        &self,
859        field: impl AsRef<str>,
860    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
861    where
862        E: EntityValue,
863    {
864        self.ensure_non_paged_mode_ready()?;
865
866        Self::with_slot(field, |target_slot| {
867            self.explain_prepared_projection_non_paged_terminal(
868                &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
869            )
870        })
871    }
872
873    /// Execute and return projected field values paired with row ids for the
874    /// effective result window.
875    pub fn values_by_with_ids(
876        &self,
877        field: impl AsRef<str>,
878    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
879    where
880        E: EntityValue,
881    {
882        self.ensure_non_paged_mode_ready()?;
883
884        Self::with_slot(field, |target_slot| {
885            self.execute_prepared_projection_terminal_output(
886                PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
887            )?
888            .into_values_with_ids()
889            .map_err(QueryError::execute)
890        })
891    }
892
893    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
894    pub fn explain_values_by_with_ids(
895        &self,
896        field: impl AsRef<str>,
897    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
898    where
899        E: EntityValue,
900    {
901        self.ensure_non_paged_mode_ready()?;
902
903        Self::with_slot(field, |target_slot| {
904            self.explain_prepared_projection_non_paged_terminal(
905                &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
906            )
907        })
908    }
909
910    /// Execute and return the first projected field value in effective response
911    /// order, if any.
912    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
913    where
914        E: EntityValue,
915    {
916        self.ensure_non_paged_mode_ready()?;
917
918        Self::with_slot(field, |target_slot| {
919            self.execute_prepared_projection_terminal_output(
920                PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
921            )?
922            .into_terminal_value()
923            .map_err(QueryError::execute)
924        })
925    }
926
927    /// Explain `first_value_by(field)` routing without executing the terminal.
928    pub fn explain_first_value_by(
929        &self,
930        field: impl AsRef<str>,
931    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
932    where
933        E: EntityValue,
934    {
935        self.ensure_non_paged_mode_ready()?;
936
937        Self::with_slot(field, |target_slot| {
938            self.explain_prepared_projection_non_paged_terminal(
939                &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
940            )
941        })
942    }
943
944    /// Execute and return the last projected field value in effective response
945    /// order, if any.
946    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
947    where
948        E: EntityValue,
949    {
950        self.ensure_non_paged_mode_ready()?;
951
952        Self::with_slot(field, |target_slot| {
953            self.execute_prepared_projection_terminal_output(
954                PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
955            )?
956            .into_terminal_value()
957            .map_err(QueryError::execute)
958        })
959    }
960
961    /// Explain `last_value_by(field)` routing without executing the terminal.
962    pub fn explain_last_value_by(
963        &self,
964        field: impl AsRef<str>,
965    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
966    where
967        E: EntityValue,
968    {
969        self.ensure_non_paged_mode_ready()?;
970
971        Self::with_slot(field, |target_slot| {
972            self.explain_prepared_projection_non_paged_terminal(
973                &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
974            )
975        })
976    }
977
978    /// Execute and return the first matching identifier in response order, if any.
979    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
980    where
981        E: EntityValue,
982    {
983        self.execute_prepared_order_sensitive_terminal_output(
984            PreparedFluentOrderSensitiveTerminalStrategy::first(),
985        )?
986        .into_id()
987        .map_err(QueryError::execute)
988    }
989
990    /// Explain scalar `first()` routing without executing the terminal.
991    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
992    where
993        E: EntityValue,
994    {
995        self.explain_prepared_aggregate_non_paged_terminal(
996            &PreparedFluentOrderSensitiveTerminalStrategy::first(),
997        )
998    }
999
1000    /// Execute and return the last matching identifier in response order, if any.
1001    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1002    where
1003        E: EntityValue,
1004    {
1005        self.execute_prepared_order_sensitive_terminal_output(
1006            PreparedFluentOrderSensitiveTerminalStrategy::last(),
1007        )?
1008        .into_id()
1009        .map_err(QueryError::execute)
1010    }
1011
1012    /// Explain scalar `last()` routing without executing the terminal.
1013    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1014    where
1015        E: EntityValue,
1016    {
1017        self.explain_prepared_aggregate_non_paged_terminal(
1018            &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1019        )
1020    }
1021
1022    /// Execute and require exactly one matching row.
1023    pub fn require_one(&self) -> Result<(), QueryError>
1024    where
1025        E: EntityValue,
1026    {
1027        self.execute()?.into_rows()?.require_one()?;
1028        Ok(())
1029    }
1030
1031    /// Execute and require at least one matching row.
1032    pub fn require_some(&self) -> Result<(), QueryError>
1033    where
1034        E: EntityValue,
1035    {
1036        self.execute()?.into_rows()?.require_some()?;
1037        Ok(())
1038    }
1039}
1040
1041fn scalar_terminal_boundary_request_from_prepared(
1042    request: PreparedFluentScalarTerminalRuntimeRequest,
1043) -> ScalarTerminalBoundaryRequest {
1044    match request {
1045        PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1046            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1047        }
1048        PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1049            ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
1050        }
1051    }
1052}
1053
1054const fn existing_rows_terminal_boundary_request_from_prepared(
1055    request: PreparedFluentExistingRowsTerminalRuntimeRequest,
1056) -> ScalarTerminalBoundaryRequest {
1057    match request {
1058        PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => {
1059            ScalarTerminalBoundaryRequest::Count
1060        }
1061        PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => {
1062            ScalarTerminalBoundaryRequest::Exists
1063        }
1064    }
1065}
1066
1067const fn numeric_field_boundary_request_from_prepared(
1068    request: PreparedFluentNumericFieldRuntimeRequest,
1069) -> ScalarNumericFieldBoundaryRequest {
1070    match request {
1071        PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
1072        PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
1073            ScalarNumericFieldBoundaryRequest::SumDistinct
1074        }
1075        PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
1076        PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
1077            ScalarNumericFieldBoundaryRequest::AvgDistinct
1078        }
1079    }
1080}
1081
1082fn order_sensitive_terminal_boundary_request_from_prepared(
1083    request: PreparedFluentOrderSensitiveTerminalRuntimeRequest,
1084) -> ScalarTerminalBoundaryRequest {
1085    match request {
1086        PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1087            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1088        }
1089        PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1090            ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth }
1091        }
1092        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1093            ScalarTerminalBoundaryRequest::MedianBySlot { target_field }
1094        }
1095        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1096            ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field }
1097        }
1098    }
1099}
1100
1101const fn projection_boundary_request_from_prepared(
1102    request: PreparedFluentProjectionRuntimeRequest,
1103) -> ScalarProjectionBoundaryRequest {
1104    match request {
1105        PreparedFluentProjectionRuntimeRequest::Values => ScalarProjectionBoundaryRequest::Values,
1106        PreparedFluentProjectionRuntimeRequest::DistinctValues => {
1107            ScalarProjectionBoundaryRequest::DistinctValues
1108        }
1109        PreparedFluentProjectionRuntimeRequest::CountDistinct => {
1110            ScalarProjectionBoundaryRequest::CountDistinct
1111        }
1112        PreparedFluentProjectionRuntimeRequest::ValuesWithIds => {
1113            ScalarProjectionBoundaryRequest::ValuesWithIds
1114        }
1115        PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1116            ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind }
1117        }
1118    }
1119}