Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5//!
6//! Terminal Execution Model
7//!
8//! Fluent terminals are concrete descriptors with a 1:1 mapping to session
9//! execution and explain entrypoints. This module may orchestrate descriptor
10//! construction, non-paged gating, and public output shaping, but it must not
11//! carry terminal-kind enums, transport output enums, or match-based execution
12//! dispatch. Adding a new terminal means adding a new descriptor type and one
13//! direct `TerminalStrategyDriver` implementation for that descriptor.
14
15#[cfg(feature = "diagnostics")]
16use crate::db::FluentTerminalExecutionAttribution;
17use crate::{
18    db::{
19        DbSession, PersistedRow, Query,
20        query::{
21            api::ResponseCardinalityExt,
22            builder::{
23                AvgBySlotTerminal, AvgDistinctBySlotTerminal, CountDistinctBySlotTerminal,
24                CountRowsTerminal, DistinctValuesBySlotTerminal, ExistsRowsTerminal,
25                FirstIdTerminal, FirstValueBySlotTerminal, LastIdTerminal, LastValueBySlotTerminal,
26                MaxIdBySlotTerminal, MaxIdTerminal, MedianIdBySlotTerminal, MinIdBySlotTerminal,
27                MinIdTerminal, MinMaxIdBySlotTerminal, NthIdBySlotTerminal, SumBySlotTerminal,
28                SumDistinctBySlotTerminal, ValueProjectionExpr, ValuesBySlotTerminal,
29                ValuesBySlotWithIdsTerminal,
30            },
31            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
32            fluent::load::{FluentLoadQuery, LoadQueryResult},
33            intent::QueryError,
34            plan::AggregateKind,
35        },
36        response::EntityResponse,
37    },
38    traits::EntityValue,
39    types::{Decimal, Id},
40    value::{OutputValue, Value},
41};
42
// Optional `(min, max)` id pair; this is the output type of the
// `MinMaxIdBySlotTerminal` driver and the return payload of `min_max_by`.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
44
45///
46/// TerminalStrategyDriver
47///
48/// TerminalStrategyDriver is the fluent terminal wiring adapter between a
49/// query-owned strategy object and the session-owned execution/explain
50/// boundary. Implementations are deliberately thin: they only choose the
51/// matching `DbSession` method for an existing strategy type.
52///
53
54trait TerminalStrategyDriver<E: PersistedRow + EntityValue> {
55    type Output;
56    type ExplainOutput;
57
58    fn execute(
59        self,
60        session: &DbSession<E::Canister>,
61        query: &Query<E>,
62    ) -> Result<Self::Output, QueryError>;
63
64    fn explain(
65        &self,
66        session: &DbSession<E::Canister>,
67        query: &Query<E>,
68    ) -> Result<Self::ExplainOutput, QueryError>;
69}
70
// Generate the `TerminalStrategyDriver` impl for one aggregate-style terminal
// descriptor. Arguments, in order: the descriptor type, the execution output
// type, and the `DbSession` method that executes the descriptor. The explain
// entrypoint is the same for every aggregate descriptor, so only execution
// varies per invocation.
macro_rules! impl_aggregate_terminal_driver {
    ($descriptor:ty, $result:ty, $session_method:ident) => {
        impl<E: PersistedRow + EntityValue> TerminalStrategyDriver<E> for $descriptor {
            type Output = $result;
            type ExplainOutput = ExplainAggregateTerminalPlan;

            fn execute(
                self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::Output, QueryError> {
                session.$session_method(query, self)
            }

            fn explain(
                &self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::ExplainOutput, QueryError> {
                session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
            }
        }
    };
}
101
// Generate the `TerminalStrategyDriver` impl for one projection-style terminal
// descriptor. Arguments, in order: the descriptor type, the execution output
// type, and the `DbSession` method that executes the descriptor. Unlike the
// aggregate drivers, explain yields an `ExplainExecutionNodeDescriptor`.
macro_rules! impl_projection_terminal_driver {
    ($descriptor:ty, $result:ty, $session_method:ident) => {
        impl<E: PersistedRow + EntityValue> TerminalStrategyDriver<E> for $descriptor {
            type Output = $result;
            type ExplainOutput = ExplainExecutionNodeDescriptor;

            fn execute(
                self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::Output, QueryError> {
                session.$session_method(query, self)
            }

            fn explain(
                &self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::ExplainOutput, QueryError> {
                session.explain_query_prepared_projection_terminal_with_visible_indexes(query, self)
            }
        }
    };
}
132
133impl_aggregate_terminal_driver!(CountRowsTerminal, u32, execute_fluent_count_rows_terminal);
134impl_aggregate_terminal_driver!(
135    ExistsRowsTerminal,
136    bool,
137    execute_fluent_exists_rows_terminal
138);
139impl_aggregate_terminal_driver!(MinIdTerminal, Option<Id<E>>, execute_fluent_min_id_terminal);
140impl_aggregate_terminal_driver!(MaxIdTerminal, Option<Id<E>>, execute_fluent_max_id_terminal);
141impl_aggregate_terminal_driver!(
142    MinIdBySlotTerminal,
143    Option<Id<E>>,
144    execute_fluent_min_id_by_slot
145);
146impl_aggregate_terminal_driver!(
147    MaxIdBySlotTerminal,
148    Option<Id<E>>,
149    execute_fluent_max_id_by_slot
150);
151impl_aggregate_terminal_driver!(
152    SumBySlotTerminal,
153    Option<Decimal>,
154    execute_fluent_sum_by_slot
155);
156impl_aggregate_terminal_driver!(
157    SumDistinctBySlotTerminal,
158    Option<Decimal>,
159    execute_fluent_sum_distinct_by_slot
160);
161impl_aggregate_terminal_driver!(
162    AvgBySlotTerminal,
163    Option<Decimal>,
164    execute_fluent_avg_by_slot
165);
166impl_aggregate_terminal_driver!(
167    AvgDistinctBySlotTerminal,
168    Option<Decimal>,
169    execute_fluent_avg_distinct_by_slot
170);
171impl_aggregate_terminal_driver!(
172    FirstIdTerminal,
173    Option<Id<E>>,
174    execute_fluent_first_id_terminal
175);
176impl_aggregate_terminal_driver!(
177    LastIdTerminal,
178    Option<Id<E>>,
179    execute_fluent_last_id_terminal
180);
181impl_aggregate_terminal_driver!(
182    NthIdBySlotTerminal,
183    Option<Id<E>>,
184    execute_fluent_nth_id_by_slot
185);
186impl_aggregate_terminal_driver!(
187    MedianIdBySlotTerminal,
188    Option<Id<E>>,
189    execute_fluent_median_id_by_slot
190);
191impl_aggregate_terminal_driver!(
192    MinMaxIdBySlotTerminal,
193    MinMaxByIds<E>,
194    execute_fluent_min_max_id_by_slot
195);
196
197impl_projection_terminal_driver!(
198    ValuesBySlotTerminal,
199    Vec<Value>,
200    execute_fluent_values_by_slot
201);
202impl_projection_terminal_driver!(
203    DistinctValuesBySlotTerminal,
204    Vec<Value>,
205    execute_fluent_distinct_values_by_slot
206);
207impl_projection_terminal_driver!(
208    CountDistinctBySlotTerminal,
209    u32,
210    execute_fluent_count_distinct_by_slot
211);
212impl_projection_terminal_driver!(
213    ValuesBySlotWithIdsTerminal,
214    Vec<(Id<E>, Value)>,
215    execute_fluent_values_by_with_ids_slot
216);
217impl_projection_terminal_driver!(
218    FirstValueBySlotTerminal,
219    Option<Value>,
220    execute_fluent_first_value_by_slot
221);
222impl_projection_terminal_driver!(
223    LastValueBySlotTerminal,
224    Option<Value>,
225    execute_fluent_last_value_by_slot
226);
227
228// Convert one runtime projection value into the public output boundary type.
229fn output(value: Value) -> OutputValue {
230    OutputValue::from(value)
231}
232
233// Convert one ordered runtime projection vector into the public output form.
234fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
235    values.into_iter().map(output).collect()
236}
237
238// Convert one ordered runtime `(id, value)` projection vector into the public output form.
239fn output_values_with_ids<E: PersistedRow>(
240    values: Vec<(Id<E>, Value)>,
241) -> Vec<(Id<E>, OutputValue)> {
242    values
243        .into_iter()
244        .map(|(id, value)| (id, output(value)))
245        .collect()
246}
247
248impl<E> FluentLoadQuery<'_, E>
249where
250    E: PersistedRow,
251{
252    // ------------------------------------------------------------------
253    // Execution (single semantic boundary)
254    // ------------------------------------------------------------------
255
256    /// Execute this query using the session's policy settings.
257    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
258    where
259        E: EntityValue,
260    {
261        self.with_admitted_non_paged(DbSession::execute_query_result)
262    }
263
264    /// Execute this query without the default bounded read-admission gate.
265    ///
266    /// This is for trusted maintenance/admin code that has its own caller
267    /// authorization and resource policy. Application-facing reads should use
268    /// `execute`.
269    pub fn execute_trusted(&self) -> Result<LoadQueryResult<E>, QueryError>
270    where
271        E: EntityValue,
272    {
273        self.with_non_paged(DbSession::execute_query_result)
274    }
275
276    /// Execute this query through the scalar rows-only session boundary.
277    pub fn execute_rows(&self) -> Result<EntityResponse<E>, QueryError>
278    where
279        E: EntityValue,
280    {
281        self.with_admitted_non_paged(DbSession::execute_scalar_query_rows)
282    }
283
284    /// Execute scalar rows without the default bounded read-admission gate.
285    ///
286    /// This is for trusted maintenance/admin code that has its own caller
287    /// authorization and resource policy. Application-facing reads should use
288    /// `execute_rows`.
289    pub fn execute_rows_trusted(&self) -> Result<EntityResponse<E>, QueryError>
290    where
291        E: EntityValue,
292    {
293        self.with_non_paged(DbSession::execute_scalar_query_rows)
294    }
295
296    fn with_admitted_non_paged<T>(
297        &self,
298        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
299    ) -> Result<T, QueryError>
300    where
301        E: EntityValue,
302    {
303        self.ensure_non_paged_mode_ready()?;
304        self.ensure_default_read_admission()?;
305        map(self.session, self.query())
306    }
307
308    // Run one terminal operation through the canonical non-paged fluent policy
309    // gate so execution and explain helpers cannot drift on readiness checks.
310    fn with_non_paged<T>(
311        &self,
312        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
313    ) -> Result<T, QueryError>
314    where
315        E: EntityValue,
316    {
317        self.ensure_non_paged_mode_ready()?;
318        map(self.session, self.query())
319    }
320
321    // Resolve the structural execution descriptor for this fluent load query
322    // through the session-owned visible-index explain path once.
323    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
324    where
325        E: EntityValue,
326    {
327        self.with_non_paged(DbSession::explain_query_execution_with_visible_indexes)
328    }
329
330    // Render one descriptor-derived execution surface so text/json explain
331    // terminals do not each forward the same session explain call ad hoc.
332    fn render_execution_descriptor(
333        &self,
334        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
335    ) -> Result<String, QueryError>
336    where
337        E: EntityValue,
338    {
339        let descriptor = self.explain_execution_descriptor()?;
340
341        Ok(render(descriptor))
342    }
343
344    // Execute one prepared terminal descriptor through the canonical
345    // non-paged fluent policy gate.
346    fn execute_terminal<S>(&self, strategy: S) -> Result<S::Output, QueryError>
347    where
348        E: EntityValue,
349        S: TerminalStrategyDriver<E>,
350    {
351        self.with_admitted_non_paged(|session, query| strategy.execute(session, query))
352    }
353
354    // Explain one prepared terminal strategy through the same non-paged fluent
355    // policy gate used by execution.
356    fn explain_terminal<S>(&self, strategy: &S) -> Result<S::ExplainOutput, QueryError>
357    where
358        E: EntityValue,
359        S: TerminalStrategyDriver<E>,
360    {
361        self.with_non_paged(|session, query| strategy.explain(session, query))
362    }
363
364    // ------------------------------------------------------------------
365    // Execution terminals — semantic only
366    // ------------------------------------------------------------------
367
368    /// Execute and return whether the result set is empty.
369    pub fn is_empty(&self) -> Result<bool, QueryError>
370    where
371        E: EntityValue,
372    {
373        self.not_exists()
374    }
375
376    /// Execute and return whether no matching row exists.
377    pub fn not_exists(&self) -> Result<bool, QueryError>
378    where
379        E: EntityValue,
380    {
381        Ok(!self.exists()?)
382    }
383
384    /// Execute and return whether at least one matching row exists.
385    pub fn exists(&self) -> Result<bool, QueryError>
386    where
387        E: EntityValue,
388    {
389        self.execute_terminal(ExistsRowsTerminal::new())
390    }
391
392    /// Execute and return whether at least one matching row exists with
393    /// terminal attribution.
394    #[cfg(feature = "diagnostics")]
395    #[doc(hidden)]
396    pub fn exists_with_attribution(
397        &self,
398    ) -> Result<(bool, FluentTerminalExecutionAttribution), QueryError>
399    where
400        E: EntityValue,
401    {
402        self.with_admitted_non_paged(|session, query| {
403            session.execute_fluent_exists_rows_terminal_with_attribution(
404                query,
405                ExistsRowsTerminal::new(),
406            )
407        })
408    }
409
410    /// Explain scalar `exists()` routing without executing the terminal.
411    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
412    where
413        E: EntityValue,
414    {
415        self.explain_terminal(&ExistsRowsTerminal::new())
416    }
417
418    /// Explain scalar `not_exists()` routing without executing the terminal.
419    ///
420    /// This remains an `exists()` execution plan with negated boolean semantics.
421    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
422    where
423        E: EntityValue,
424    {
425        self.explain_exists()
426    }
427
428    /// Explain scalar load execution shape without executing the query.
429    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
430    where
431        E: EntityValue,
432    {
433        self.explain_execution_descriptor()
434    }
435
436    /// Explain scalar load execution shape as deterministic text.
437    pub fn explain_execution_text(&self) -> Result<String, QueryError>
438    where
439        E: EntityValue,
440    {
441        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
442    }
443
444    /// Explain scalar load execution shape as canonical JSON.
445    pub fn explain_execution_json(&self) -> Result<String, QueryError>
446    where
447        E: EntityValue,
448    {
449        self.with_non_paged(DbSession::explain_query_execution_json_with_visible_indexes)
450    }
451
452    /// Explain scalar load execution shape as verbose text with diagnostics.
453    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
454    where
455        E: EntityValue,
456    {
457        self.with_non_paged(DbSession::explain_query_execution_verbose_with_visible_indexes)
458    }
459
460    /// Execute and return the number of matching rows.
461    pub fn count(&self) -> Result<u32, QueryError>
462    where
463        E: EntityValue,
464    {
465        self.execute_terminal(CountRowsTerminal::new())
466    }
467
468    /// Execute and return the number of matching rows with terminal attribution.
469    #[cfg(feature = "diagnostics")]
470    #[doc(hidden)]
471    pub fn count_with_attribution(
472        &self,
473    ) -> Result<(u32, FluentTerminalExecutionAttribution), QueryError>
474    where
475        E: EntityValue,
476    {
477        self.with_admitted_non_paged(|session, query| {
478            session.execute_fluent_count_rows_terminal_with_attribution(
479                query,
480                CountRowsTerminal::new(),
481            )
482        })
483    }
484
485    /// Execute and return the total persisted payload bytes for the effective
486    /// result window.
487    pub fn bytes(&self) -> Result<u64, QueryError>
488    where
489        E: EntityValue,
490    {
491        self.with_admitted_non_paged(DbSession::execute_fluent_bytes)
492    }
493
494    /// Execute and return the total serialized bytes for `field` over the
495    /// effective result window.
496    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
497    where
498        E: EntityValue,
499    {
500        let target_slot = self.resolve_non_paged_slot(field)?;
501
502        self.with_admitted_non_paged(|session, query| {
503            session.execute_fluent_bytes_by_slot(query, target_slot)
504        })
505    }
506
507    /// Explain `bytes_by(field)` routing without executing the terminal.
508    pub fn explain_bytes_by(
509        &self,
510        field: impl AsRef<str>,
511    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
512    where
513        E: EntityValue,
514    {
515        let target_slot = self.resolve_non_paged_slot(field)?;
516
517        self.with_non_paged(|session, query| {
518            session.explain_query_bytes_by_with_visible_indexes(query, target_slot.field())
519        })
520    }
521
522    /// Execute and return the smallest matching identifier, if any.
523    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
524    where
525        E: EntityValue,
526    {
527        self.execute_terminal(MinIdTerminal::new())
528    }
529
530    /// Explain scalar `min()` routing without executing the terminal.
531    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
532    where
533        E: EntityValue,
534    {
535        self.explain_terminal(&MinIdTerminal::new())
536    }
537
538    /// Execute and return the id of the row with the smallest value for `field`.
539    ///
540    /// Ties are deterministic: equal field values resolve by primary key ascending.
541    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
542    where
543        E: EntityValue,
544    {
545        let target_slot = self.resolve_non_paged_slot(field)?;
546
547        self.execute_terminal(MinIdBySlotTerminal::new(target_slot))
548    }
549
550    /// Execute and return the largest matching identifier, if any.
551    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
552    where
553        E: EntityValue,
554    {
555        self.execute_terminal(MaxIdTerminal::new())
556    }
557
558    /// Explain scalar `max()` routing without executing the terminal.
559    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
560    where
561        E: EntityValue,
562    {
563        self.explain_terminal(&MaxIdTerminal::new())
564    }
565
566    /// Execute and return the id of the row with the largest value for `field`.
567    ///
568    /// Ties are deterministic: equal field values resolve by primary key ascending.
569    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
570    where
571        E: EntityValue,
572    {
573        let target_slot = self.resolve_non_paged_slot(field)?;
574
575        self.execute_terminal(MaxIdBySlotTerminal::new(target_slot))
576    }
577
578    /// Execute and return the id at zero-based ordinal `nth` when rows are
579    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
580    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
581    where
582        E: EntityValue,
583    {
584        let target_slot = self.resolve_non_paged_slot(field)?;
585
586        self.execute_terminal(NthIdBySlotTerminal::new(target_slot, nth))
587    }
588
589    /// Execute and return the sum of `field` over matching rows.
590    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
591    where
592        E: EntityValue,
593    {
594        let target_slot = self.resolve_non_paged_slot(field)?;
595
596        self.execute_terminal(SumBySlotTerminal::new(target_slot))
597    }
598
599    /// Explain scalar `sum_by(field)` routing without executing the terminal.
600    pub fn explain_sum_by(
601        &self,
602        field: impl AsRef<str>,
603    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
604    where
605        E: EntityValue,
606    {
607        let target_slot = self.resolve_non_paged_slot(field)?;
608
609        self.explain_terminal(&SumBySlotTerminal::new(target_slot))
610    }
611
612    /// Execute and return the sum of distinct `field` values.
613    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
614    where
615        E: EntityValue,
616    {
617        let target_slot = self.resolve_non_paged_slot(field)?;
618
619        self.execute_terminal(SumDistinctBySlotTerminal::new(target_slot))
620    }
621
622    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
623    pub fn explain_sum_distinct_by(
624        &self,
625        field: impl AsRef<str>,
626    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
627    where
628        E: EntityValue,
629    {
630        let target_slot = self.resolve_non_paged_slot(field)?;
631
632        self.explain_terminal(&SumDistinctBySlotTerminal::new(target_slot))
633    }
634
635    /// Execute and return the average of `field` over matching rows.
636    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
637    where
638        E: EntityValue,
639    {
640        let target_slot = self.resolve_non_paged_slot(field)?;
641
642        self.execute_terminal(AvgBySlotTerminal::new(target_slot))
643    }
644
645    /// Explain scalar `avg_by(field)` routing without executing the terminal.
646    pub fn explain_avg_by(
647        &self,
648        field: impl AsRef<str>,
649    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
650    where
651        E: EntityValue,
652    {
653        let target_slot = self.resolve_non_paged_slot(field)?;
654
655        self.explain_terminal(&AvgBySlotTerminal::new(target_slot))
656    }
657
658    /// Execute and return the average of distinct `field` values.
659    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
660    where
661        E: EntityValue,
662    {
663        let target_slot = self.resolve_non_paged_slot(field)?;
664
665        self.execute_terminal(AvgDistinctBySlotTerminal::new(target_slot))
666    }
667
668    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
669    pub fn explain_avg_distinct_by(
670        &self,
671        field: impl AsRef<str>,
672    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
673    where
674        E: EntityValue,
675    {
676        let target_slot = self.resolve_non_paged_slot(field)?;
677
678        self.explain_terminal(&AvgDistinctBySlotTerminal::new(target_slot))
679    }
680
681    /// Execute and return the median id by `field` using deterministic ordering
682    /// `(field asc, primary key asc)`.
683    ///
684    /// Even-length windows select the lower median.
685    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
686    where
687        E: EntityValue,
688    {
689        let target_slot = self.resolve_non_paged_slot(field)?;
690
691        self.execute_terminal(MedianIdBySlotTerminal::new(target_slot))
692    }
693
694    /// Execute and return the number of distinct values for `field` over the
695    /// effective result window.
696    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
697    where
698        E: EntityValue,
699    {
700        let target_slot = self.resolve_non_paged_slot(field)?;
701
702        self.execute_terminal(CountDistinctBySlotTerminal::new(target_slot))
703    }
704
705    /// Explain `count_distinct_by(field)` routing without executing the terminal.
706    pub fn explain_count_distinct_by(
707        &self,
708        field: impl AsRef<str>,
709    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
710    where
711        E: EntityValue,
712    {
713        let target_slot = self.resolve_non_paged_slot(field)?;
714
715        self.explain_terminal(&CountDistinctBySlotTerminal::new(target_slot))
716    }
717
718    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
719    ///
720    /// Tie handling is deterministic for both extrema: primary key ascending.
721    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
722    where
723        E: EntityValue,
724    {
725        let target_slot = self.resolve_non_paged_slot(field)?;
726
727        self.execute_terminal(MinMaxIdBySlotTerminal::new(target_slot))
728    }
729
730    /// Execute and return projected field values for the effective result window.
731    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
732    where
733        E: EntityValue,
734    {
735        let target_slot = self.resolve_non_paged_slot(field)?;
736
737        self.execute_terminal(ValuesBySlotTerminal::new(target_slot))
738            .map(output_values)
739    }
740
741    /// Execute and return projected values for one shared bounded projection
742    /// over the effective response window.
743    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
744    where
745        E: EntityValue,
746        P: ValueProjectionExpr,
747    {
748        let target_slot = self.resolve_non_paged_slot(projection.field())?;
749
750        self.with_admitted_non_paged(|session, query| {
751            session.execute_fluent_project_values_by_slot(
752                query,
753                target_slot,
754                projection.projection_plan().into_expr(),
755            )
756        })
757        .map(output_values)
758    }
759
760    /// Explain `project_values(projection)` routing without executing it.
761    pub fn explain_project_values<P>(
762        &self,
763        projection: &P,
764    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
765    where
766        E: EntityValue,
767        P: ValueProjectionExpr,
768    {
769        let target_slot = self.resolve_non_paged_slot(projection.field())?;
770
771        self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
772    }
773
774    /// Explain `values_by(field)` routing without executing the terminal.
775    pub fn explain_values_by(
776        &self,
777        field: impl AsRef<str>,
778    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
779    where
780        E: EntityValue,
781    {
782        let target_slot = self.resolve_non_paged_slot(field)?;
783
784        self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
785    }
786
787    /// Execute and return the first `k` rows from the effective response window.
788    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
789    where
790        E: EntityValue,
791    {
792        self.with_admitted_non_paged(|session, query| {
793            session.execute_fluent_take(query, take_count)
794        })
795    }
796
797    /// Execute and return the top `k` rows by `field` under deterministic
798    /// ordering `(field desc, primary_key asc)` over the effective response
799    /// window.
800    ///
801    /// This terminal applies its own ordering and does not preserve query
802    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
803    /// matches `max_by(field)` selection semantics.
804    pub fn top_k_by(
805        &self,
806        field: impl AsRef<str>,
807        take_count: u32,
808    ) -> Result<EntityResponse<E>, QueryError>
809    where
810        E: EntityValue,
811    {
812        let target_slot = self.resolve_non_paged_slot(field)?;
813
814        self.with_admitted_non_paged(|session, query| {
815            session.execute_fluent_top_k_rows_by_slot(query, target_slot, take_count)
816        })
817    }
818
819    /// Execute and return the bottom `k` rows by `field` under deterministic
820    /// ordering `(field asc, primary_key asc)` over the effective response
821    /// window.
822    ///
823    /// This terminal applies its own ordering and does not preserve query
824    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
825    /// matches `min_by(field)` selection semantics.
826    pub fn bottom_k_by(
827        &self,
828        field: impl AsRef<str>,
829        take_count: u32,
830    ) -> Result<EntityResponse<E>, QueryError>
831    where
832        E: EntityValue,
833    {
834        let target_slot = self.resolve_non_paged_slot(field)?;
835
836        self.with_admitted_non_paged(|session, query| {
837            session.execute_fluent_bottom_k_rows_by_slot(query, target_slot, take_count)
838        })
839    }
840
841    /// Execute and return projected values for the top `k` rows by `field`
842    /// under deterministic ordering `(field desc, primary_key asc)` over the
843    /// effective response window.
844    ///
845    /// Ranking is applied before projection and does not preserve query
846    /// `order_term(...)` row order in the returned values. For `k = 1`, this
847    /// matches `max_by(field)` projected to one value.
848    pub fn top_k_by_values(
849        &self,
850        field: impl AsRef<str>,
851        take_count: u32,
852    ) -> Result<Vec<OutputValue>, QueryError>
853    where
854        E: EntityValue,
855    {
856        let target_slot = self.resolve_non_paged_slot(field)?;
857
858        self.with_admitted_non_paged(|session, query| {
859            session
860                .execute_fluent_top_k_values_by_slot(query, target_slot, take_count)
861                .map(output_values)
862        })
863    }
864
865    /// Execute and return projected values for the bottom `k` rows by `field`
866    /// under deterministic ordering `(field asc, primary_key asc)` over the
867    /// effective response window.
868    ///
869    /// Ranking is applied before projection and does not preserve query
870    /// `order_term(...)` row order in the returned values. For `k = 1`, this
871    /// matches `min_by(field)` projected to one value.
872    pub fn bottom_k_by_values(
873        &self,
874        field: impl AsRef<str>,
875        take_count: u32,
876    ) -> Result<Vec<OutputValue>, QueryError>
877    where
878        E: EntityValue,
879    {
880        let target_slot = self.resolve_non_paged_slot(field)?;
881
882        self.with_admitted_non_paged(|session, query| {
883            session
884                .execute_fluent_bottom_k_values_by_slot(query, target_slot, take_count)
885                .map(output_values)
886        })
887    }
888
889    /// Execute and return projected id/value pairs for the top `k` rows by
890    /// `field` under deterministic ordering `(field desc, primary_key asc)`
891    /// over the effective response window.
892    ///
893    /// Ranking is applied before projection and does not preserve query
894    /// `order_term(...)` row order in the returned values. For `k = 1`, this
895    /// matches `max_by(field)` projected to one `(id, value)` pair.
896    pub fn top_k_by_with_ids(
897        &self,
898        field: impl AsRef<str>,
899        take_count: u32,
900    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
901    where
902        E: EntityValue,
903    {
904        let target_slot = self.resolve_non_paged_slot(field)?;
905
906        self.with_admitted_non_paged(|session, query| {
907            session
908                .execute_fluent_top_k_values_with_ids_by_slot(query, target_slot, take_count)
909                .map(output_values_with_ids)
910        })
911    }
912
913    /// Execute and return projected id/value pairs for the bottom `k` rows by
914    /// `field` under deterministic ordering `(field asc, primary_key asc)`
915    /// over the effective response window.
916    ///
917    /// Ranking is applied before projection and does not preserve query
918    /// `order_term(...)` row order in the returned values. For `k = 1`, this
919    /// matches `min_by(field)` projected to one `(id, value)` pair.
920    pub fn bottom_k_by_with_ids(
921        &self,
922        field: impl AsRef<str>,
923        take_count: u32,
924    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
925    where
926        E: EntityValue,
927    {
928        let target_slot = self.resolve_non_paged_slot(field)?;
929
930        self.with_admitted_non_paged(|session, query| {
931            session
932                .execute_fluent_bottom_k_values_with_ids_by_slot(query, target_slot, take_count)
933                .map(output_values_with_ids)
934        })
935    }
936
937    /// Execute and return distinct projected field values for the effective
938    /// result window, preserving first-observed value order.
939    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
940    where
941        E: EntityValue,
942    {
943        let target_slot = self.resolve_non_paged_slot(field)?;
944
945        self.execute_terminal(DistinctValuesBySlotTerminal::new(target_slot))
946            .map(output_values)
947    }
948
949    /// Explain `distinct_values_by(field)` routing without executing the terminal.
950    pub fn explain_distinct_values_by(
951        &self,
952        field: impl AsRef<str>,
953    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
954    where
955        E: EntityValue,
956    {
957        let target_slot = self.resolve_non_paged_slot(field)?;
958
959        self.explain_terminal(&DistinctValuesBySlotTerminal::new(target_slot))
960    }
961
962    /// Execute and return projected field values paired with row ids for the
963    /// effective result window.
964    pub fn values_by_with_ids(
965        &self,
966        field: impl AsRef<str>,
967    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
968    where
969        E: EntityValue,
970    {
971        let target_slot = self.resolve_non_paged_slot(field)?;
972
973        self.execute_terminal(ValuesBySlotWithIdsTerminal::new(target_slot))
974            .map(output_values_with_ids)
975    }
976
977    /// Execute and return projected id/value pairs for one shared bounded
978    /// projection over the effective response window.
979    pub fn project_values_with_ids<P>(
980        &self,
981        projection: &P,
982    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
983    where
984        E: EntityValue,
985        P: ValueProjectionExpr,
986    {
987        let target_slot = self.resolve_non_paged_slot(projection.field())?;
988
989        self.with_admitted_non_paged(|session, query| {
990            session.execute_fluent_project_values_with_ids_by_slot(
991                query,
992                target_slot,
993                projection.projection_plan().into_expr(),
994            )
995        })
996        .map(output_values_with_ids)
997    }
998
999    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
1000    pub fn explain_values_by_with_ids(
1001        &self,
1002        field: impl AsRef<str>,
1003    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1004    where
1005        E: EntityValue,
1006    {
1007        let target_slot = self.resolve_non_paged_slot(field)?;
1008
1009        self.explain_terminal(&ValuesBySlotWithIdsTerminal::new(target_slot))
1010    }
1011
1012    /// Execute and return the first projected field value in effective response
1013    /// order, if any.
1014    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1015    where
1016        E: EntityValue,
1017    {
1018        let target_slot = self.resolve_non_paged_slot(field)?;
1019
1020        self.execute_terminal(FirstValueBySlotTerminal::new(target_slot))
1021            .map(|value| value.map(output))
1022    }
1023
1024    /// Execute and return the first projected value for one shared bounded
1025    /// projection in effective response order, if any.
1026    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1027    where
1028        E: EntityValue,
1029        P: ValueProjectionExpr,
1030    {
1031        let target_slot = self.resolve_non_paged_slot(projection.field())?;
1032
1033        self.with_admitted_non_paged(|session, query| {
1034            session.execute_fluent_project_terminal_value_by_slot(
1035                query,
1036                target_slot,
1037                AggregateKind::First,
1038                projection.projection_plan().into_expr(),
1039            )
1040        })
1041        .map(|value| value.map(output))
1042    }
1043
1044    /// Explain `first_value_by(field)` routing without executing the terminal.
1045    pub fn explain_first_value_by(
1046        &self,
1047        field: impl AsRef<str>,
1048    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1049    where
1050        E: EntityValue,
1051    {
1052        let target_slot = self.resolve_non_paged_slot(field)?;
1053
1054        self.explain_terminal(&FirstValueBySlotTerminal::new(target_slot))
1055    }
1056
1057    /// Execute and return the last projected field value in effective response
1058    /// order, if any.
1059    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1060    where
1061        E: EntityValue,
1062    {
1063        let target_slot = self.resolve_non_paged_slot(field)?;
1064
1065        self.execute_terminal(LastValueBySlotTerminal::new(target_slot))
1066            .map(|value| value.map(output))
1067    }
1068
1069    /// Execute and return the last projected value for one shared bounded
1070    /// projection in effective response order, if any.
1071    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1072    where
1073        E: EntityValue,
1074        P: ValueProjectionExpr,
1075    {
1076        let target_slot = self.resolve_non_paged_slot(projection.field())?;
1077
1078        self.with_admitted_non_paged(|session, query| {
1079            session.execute_fluent_project_terminal_value_by_slot(
1080                query,
1081                target_slot,
1082                AggregateKind::Last,
1083                projection.projection_plan().into_expr(),
1084            )
1085        })
1086        .map(|value| value.map(output))
1087    }
1088
1089    /// Explain `last_value_by(field)` routing without executing the terminal.
1090    pub fn explain_last_value_by(
1091        &self,
1092        field: impl AsRef<str>,
1093    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1094    where
1095        E: EntityValue,
1096    {
1097        let target_slot = self.resolve_non_paged_slot(field)?;
1098
1099        self.explain_terminal(&LastValueBySlotTerminal::new(target_slot))
1100    }
1101
1102    /// Execute and return the first matching identifier in response order, if any.
1103    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1104    where
1105        E: EntityValue,
1106    {
1107        self.execute_terminal(FirstIdTerminal::new())
1108    }
1109
1110    /// Explain scalar `first()` routing without executing the terminal.
1111    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1112    where
1113        E: EntityValue,
1114    {
1115        self.explain_terminal(&FirstIdTerminal::new())
1116    }
1117
1118    /// Execute and return the last matching identifier in response order, if any.
1119    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1120    where
1121        E: EntityValue,
1122    {
1123        self.execute_terminal(LastIdTerminal::new())
1124    }
1125
1126    /// Explain scalar `last()` routing without executing the terminal.
1127    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1128    where
1129        E: EntityValue,
1130    {
1131        self.explain_terminal(&LastIdTerminal::new())
1132    }
1133
1134    /// Execute and require exactly one matching row.
1135    pub fn require_one(&self) -> Result<(), QueryError>
1136    where
1137        E: EntityValue,
1138    {
1139        self.execute_rows()?.require_one()?;
1140        Ok(())
1141    }
1142
1143    /// Execute and require at least one matching row.
1144    pub fn require_some(&self) -> Result<(), QueryError>
1145    where
1146        E: EntityValue,
1147    {
1148        self.execute_rows()?.require_some()?;
1149        Ok(())
1150    }
1151}