Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5//!
6//! Terminal Execution Model
7//!
8//! Fluent terminals are concrete descriptors with a 1:1 mapping to session
9//! execution and explain entrypoints. This module may orchestrate descriptor
10//! construction, non-paged gating, and public output shaping, but it must not
11//! carry terminal-kind enums, transport output enums, or match-based execution
12//! dispatch. Adding a new terminal means adding a new descriptor type and one
13//! direct `TerminalStrategyDriver` implementation for that descriptor.
14
15use crate::{
16    db::{
17        DbSession, PersistedRow, Query,
18        query::{
19            api::ResponseCardinalityExt,
20            builder::{
21                AvgBySlotTerminal, AvgDistinctBySlotTerminal, CountDistinctBySlotTerminal,
22                CountRowsTerminal, DistinctValuesBySlotTerminal, ExistsRowsTerminal,
23                FirstIdTerminal, FirstValueBySlotTerminal, LastIdTerminal, LastValueBySlotTerminal,
24                MaxIdBySlotTerminal, MaxIdTerminal, MedianIdBySlotTerminal, MinIdBySlotTerminal,
25                MinIdTerminal, MinMaxIdBySlotTerminal, NthIdBySlotTerminal, SumBySlotTerminal,
26                SumDistinctBySlotTerminal, ValueProjectionExpr, ValuesBySlotTerminal,
27                ValuesBySlotWithIdsTerminal,
28            },
29            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
30            fluent::load::{FluentLoadQuery, LoadQueryResult},
31            intent::QueryError,
32            plan::AggregateKind,
33        },
34        response::EntityResponse,
35    },
36    traits::EntityValue,
37    types::{Decimal, Id},
38    value::{OutputValue, Value},
39};
40
// Result shape shared by `min_max_by` and the `MinMaxIdBySlotTerminal` driver:
// a `(min id, max id)` pair, or `None` (presumably when no row matches —
// confirm against the session implementation).
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
42
43///
44/// TerminalStrategyDriver
45///
46/// TerminalStrategyDriver is the fluent terminal wiring adapter between a
47/// query-owned strategy object and the session-owned execution/explain
48/// boundary. Implementations are deliberately thin: they only choose the
49/// matching `DbSession` method for an existing strategy type.
50///
51
52trait TerminalStrategyDriver<E: PersistedRow + EntityValue> {
53    type Output;
54    type ExplainOutput;
55
56    fn execute(
57        self,
58        session: &DbSession<E::Canister>,
59        query: &Query<E>,
60    ) -> Result<Self::Output, QueryError>;
61
62    fn explain(
63        &self,
64        session: &DbSession<E::Canister>,
65        query: &Query<E>,
66    ) -> Result<Self::ExplainOutput, QueryError>;
67}
68
// Generate the driver impl for one aggregate-style terminal descriptor.
// Arguments: descriptor type, execution output type, and the `DbSession`
// method that executes it. The explain side is the same for every aggregate
// terminal, so it is written once here instead of per descriptor.
macro_rules! impl_aggregate_terminal_driver {
    ($descriptor:ty, $result:ty, $session_method:ident) => {
        impl<E: PersistedRow + EntityValue> TerminalStrategyDriver<E> for $descriptor {
            type Output = $result;
            type ExplainOutput = ExplainAggregateTerminalPlan;

            fn execute(
                self,
                db: &DbSession<E::Canister>,
                prepared: &Query<E>,
            ) -> Result<Self::Output, QueryError> {
                db.$session_method(prepared, self)
            }

            fn explain(
                &self,
                db: &DbSession<E::Canister>,
                prepared: &Query<E>,
            ) -> Result<Self::ExplainOutput, QueryError> {
                db.explain_query_prepared_aggregate_terminal_with_visible_indexes(prepared, self)
            }
        }
    };
}
99
// Generate the driver impl for one projection-style terminal descriptor.
// Arguments: descriptor type, execution output type, and the `DbSession`
// method that executes it. Unlike aggregate terminals, the explain side
// yields an execution node descriptor rather than an aggregate-terminal plan.
macro_rules! impl_projection_terminal_driver {
    ($descriptor:ty, $result:ty, $session_method:ident) => {
        impl<E: PersistedRow + EntityValue> TerminalStrategyDriver<E> for $descriptor {
            type Output = $result;
            type ExplainOutput = ExplainExecutionNodeDescriptor;

            fn execute(
                self,
                db: &DbSession<E::Canister>,
                prepared: &Query<E>,
            ) -> Result<Self::Output, QueryError> {
                db.$session_method(prepared, self)
            }

            fn explain(
                &self,
                db: &DbSession<E::Canister>,
                prepared: &Query<E>,
            ) -> Result<Self::ExplainOutput, QueryError> {
                db.explain_query_prepared_projection_terminal_with_visible_indexes(prepared, self)
            }
        }
    };
}
130
// Aggregate terminal registrations. Each invocation binds one descriptor
// type to its execution output type and to the `DbSession` method named in
// the third argument; explain output is `ExplainAggregateTerminalPlan`.

// Row-count and existence terminals.
impl_aggregate_terminal_driver!(CountRowsTerminal, u32, execute_fluent_count_rows_terminal);
impl_aggregate_terminal_driver!(
    ExistsRowsTerminal,
    bool,
    execute_fluent_exists_rows_terminal
);
// Id extrema, with and without a target field slot.
impl_aggregate_terminal_driver!(MinIdTerminal, Option<Id<E>>, execute_fluent_min_id_terminal);
impl_aggregate_terminal_driver!(MaxIdTerminal, Option<Id<E>>, execute_fluent_max_id_terminal);
impl_aggregate_terminal_driver!(
    MinIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_min_id_by_slot
);
impl_aggregate_terminal_driver!(
    MaxIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_max_id_by_slot
);
// Numeric reductions over one field slot; all yield `Option<Decimal>`.
impl_aggregate_terminal_driver!(
    SumBySlotTerminal,
    Option<Decimal>,
    execute_fluent_sum_by_slot
);
impl_aggregate_terminal_driver!(
    SumDistinctBySlotTerminal,
    Option<Decimal>,
    execute_fluent_sum_distinct_by_slot
);
impl_aggregate_terminal_driver!(
    AvgBySlotTerminal,
    Option<Decimal>,
    execute_fluent_avg_by_slot
);
impl_aggregate_terminal_driver!(
    AvgDistinctBySlotTerminal,
    Option<Decimal>,
    execute_fluent_avg_distinct_by_slot
);
// Positional id selection terminals.
impl_aggregate_terminal_driver!(
    FirstIdTerminal,
    Option<Id<E>>,
    execute_fluent_first_id_terminal
);
impl_aggregate_terminal_driver!(
    LastIdTerminal,
    Option<Id<E>>,
    execute_fluent_last_id_terminal
);
impl_aggregate_terminal_driver!(
    NthIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_nth_id_by_slot
);
impl_aggregate_terminal_driver!(
    MedianIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_median_id_by_slot
);
// Combined extrema: yields the `MinMaxByIds<E>` pair alias.
impl_aggregate_terminal_driver!(
    MinMaxIdBySlotTerminal,
    MinMaxByIds<E>,
    execute_fluent_min_max_id_by_slot
);
194
// Projection terminal registrations. Each invocation binds one descriptor
// type to its execution output type and to the `DbSession` method named in
// the third argument; explain output is `ExplainExecutionNodeDescriptor`.
impl_projection_terminal_driver!(
    ValuesBySlotTerminal,
    Vec<Value>,
    execute_fluent_values_by_slot
);
impl_projection_terminal_driver!(
    DistinctValuesBySlotTerminal,
    Vec<Value>,
    execute_fluent_distinct_values_by_slot
);
// Registered as a projection-style driver even though it yields a count.
impl_projection_terminal_driver!(
    CountDistinctBySlotTerminal,
    u32,
    execute_fluent_count_distinct_by_slot
);
impl_projection_terminal_driver!(
    ValuesBySlotWithIdsTerminal,
    Vec<(Id<E>, Value)>,
    execute_fluent_values_by_with_ids_slot
);
impl_projection_terminal_driver!(
    FirstValueBySlotTerminal,
    Option<Value>,
    execute_fluent_first_value_by_slot
);
impl_projection_terminal_driver!(
    LastValueBySlotTerminal,
    Option<Value>,
    execute_fluent_last_value_by_slot
);
225
226// Convert one runtime projection value into the public output boundary type.
227fn output(value: Value) -> OutputValue {
228    OutputValue::from(value)
229}
230
231// Convert one ordered runtime projection vector into the public output form.
232fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
233    values.into_iter().map(output).collect()
234}
235
236// Convert one ordered runtime `(id, value)` projection vector into the public output form.
237fn output_values_with_ids<E: PersistedRow>(
238    values: Vec<(Id<E>, Value)>,
239) -> Vec<(Id<E>, OutputValue)> {
240    values
241        .into_iter()
242        .map(|(id, value)| (id, output(value)))
243        .collect()
244}
245
246impl<E> FluentLoadQuery<'_, E>
247where
248    E: PersistedRow,
249{
250    // ------------------------------------------------------------------
251    // Execution (single semantic boundary)
252    // ------------------------------------------------------------------
253
254    /// Execute this query using the session's policy settings.
255    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
256    where
257        E: EntityValue,
258    {
259        self.with_non_paged(DbSession::execute_query_result)
260    }
261
262    /// Execute this query through the scalar rows-only session boundary.
263    pub fn execute_rows(&self) -> Result<EntityResponse<E>, QueryError>
264    where
265        E: EntityValue,
266    {
267        self.with_non_paged(DbSession::execute_scalar_query_rows)
268    }
269
270    // Run one terminal operation through the canonical non-paged fluent policy
271    // gate so execution and explain helpers cannot drift on readiness checks.
272    fn with_non_paged<T>(
273        &self,
274        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
275    ) -> Result<T, QueryError>
276    where
277        E: EntityValue,
278    {
279        self.ensure_non_paged_mode_ready()?;
280        map(self.session, self.query())
281    }
282
283    // Resolve the structural execution descriptor for this fluent load query
284    // through the session-owned visible-index explain path once.
285    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
286    where
287        E: EntityValue,
288    {
289        self.with_non_paged(DbSession::explain_query_execution_with_visible_indexes)
290    }
291
292    // Render one descriptor-derived execution surface so text/json explain
293    // terminals do not each forward the same session explain call ad hoc.
294    fn render_execution_descriptor(
295        &self,
296        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
297    ) -> Result<String, QueryError>
298    where
299        E: EntityValue,
300    {
301        let descriptor = self.explain_execution_descriptor()?;
302
303        Ok(render(descriptor))
304    }
305
306    // Execute one prepared terminal descriptor through the canonical
307    // non-paged fluent policy gate.
308    fn execute_terminal<S>(&self, strategy: S) -> Result<S::Output, QueryError>
309    where
310        E: EntityValue,
311        S: TerminalStrategyDriver<E>,
312    {
313        self.with_non_paged(|session, query| strategy.execute(session, query))
314    }
315
316    // Explain one prepared terminal strategy through the same non-paged fluent
317    // policy gate used by execution.
318    fn explain_terminal<S>(&self, strategy: &S) -> Result<S::ExplainOutput, QueryError>
319    where
320        E: EntityValue,
321        S: TerminalStrategyDriver<E>,
322    {
323        self.with_non_paged(|session, query| strategy.explain(session, query))
324    }
325
326    // ------------------------------------------------------------------
327    // Execution terminals — semantic only
328    // ------------------------------------------------------------------
329
330    /// Execute and return whether the result set is empty.
331    pub fn is_empty(&self) -> Result<bool, QueryError>
332    where
333        E: EntityValue,
334    {
335        self.not_exists()
336    }
337
338    /// Execute and return whether no matching row exists.
339    pub fn not_exists(&self) -> Result<bool, QueryError>
340    where
341        E: EntityValue,
342    {
343        Ok(!self.exists()?)
344    }
345
346    /// Execute and return whether at least one matching row exists.
347    pub fn exists(&self) -> Result<bool, QueryError>
348    where
349        E: EntityValue,
350    {
351        self.execute_terminal(ExistsRowsTerminal::new())
352    }
353
354    /// Explain scalar `exists()` routing without executing the terminal.
355    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
356    where
357        E: EntityValue,
358    {
359        self.explain_terminal(&ExistsRowsTerminal::new())
360    }
361
362    /// Explain scalar `not_exists()` routing without executing the terminal.
363    ///
364    /// This remains an `exists()` execution plan with negated boolean semantics.
365    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
366    where
367        E: EntityValue,
368    {
369        self.explain_exists()
370    }
371
372    /// Explain scalar load execution shape without executing the query.
373    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
374    where
375        E: EntityValue,
376    {
377        self.explain_execution_descriptor()
378    }
379
380    /// Explain scalar load execution shape as deterministic text.
381    pub fn explain_execution_text(&self) -> Result<String, QueryError>
382    where
383        E: EntityValue,
384    {
385        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
386    }
387
388    /// Explain scalar load execution shape as canonical JSON.
389    pub fn explain_execution_json(&self) -> Result<String, QueryError>
390    where
391        E: EntityValue,
392    {
393        self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
394    }
395
396    /// Explain scalar load execution shape as verbose text with diagnostics.
397    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
398    where
399        E: EntityValue,
400    {
401        self.with_non_paged(DbSession::explain_query_execution_verbose_with_visible_indexes)
402    }
403
404    /// Execute and return the number of matching rows.
405    pub fn count(&self) -> Result<u32, QueryError>
406    where
407        E: EntityValue,
408    {
409        self.execute_terminal(CountRowsTerminal::new())
410    }
411
412    /// Execute and return the total persisted payload bytes for the effective
413    /// result window.
414    pub fn bytes(&self) -> Result<u64, QueryError>
415    where
416        E: EntityValue,
417    {
418        self.with_non_paged(DbSession::execute_fluent_bytes)
419    }
420
421    /// Execute and return the total serialized bytes for `field` over the
422    /// effective result window.
423    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
424    where
425        E: EntityValue,
426    {
427        let target_slot = self.resolve_non_paged_slot(field)?;
428
429        self.with_non_paged(|session, query| {
430            session.execute_fluent_bytes_by_slot(query, target_slot)
431        })
432    }
433
434    /// Explain `bytes_by(field)` routing without executing the terminal.
435    pub fn explain_bytes_by(
436        &self,
437        field: impl AsRef<str>,
438    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
439    where
440        E: EntityValue,
441    {
442        let target_slot = self.resolve_non_paged_slot(field)?;
443
444        self.with_non_paged(|session, query| {
445            session.explain_query_bytes_by_with_visible_indexes(query, target_slot.field())
446        })
447    }
448
449    /// Execute and return the smallest matching identifier, if any.
450    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
451    where
452        E: EntityValue,
453    {
454        self.execute_terminal(MinIdTerminal::new())
455    }
456
457    /// Explain scalar `min()` routing without executing the terminal.
458    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
459    where
460        E: EntityValue,
461    {
462        self.explain_terminal(&MinIdTerminal::new())
463    }
464
465    /// Execute and return the id of the row with the smallest value for `field`.
466    ///
467    /// Ties are deterministic: equal field values resolve by primary key ascending.
468    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
469    where
470        E: EntityValue,
471    {
472        let target_slot = self.resolve_non_paged_slot(field)?;
473
474        self.execute_terminal(MinIdBySlotTerminal::new(target_slot))
475    }
476
477    /// Execute and return the largest matching identifier, if any.
478    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
479    where
480        E: EntityValue,
481    {
482        self.execute_terminal(MaxIdTerminal::new())
483    }
484
485    /// Explain scalar `max()` routing without executing the terminal.
486    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
487    where
488        E: EntityValue,
489    {
490        self.explain_terminal(&MaxIdTerminal::new())
491    }
492
493    /// Execute and return the id of the row with the largest value for `field`.
494    ///
495    /// Ties are deterministic: equal field values resolve by primary key ascending.
496    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
497    where
498        E: EntityValue,
499    {
500        let target_slot = self.resolve_non_paged_slot(field)?;
501
502        self.execute_terminal(MaxIdBySlotTerminal::new(target_slot))
503    }
504
505    /// Execute and return the id at zero-based ordinal `nth` when rows are
506    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
507    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
508    where
509        E: EntityValue,
510    {
511        let target_slot = self.resolve_non_paged_slot(field)?;
512
513        self.execute_terminal(NthIdBySlotTerminal::new(target_slot, nth))
514    }
515
516    /// Execute and return the sum of `field` over matching rows.
517    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
518    where
519        E: EntityValue,
520    {
521        let target_slot = self.resolve_non_paged_slot(field)?;
522
523        self.execute_terminal(SumBySlotTerminal::new(target_slot))
524    }
525
526    /// Explain scalar `sum_by(field)` routing without executing the terminal.
527    pub fn explain_sum_by(
528        &self,
529        field: impl AsRef<str>,
530    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
531    where
532        E: EntityValue,
533    {
534        let target_slot = self.resolve_non_paged_slot(field)?;
535
536        self.explain_terminal(&SumBySlotTerminal::new(target_slot))
537    }
538
539    /// Execute and return the sum of distinct `field` values.
540    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
541    where
542        E: EntityValue,
543    {
544        let target_slot = self.resolve_non_paged_slot(field)?;
545
546        self.execute_terminal(SumDistinctBySlotTerminal::new(target_slot))
547    }
548
549    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
550    pub fn explain_sum_distinct_by(
551        &self,
552        field: impl AsRef<str>,
553    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
554    where
555        E: EntityValue,
556    {
557        let target_slot = self.resolve_non_paged_slot(field)?;
558
559        self.explain_terminal(&SumDistinctBySlotTerminal::new(target_slot))
560    }
561
562    /// Execute and return the average of `field` over matching rows.
563    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
564    where
565        E: EntityValue,
566    {
567        let target_slot = self.resolve_non_paged_slot(field)?;
568
569        self.execute_terminal(AvgBySlotTerminal::new(target_slot))
570    }
571
572    /// Explain scalar `avg_by(field)` routing without executing the terminal.
573    pub fn explain_avg_by(
574        &self,
575        field: impl AsRef<str>,
576    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
577    where
578        E: EntityValue,
579    {
580        let target_slot = self.resolve_non_paged_slot(field)?;
581
582        self.explain_terminal(&AvgBySlotTerminal::new(target_slot))
583    }
584
585    /// Execute and return the average of distinct `field` values.
586    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
587    where
588        E: EntityValue,
589    {
590        let target_slot = self.resolve_non_paged_slot(field)?;
591
592        self.execute_terminal(AvgDistinctBySlotTerminal::new(target_slot))
593    }
594
595    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
596    pub fn explain_avg_distinct_by(
597        &self,
598        field: impl AsRef<str>,
599    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
600    where
601        E: EntityValue,
602    {
603        let target_slot = self.resolve_non_paged_slot(field)?;
604
605        self.explain_terminal(&AvgDistinctBySlotTerminal::new(target_slot))
606    }
607
608    /// Execute and return the median id by `field` using deterministic ordering
609    /// `(field asc, primary key asc)`.
610    ///
611    /// Even-length windows select the lower median.
612    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
613    where
614        E: EntityValue,
615    {
616        let target_slot = self.resolve_non_paged_slot(field)?;
617
618        self.execute_terminal(MedianIdBySlotTerminal::new(target_slot))
619    }
620
621    /// Execute and return the number of distinct values for `field` over the
622    /// effective result window.
623    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
624    where
625        E: EntityValue,
626    {
627        let target_slot = self.resolve_non_paged_slot(field)?;
628
629        self.execute_terminal(CountDistinctBySlotTerminal::new(target_slot))
630    }
631
632    /// Explain `count_distinct_by(field)` routing without executing the terminal.
633    pub fn explain_count_distinct_by(
634        &self,
635        field: impl AsRef<str>,
636    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
637    where
638        E: EntityValue,
639    {
640        let target_slot = self.resolve_non_paged_slot(field)?;
641
642        self.explain_terminal(&CountDistinctBySlotTerminal::new(target_slot))
643    }
644
645    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
646    ///
647    /// Tie handling is deterministic for both extrema: primary key ascending.
648    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
649    where
650        E: EntityValue,
651    {
652        let target_slot = self.resolve_non_paged_slot(field)?;
653
654        self.execute_terminal(MinMaxIdBySlotTerminal::new(target_slot))
655    }
656
657    /// Execute and return projected field values for the effective result window.
658    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
659    where
660        E: EntityValue,
661    {
662        let target_slot = self.resolve_non_paged_slot(field)?;
663
664        self.execute_terminal(ValuesBySlotTerminal::new(target_slot))
665            .map(output_values)
666    }
667
668    /// Execute and return projected values for one shared bounded projection
669    /// over the effective response window.
670    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
671    where
672        E: EntityValue,
673        P: ValueProjectionExpr,
674    {
675        let target_slot = self.resolve_non_paged_slot(projection.field())?;
676
677        self.with_non_paged(|session, query| {
678            session.execute_fluent_project_values_by_slot(
679                query,
680                target_slot,
681                projection.projection_plan().into_expr(),
682            )
683        })
684        .map(output_values)
685    }
686
687    /// Explain `project_values(projection)` routing without executing it.
688    pub fn explain_project_values<P>(
689        &self,
690        projection: &P,
691    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
692    where
693        E: EntityValue,
694        P: ValueProjectionExpr,
695    {
696        let target_slot = self.resolve_non_paged_slot(projection.field())?;
697
698        self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
699    }
700
701    /// Explain `values_by(field)` routing without executing the terminal.
702    pub fn explain_values_by(
703        &self,
704        field: impl AsRef<str>,
705    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
706    where
707        E: EntityValue,
708    {
709        let target_slot = self.resolve_non_paged_slot(field)?;
710
711        self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
712    }
713
714    /// Execute and return the first `k` rows from the effective response window.
715    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
716    where
717        E: EntityValue,
718    {
719        self.with_non_paged(|session, query| session.execute_fluent_take(query, take_count))
720    }
721
722    /// Execute and return the top `k` rows by `field` under deterministic
723    /// ordering `(field desc, primary_key asc)` over the effective response
724    /// window.
725    ///
726    /// This terminal applies its own ordering and does not preserve query
727    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
728    /// matches `max_by(field)` selection semantics.
729    pub fn top_k_by(
730        &self,
731        field: impl AsRef<str>,
732        take_count: u32,
733    ) -> Result<EntityResponse<E>, QueryError>
734    where
735        E: EntityValue,
736    {
737        let target_slot = self.resolve_non_paged_slot(field)?;
738
739        self.with_non_paged(|session, query| {
740            session.execute_fluent_top_k_rows_by_slot(query, target_slot, take_count)
741        })
742    }
743
744    /// Execute and return the bottom `k` rows by `field` under deterministic
745    /// ordering `(field asc, primary_key asc)` over the effective response
746    /// window.
747    ///
748    /// This terminal applies its own ordering and does not preserve query
749    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
750    /// matches `min_by(field)` selection semantics.
751    pub fn bottom_k_by(
752        &self,
753        field: impl AsRef<str>,
754        take_count: u32,
755    ) -> Result<EntityResponse<E>, QueryError>
756    where
757        E: EntityValue,
758    {
759        let target_slot = self.resolve_non_paged_slot(field)?;
760
761        self.with_non_paged(|session, query| {
762            session.execute_fluent_bottom_k_rows_by_slot(query, target_slot, take_count)
763        })
764    }
765
766    /// Execute and return projected values for the top `k` rows by `field`
767    /// under deterministic ordering `(field desc, primary_key asc)` over the
768    /// effective response window.
769    ///
770    /// Ranking is applied before projection and does not preserve query
771    /// `order_term(...)` row order in the returned values. For `k = 1`, this
772    /// matches `max_by(field)` projected to one value.
773    pub fn top_k_by_values(
774        &self,
775        field: impl AsRef<str>,
776        take_count: u32,
777    ) -> Result<Vec<OutputValue>, QueryError>
778    where
779        E: EntityValue,
780    {
781        let target_slot = self.resolve_non_paged_slot(field)?;
782
783        self.with_non_paged(|session, query| {
784            session
785                .execute_fluent_top_k_values_by_slot(query, target_slot, take_count)
786                .map(output_values)
787        })
788    }
789
790    /// Execute and return projected values for the bottom `k` rows by `field`
791    /// under deterministic ordering `(field asc, primary_key asc)` over the
792    /// effective response window.
793    ///
794    /// Ranking is applied before projection and does not preserve query
795    /// `order_term(...)` row order in the returned values. For `k = 1`, this
796    /// matches `min_by(field)` projected to one value.
797    pub fn bottom_k_by_values(
798        &self,
799        field: impl AsRef<str>,
800        take_count: u32,
801    ) -> Result<Vec<OutputValue>, QueryError>
802    where
803        E: EntityValue,
804    {
805        let target_slot = self.resolve_non_paged_slot(field)?;
806
807        self.with_non_paged(|session, query| {
808            session
809                .execute_fluent_bottom_k_values_by_slot(query, target_slot, take_count)
810                .map(output_values)
811        })
812    }
813
814    /// Execute and return projected id/value pairs for the top `k` rows by
815    /// `field` under deterministic ordering `(field desc, primary_key asc)`
816    /// over the effective response window.
817    ///
818    /// Ranking is applied before projection and does not preserve query
819    /// `order_term(...)` row order in the returned values. For `k = 1`, this
820    /// matches `max_by(field)` projected to one `(id, value)` pair.
821    pub fn top_k_by_with_ids(
822        &self,
823        field: impl AsRef<str>,
824        take_count: u32,
825    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
826    where
827        E: EntityValue,
828    {
829        let target_slot = self.resolve_non_paged_slot(field)?;
830
831        self.with_non_paged(|session, query| {
832            session
833                .execute_fluent_top_k_values_with_ids_by_slot(query, target_slot, take_count)
834                .map(output_values_with_ids)
835        })
836    }
837
838    /// Execute and return projected id/value pairs for the bottom `k` rows by
839    /// `field` under deterministic ordering `(field asc, primary_key asc)`
840    /// over the effective response window.
841    ///
842    /// Ranking is applied before projection and does not preserve query
843    /// `order_term(...)` row order in the returned values. For `k = 1`, this
844    /// matches `min_by(field)` projected to one `(id, value)` pair.
845    pub fn bottom_k_by_with_ids(
846        &self,
847        field: impl AsRef<str>,
848        take_count: u32,
849    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
850    where
851        E: EntityValue,
852    {
853        let target_slot = self.resolve_non_paged_slot(field)?;
854
855        self.with_non_paged(|session, query| {
856            session
857                .execute_fluent_bottom_k_values_with_ids_by_slot(query, target_slot, take_count)
858                .map(output_values_with_ids)
859        })
860    }
861
862    /// Execute and return distinct projected field values for the effective
863    /// result window, preserving first-observed value order.
864    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
865    where
866        E: EntityValue,
867    {
868        let target_slot = self.resolve_non_paged_slot(field)?;
869
870        self.execute_terminal(DistinctValuesBySlotTerminal::new(target_slot))
871            .map(output_values)
872    }
873
874    /// Explain `distinct_values_by(field)` routing without executing the terminal.
875    pub fn explain_distinct_values_by(
876        &self,
877        field: impl AsRef<str>,
878    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
879    where
880        E: EntityValue,
881    {
882        let target_slot = self.resolve_non_paged_slot(field)?;
883
884        self.explain_terminal(&DistinctValuesBySlotTerminal::new(target_slot))
885    }
886
887    /// Execute and return projected field values paired with row ids for the
888    /// effective result window.
889    pub fn values_by_with_ids(
890        &self,
891        field: impl AsRef<str>,
892    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
893    where
894        E: EntityValue,
895    {
896        let target_slot = self.resolve_non_paged_slot(field)?;
897
898        self.execute_terminal(ValuesBySlotWithIdsTerminal::new(target_slot))
899            .map(output_values_with_ids)
900    }
901
902    /// Execute and return projected id/value pairs for one shared bounded
903    /// projection over the effective response window.
904    pub fn project_values_with_ids<P>(
905        &self,
906        projection: &P,
907    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
908    where
909        E: EntityValue,
910        P: ValueProjectionExpr,
911    {
912        let target_slot = self.resolve_non_paged_slot(projection.field())?;
913
914        self.with_non_paged(|session, query| {
915            session.execute_fluent_project_values_with_ids_by_slot(
916                query,
917                target_slot,
918                projection.projection_plan().into_expr(),
919            )
920        })
921        .map(output_values_with_ids)
922    }
923
924    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
925    pub fn explain_values_by_with_ids(
926        &self,
927        field: impl AsRef<str>,
928    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
929    where
930        E: EntityValue,
931    {
932        let target_slot = self.resolve_non_paged_slot(field)?;
933
934        self.explain_terminal(&ValuesBySlotWithIdsTerminal::new(target_slot))
935    }
936
937    /// Execute and return the first projected field value in effective response
938    /// order, if any.
939    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
940    where
941        E: EntityValue,
942    {
943        let target_slot = self.resolve_non_paged_slot(field)?;
944
945        self.execute_terminal(FirstValueBySlotTerminal::new(target_slot))
946            .map(|value| value.map(output))
947    }
948
949    /// Execute and return the first projected value for one shared bounded
950    /// projection in effective response order, if any.
951    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
952    where
953        E: EntityValue,
954        P: ValueProjectionExpr,
955    {
956        let target_slot = self.resolve_non_paged_slot(projection.field())?;
957
958        self.with_non_paged(|session, query| {
959            session.execute_fluent_project_terminal_value_by_slot(
960                query,
961                target_slot,
962                AggregateKind::First,
963                projection.projection_plan().into_expr(),
964            )
965        })
966        .map(|value| value.map(output))
967    }
968
969    /// Explain `first_value_by(field)` routing without executing the terminal.
970    pub fn explain_first_value_by(
971        &self,
972        field: impl AsRef<str>,
973    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
974    where
975        E: EntityValue,
976    {
977        let target_slot = self.resolve_non_paged_slot(field)?;
978
979        self.explain_terminal(&FirstValueBySlotTerminal::new(target_slot))
980    }
981
982    /// Execute and return the last projected field value in effective response
983    /// order, if any.
984    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
985    where
986        E: EntityValue,
987    {
988        let target_slot = self.resolve_non_paged_slot(field)?;
989
990        self.execute_terminal(LastValueBySlotTerminal::new(target_slot))
991            .map(|value| value.map(output))
992    }
993
994    /// Execute and return the last projected value for one shared bounded
995    /// projection in effective response order, if any.
996    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
997    where
998        E: EntityValue,
999        P: ValueProjectionExpr,
1000    {
1001        let target_slot = self.resolve_non_paged_slot(projection.field())?;
1002
1003        self.with_non_paged(|session, query| {
1004            session.execute_fluent_project_terminal_value_by_slot(
1005                query,
1006                target_slot,
1007                AggregateKind::Last,
1008                projection.projection_plan().into_expr(),
1009            )
1010        })
1011        .map(|value| value.map(output))
1012    }
1013
1014    /// Explain `last_value_by(field)` routing without executing the terminal.
1015    pub fn explain_last_value_by(
1016        &self,
1017        field: impl AsRef<str>,
1018    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1019    where
1020        E: EntityValue,
1021    {
1022        let target_slot = self.resolve_non_paged_slot(field)?;
1023
1024        self.explain_terminal(&LastValueBySlotTerminal::new(target_slot))
1025    }
1026
1027    /// Execute and return the first matching identifier in response order, if any.
1028    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1029    where
1030        E: EntityValue,
1031    {
1032        self.execute_terminal(FirstIdTerminal::new())
1033    }
1034
1035    /// Explain scalar `first()` routing without executing the terminal.
1036    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1037    where
1038        E: EntityValue,
1039    {
1040        self.explain_terminal(&FirstIdTerminal::new())
1041    }
1042
1043    /// Execute and return the last matching identifier in response order, if any.
1044    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1045    where
1046        E: EntityValue,
1047    {
1048        self.execute_terminal(LastIdTerminal::new())
1049    }
1050
1051    /// Explain scalar `last()` routing without executing the terminal.
1052    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1053    where
1054        E: EntityValue,
1055    {
1056        self.explain_terminal(&LastIdTerminal::new())
1057    }
1058
1059    /// Execute and require exactly one matching row.
1060    pub fn require_one(&self) -> Result<(), QueryError>
1061    where
1062        E: EntityValue,
1063    {
1064        self.execute_rows()?.require_one()?;
1065        Ok(())
1066    }
1067
1068    /// Execute and require at least one matching row.
1069    pub fn require_some(&self) -> Result<(), QueryError>
1070    where
1071        E: EntityValue,
1072    {
1073        self.execute_rows()?.require_some()?;
1074        Ok(())
1075    }
1076}