Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5//!
6//! Terminal Execution Model
7//!
8//! Fluent terminals are concrete descriptors with a 1:1 mapping to session
9//! execution and explain entrypoints. This module may orchestrate descriptor
10//! construction, non-paged gating, and public output shaping, but it must not
11//! carry terminal-kind enums, transport output enums, or match-based execution
12//! dispatch. Adding a new terminal means adding a new descriptor type and one
13//! direct `TerminalStrategyDriver` implementation for that descriptor.
14
15use crate::{
16    db::{
17        DbSession, PersistedRow, Query,
18        query::{
19            api::ResponseCardinalityExt,
20            builder::{
21                AvgBySlotTerminal, AvgDistinctBySlotTerminal, CountDistinctBySlotTerminal,
22                CountRowsTerminal, DistinctValuesBySlotTerminal, ExistsRowsTerminal,
23                FirstIdTerminal, FirstValueBySlotTerminal, LastIdTerminal, LastValueBySlotTerminal,
24                MaxIdBySlotTerminal, MaxIdTerminal, MedianIdBySlotTerminal, MinIdBySlotTerminal,
25                MinIdTerminal, MinMaxIdBySlotTerminal, NthIdBySlotTerminal, SumBySlotTerminal,
26                SumDistinctBySlotTerminal, ValueProjectionExpr, ValuesBySlotTerminal,
27                ValuesBySlotWithIdsTerminal,
28            },
29            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
30            fluent::load::{FluentLoadQuery, LoadQueryResult},
31            intent::QueryError,
32        },
33        response::EntityResponse,
34    },
35    traits::EntityValue,
36    types::{Decimal, Id},
37    value::{OutputValue, Value},
38};
39
40type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42///
43/// TerminalStrategyDriver
44///
45/// TerminalStrategyDriver is the fluent terminal wiring adapter between a
46/// query-owned strategy object and the session-owned execution/explain
47/// boundary. Implementations are deliberately thin: they only choose the
48/// matching `DbSession` method for an existing strategy type.
49///
50
51trait TerminalStrategyDriver<E: PersistedRow + EntityValue> {
52    type Output;
53    type ExplainOutput;
54
55    fn execute(
56        self,
57        session: &DbSession<E::Canister>,
58        query: &Query<E>,
59    ) -> Result<Self::Output, QueryError>;
60
61    fn explain(
62        &self,
63        session: &DbSession<E::Canister>,
64        query: &Query<E>,
65    ) -> Result<Self::ExplainOutput, QueryError>;
66}
67
68// Define one aggregate-style terminal driver implementation. The macro keeps
69// terminal descriptors 1:1 with session methods while removing repeated explain
70// wiring that is identical for every aggregate terminal.
71macro_rules! impl_aggregate_terminal_driver {
72    ($terminal:ty, $output:ty, $execute:ident) => {
73        impl<E> TerminalStrategyDriver<E> for $terminal
74        where
75            E: PersistedRow + EntityValue,
76        {
77            type Output = $output;
78            type ExplainOutput = ExplainAggregateTerminalPlan;
79
80            fn execute(
81                self,
82                session: &DbSession<E::Canister>,
83                query: &Query<E>,
84            ) -> Result<Self::Output, QueryError> {
85                session.$execute(query, self)
86            }
87
88            fn explain(
89                &self,
90                session: &DbSession<E::Canister>,
91                query: &Query<E>,
92            ) -> Result<Self::ExplainOutput, QueryError> {
93                session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
94            }
95        }
96    };
97}
98
99// Define one projection-style terminal driver implementation. Projection
100// terminals share the execution/explain shape but return execution descriptors
101// rather than aggregate-terminal plans.
102macro_rules! impl_projection_terminal_driver {
103    ($terminal:ty, $output:ty, $execute:ident) => {
104        impl<E> TerminalStrategyDriver<E> for $terminal
105        where
106            E: PersistedRow + EntityValue,
107        {
108            type Output = $output;
109            type ExplainOutput = ExplainExecutionNodeDescriptor;
110
111            fn execute(
112                self,
113                session: &DbSession<E::Canister>,
114                query: &Query<E>,
115            ) -> Result<Self::Output, QueryError> {
116                session.$execute(query, self)
117            }
118
119            fn explain(
120                &self,
121                session: &DbSession<E::Canister>,
122                query: &Query<E>,
123            ) -> Result<Self::ExplainOutput, QueryError> {
124                session.explain_query_prepared_projection_terminal_with_visible_indexes(query, self)
125            }
126        }
127    };
128}
129
130impl_aggregate_terminal_driver!(CountRowsTerminal, u32, execute_fluent_count_rows_terminal);
131impl_aggregate_terminal_driver!(
132    ExistsRowsTerminal,
133    bool,
134    execute_fluent_exists_rows_terminal
135);
136impl_aggregate_terminal_driver!(MinIdTerminal, Option<Id<E>>, execute_fluent_min_id_terminal);
137impl_aggregate_terminal_driver!(MaxIdTerminal, Option<Id<E>>, execute_fluent_max_id_terminal);
138impl_aggregate_terminal_driver!(
139    MinIdBySlotTerminal,
140    Option<Id<E>>,
141    execute_fluent_min_id_by_slot
142);
143impl_aggregate_terminal_driver!(
144    MaxIdBySlotTerminal,
145    Option<Id<E>>,
146    execute_fluent_max_id_by_slot
147);
148impl_aggregate_terminal_driver!(
149    SumBySlotTerminal,
150    Option<Decimal>,
151    execute_fluent_sum_by_slot
152);
153impl_aggregate_terminal_driver!(
154    SumDistinctBySlotTerminal,
155    Option<Decimal>,
156    execute_fluent_sum_distinct_by_slot
157);
158impl_aggregate_terminal_driver!(
159    AvgBySlotTerminal,
160    Option<Decimal>,
161    execute_fluent_avg_by_slot
162);
163impl_aggregate_terminal_driver!(
164    AvgDistinctBySlotTerminal,
165    Option<Decimal>,
166    execute_fluent_avg_distinct_by_slot
167);
168impl_aggregate_terminal_driver!(
169    FirstIdTerminal,
170    Option<Id<E>>,
171    execute_fluent_first_id_terminal
172);
173impl_aggregate_terminal_driver!(
174    LastIdTerminal,
175    Option<Id<E>>,
176    execute_fluent_last_id_terminal
177);
178impl_aggregate_terminal_driver!(
179    NthIdBySlotTerminal,
180    Option<Id<E>>,
181    execute_fluent_nth_id_by_slot
182);
183impl_aggregate_terminal_driver!(
184    MedianIdBySlotTerminal,
185    Option<Id<E>>,
186    execute_fluent_median_id_by_slot
187);
188impl_aggregate_terminal_driver!(
189    MinMaxIdBySlotTerminal,
190    MinMaxByIds<E>,
191    execute_fluent_min_max_id_by_slot
192);
193
194impl_projection_terminal_driver!(
195    ValuesBySlotTerminal,
196    Vec<Value>,
197    execute_fluent_values_by_slot
198);
199impl_projection_terminal_driver!(
200    DistinctValuesBySlotTerminal,
201    Vec<Value>,
202    execute_fluent_distinct_values_by_slot
203);
204impl_projection_terminal_driver!(
205    CountDistinctBySlotTerminal,
206    u32,
207    execute_fluent_count_distinct_by_slot
208);
209impl_projection_terminal_driver!(
210    ValuesBySlotWithIdsTerminal,
211    Vec<(Id<E>, Value)>,
212    execute_fluent_values_by_with_ids_slot
213);
214impl_projection_terminal_driver!(
215    FirstValueBySlotTerminal,
216    Option<Value>,
217    execute_fluent_first_value_by_slot
218);
219impl_projection_terminal_driver!(
220    LastValueBySlotTerminal,
221    Option<Value>,
222    execute_fluent_last_value_by_slot
223);
224
225// Convert one runtime projection value into the public output boundary type.
226fn output(value: Value) -> OutputValue {
227    OutputValue::from(value)
228}
229
230// Convert one ordered runtime projection vector into the public output form.
231fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
232    values.into_iter().map(output).collect()
233}
234
235// Convert one ordered runtime `(id, value)` projection vector into the public output form.
236fn output_values_with_ids<E: PersistedRow>(
237    values: Vec<(Id<E>, Value)>,
238) -> Vec<(Id<E>, OutputValue)> {
239    values
240        .into_iter()
241        .map(|(id, value)| (id, output(value)))
242        .collect()
243}
244
245impl<E> FluentLoadQuery<'_, E>
246where
247    E: PersistedRow,
248{
249    // ------------------------------------------------------------------
250    // Execution (single semantic boundary)
251    // ------------------------------------------------------------------
252
253    /// Execute this query using the session's policy settings.
254    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
255    where
256        E: EntityValue,
257    {
258        self.with_non_paged(DbSession::execute_query_result)
259    }
260
261    // Run one terminal operation through the canonical non-paged fluent policy
262    // gate so execution and explain helpers cannot drift on readiness checks.
263    fn with_non_paged<T>(
264        &self,
265        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
266    ) -> Result<T, QueryError>
267    where
268        E: EntityValue,
269    {
270        self.ensure_non_paged_mode_ready()?;
271        map(self.session, self.query())
272    }
273
274    // Resolve the structural execution descriptor for this fluent load query
275    // through the session-owned visible-index explain path once.
276    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
277    where
278        E: EntityValue,
279    {
280        self.with_non_paged(DbSession::explain_query_execution_with_visible_indexes)
281    }
282
283    // Render one descriptor-derived execution surface so text/json explain
284    // terminals do not each forward the same session explain call ad hoc.
285    fn render_execution_descriptor(
286        &self,
287        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
288    ) -> Result<String, QueryError>
289    where
290        E: EntityValue,
291    {
292        let descriptor = self.explain_execution_descriptor()?;
293
294        Ok(render(descriptor))
295    }
296
297    // Execute one prepared terminal descriptor through the canonical
298    // non-paged fluent policy gate.
299    fn execute_terminal<S>(&self, strategy: S) -> Result<S::Output, QueryError>
300    where
301        E: EntityValue,
302        S: TerminalStrategyDriver<E>,
303    {
304        self.with_non_paged(|session, query| strategy.execute(session, query))
305    }
306
307    // Explain one prepared terminal strategy through the same non-paged fluent
308    // policy gate used by execution.
309    fn explain_terminal<S>(&self, strategy: &S) -> Result<S::ExplainOutput, QueryError>
310    where
311        E: EntityValue,
312        S: TerminalStrategyDriver<E>,
313    {
314        self.with_non_paged(|session, query| strategy.explain(session, query))
315    }
316
317    // Apply one shared bounded value projection to iterator-like terminal
318    // output while preserving source order and cardinality.
319    fn project_terminal_items<P, T, U>(
320        projection: &P,
321        values: impl IntoIterator<Item = T>,
322        mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
323    ) -> Result<Vec<U>, QueryError>
324    where
325        P: ValueProjectionExpr,
326    {
327        values
328            .into_iter()
329            .map(|value| map(projection, value))
330            .collect()
331    }
332
333    // ------------------------------------------------------------------
334    // Execution terminals — semantic only
335    // ------------------------------------------------------------------
336
337    /// Execute and return whether the result set is empty.
338    pub fn is_empty(&self) -> Result<bool, QueryError>
339    where
340        E: EntityValue,
341    {
342        self.not_exists()
343    }
344
345    /// Execute and return whether no matching row exists.
346    pub fn not_exists(&self) -> Result<bool, QueryError>
347    where
348        E: EntityValue,
349    {
350        Ok(!self.exists()?)
351    }
352
353    /// Execute and return whether at least one matching row exists.
354    pub fn exists(&self) -> Result<bool, QueryError>
355    where
356        E: EntityValue,
357    {
358        self.execute_terminal(ExistsRowsTerminal::new())
359    }
360
361    /// Explain scalar `exists()` routing without executing the terminal.
362    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
363    where
364        E: EntityValue,
365    {
366        self.explain_terminal(&ExistsRowsTerminal::new())
367    }
368
369    /// Explain scalar `not_exists()` routing without executing the terminal.
370    ///
371    /// This remains an `exists()` execution plan with negated boolean semantics.
372    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
373    where
374        E: EntityValue,
375    {
376        self.explain_exists()
377    }
378
379    /// Explain scalar load execution shape without executing the query.
380    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
381    where
382        E: EntityValue,
383    {
384        self.explain_execution_descriptor()
385    }
386
387    /// Explain scalar load execution shape as deterministic text.
388    pub fn explain_execution_text(&self) -> Result<String, QueryError>
389    where
390        E: EntityValue,
391    {
392        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
393    }
394
395    /// Explain scalar load execution shape as canonical JSON.
396    pub fn explain_execution_json(&self) -> Result<String, QueryError>
397    where
398        E: EntityValue,
399    {
400        self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
401    }
402
403    /// Explain scalar load execution shape as verbose text with diagnostics.
404    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
405    where
406        E: EntityValue,
407    {
408        self.with_non_paged(DbSession::explain_query_execution_verbose_with_visible_indexes)
409    }
410
411    /// Execute and return the number of matching rows.
412    pub fn count(&self) -> Result<u32, QueryError>
413    where
414        E: EntityValue,
415    {
416        self.execute_terminal(CountRowsTerminal::new())
417    }
418
419    /// Execute and return the total persisted payload bytes for the effective
420    /// result window.
421    pub fn bytes(&self) -> Result<u64, QueryError>
422    where
423        E: EntityValue,
424    {
425        self.with_non_paged(DbSession::execute_fluent_bytes)
426    }
427
428    /// Execute and return the total serialized bytes for `field` over the
429    /// effective result window.
430    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
431    where
432        E: EntityValue,
433    {
434        let target_slot = self.resolve_non_paged_slot(field)?;
435
436        self.with_non_paged(|session, query| {
437            session.execute_fluent_bytes_by_slot(query, target_slot)
438        })
439    }
440
441    /// Explain `bytes_by(field)` routing without executing the terminal.
442    pub fn explain_bytes_by(
443        &self,
444        field: impl AsRef<str>,
445    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
446    where
447        E: EntityValue,
448    {
449        let target_slot = self.resolve_non_paged_slot(field)?;
450
451        self.with_non_paged(|session, query| {
452            session.explain_query_bytes_by_with_visible_indexes(query, target_slot.field())
453        })
454    }
455
456    /// Execute and return the smallest matching identifier, if any.
457    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
458    where
459        E: EntityValue,
460    {
461        self.execute_terminal(MinIdTerminal::new())
462    }
463
464    /// Explain scalar `min()` routing without executing the terminal.
465    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
466    where
467        E: EntityValue,
468    {
469        self.explain_terminal(&MinIdTerminal::new())
470    }
471
472    /// Execute and return the id of the row with the smallest value for `field`.
473    ///
474    /// Ties are deterministic: equal field values resolve by primary key ascending.
475    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
476    where
477        E: EntityValue,
478    {
479        let target_slot = self.resolve_non_paged_slot(field)?;
480
481        self.execute_terminal(MinIdBySlotTerminal::new(target_slot))
482    }
483
484    /// Execute and return the largest matching identifier, if any.
485    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
486    where
487        E: EntityValue,
488    {
489        self.execute_terminal(MaxIdTerminal::new())
490    }
491
492    /// Explain scalar `max()` routing without executing the terminal.
493    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
494    where
495        E: EntityValue,
496    {
497        self.explain_terminal(&MaxIdTerminal::new())
498    }
499
500    /// Execute and return the id of the row with the largest value for `field`.
501    ///
502    /// Ties are deterministic: equal field values resolve by primary key ascending.
503    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
504    where
505        E: EntityValue,
506    {
507        let target_slot = self.resolve_non_paged_slot(field)?;
508
509        self.execute_terminal(MaxIdBySlotTerminal::new(target_slot))
510    }
511
512    /// Execute and return the id at zero-based ordinal `nth` when rows are
513    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
514    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
515    where
516        E: EntityValue,
517    {
518        let target_slot = self.resolve_non_paged_slot(field)?;
519
520        self.execute_terminal(NthIdBySlotTerminal::new(target_slot, nth))
521    }
522
523    /// Execute and return the sum of `field` over matching rows.
524    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
525    where
526        E: EntityValue,
527    {
528        let target_slot = self.resolve_non_paged_slot(field)?;
529
530        self.execute_terminal(SumBySlotTerminal::new(target_slot))
531    }
532
533    /// Explain scalar `sum_by(field)` routing without executing the terminal.
534    pub fn explain_sum_by(
535        &self,
536        field: impl AsRef<str>,
537    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
538    where
539        E: EntityValue,
540    {
541        let target_slot = self.resolve_non_paged_slot(field)?;
542
543        self.explain_terminal(&SumBySlotTerminal::new(target_slot))
544    }
545
546    /// Execute and return the sum of distinct `field` values.
547    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
548    where
549        E: EntityValue,
550    {
551        let target_slot = self.resolve_non_paged_slot(field)?;
552
553        self.execute_terminal(SumDistinctBySlotTerminal::new(target_slot))
554    }
555
556    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
557    pub fn explain_sum_distinct_by(
558        &self,
559        field: impl AsRef<str>,
560    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
561    where
562        E: EntityValue,
563    {
564        let target_slot = self.resolve_non_paged_slot(field)?;
565
566        self.explain_terminal(&SumDistinctBySlotTerminal::new(target_slot))
567    }
568
569    /// Execute and return the average of `field` over matching rows.
570    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
571    where
572        E: EntityValue,
573    {
574        let target_slot = self.resolve_non_paged_slot(field)?;
575
576        self.execute_terminal(AvgBySlotTerminal::new(target_slot))
577    }
578
579    /// Explain scalar `avg_by(field)` routing without executing the terminal.
580    pub fn explain_avg_by(
581        &self,
582        field: impl AsRef<str>,
583    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
584    where
585        E: EntityValue,
586    {
587        let target_slot = self.resolve_non_paged_slot(field)?;
588
589        self.explain_terminal(&AvgBySlotTerminal::new(target_slot))
590    }
591
592    /// Execute and return the average of distinct `field` values.
593    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
594    where
595        E: EntityValue,
596    {
597        let target_slot = self.resolve_non_paged_slot(field)?;
598
599        self.execute_terminal(AvgDistinctBySlotTerminal::new(target_slot))
600    }
601
602    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
603    pub fn explain_avg_distinct_by(
604        &self,
605        field: impl AsRef<str>,
606    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
607    where
608        E: EntityValue,
609    {
610        let target_slot = self.resolve_non_paged_slot(field)?;
611
612        self.explain_terminal(&AvgDistinctBySlotTerminal::new(target_slot))
613    }
614
615    /// Execute and return the median id by `field` using deterministic ordering
616    /// `(field asc, primary key asc)`.
617    ///
618    /// Even-length windows select the lower median.
619    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
620    where
621        E: EntityValue,
622    {
623        let target_slot = self.resolve_non_paged_slot(field)?;
624
625        self.execute_terminal(MedianIdBySlotTerminal::new(target_slot))
626    }
627
628    /// Execute and return the number of distinct values for `field` over the
629    /// effective result window.
630    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
631    where
632        E: EntityValue,
633    {
634        let target_slot = self.resolve_non_paged_slot(field)?;
635
636        self.execute_terminal(CountDistinctBySlotTerminal::new(target_slot))
637    }
638
639    /// Explain `count_distinct_by(field)` routing without executing the terminal.
640    pub fn explain_count_distinct_by(
641        &self,
642        field: impl AsRef<str>,
643    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
644    where
645        E: EntityValue,
646    {
647        let target_slot = self.resolve_non_paged_slot(field)?;
648
649        self.explain_terminal(&CountDistinctBySlotTerminal::new(target_slot))
650    }
651
652    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
653    ///
654    /// Tie handling is deterministic for both extrema: primary key ascending.
655    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
656    where
657        E: EntityValue,
658    {
659        let target_slot = self.resolve_non_paged_slot(field)?;
660
661        self.execute_terminal(MinMaxIdBySlotTerminal::new(target_slot))
662    }
663
664    /// Execute and return projected field values for the effective result window.
665    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
666    where
667        E: EntityValue,
668    {
669        let target_slot = self.resolve_non_paged_slot(field)?;
670
671        self.execute_terminal(ValuesBySlotTerminal::new(target_slot))
672            .map(output_values)
673    }
674
675    /// Execute and return projected values for one shared bounded projection
676    /// over the effective response window.
677    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
678    where
679        E: EntityValue,
680        P: ValueProjectionExpr,
681    {
682        let target_slot = self.resolve_non_paged_slot(projection.field())?;
683        let values = self.execute_terminal(ValuesBySlotTerminal::new(target_slot))?;
684
685        Self::project_terminal_items(projection, values, |projection, value| {
686            projection.apply_value(value)
687        })
688        .map(output_values)
689    }
690
691    /// Explain `project_values(projection)` routing without executing it.
692    pub fn explain_project_values<P>(
693        &self,
694        projection: &P,
695    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
696    where
697        E: EntityValue,
698        P: ValueProjectionExpr,
699    {
700        let target_slot = self.resolve_non_paged_slot(projection.field())?;
701
702        self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
703    }
704
705    /// Explain `values_by(field)` routing without executing the terminal.
706    pub fn explain_values_by(
707        &self,
708        field: impl AsRef<str>,
709    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
710    where
711        E: EntityValue,
712    {
713        let target_slot = self.resolve_non_paged_slot(field)?;
714
715        self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
716    }
717
718    /// Execute and return the first `k` rows from the effective response window.
719    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
720    where
721        E: EntityValue,
722    {
723        self.with_non_paged(|session, query| session.execute_fluent_take(query, take_count))
724    }
725
726    /// Execute and return the top `k` rows by `field` under deterministic
727    /// ordering `(field desc, primary_key asc)` over the effective response
728    /// window.
729    ///
730    /// This terminal applies its own ordering and does not preserve query
731    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
732    /// matches `max_by(field)` selection semantics.
733    pub fn top_k_by(
734        &self,
735        field: impl AsRef<str>,
736        take_count: u32,
737    ) -> Result<EntityResponse<E>, QueryError>
738    where
739        E: EntityValue,
740    {
741        let target_slot = self.resolve_non_paged_slot(field)?;
742
743        self.with_non_paged(|session, query| {
744            session.execute_fluent_top_k_rows_by_slot(query, target_slot, take_count)
745        })
746    }
747
748    /// Execute and return the bottom `k` rows by `field` under deterministic
749    /// ordering `(field asc, primary_key asc)` over the effective response
750    /// window.
751    ///
752    /// This terminal applies its own ordering and does not preserve query
753    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
754    /// matches `min_by(field)` selection semantics.
755    pub fn bottom_k_by(
756        &self,
757        field: impl AsRef<str>,
758        take_count: u32,
759    ) -> Result<EntityResponse<E>, QueryError>
760    where
761        E: EntityValue,
762    {
763        let target_slot = self.resolve_non_paged_slot(field)?;
764
765        self.with_non_paged(|session, query| {
766            session.execute_fluent_bottom_k_rows_by_slot(query, target_slot, take_count)
767        })
768    }
769
770    /// Execute and return projected values for the top `k` rows by `field`
771    /// under deterministic ordering `(field desc, primary_key asc)` over the
772    /// effective response window.
773    ///
774    /// Ranking is applied before projection and does not preserve query
775    /// `order_term(...)` row order in the returned values. For `k = 1`, this
776    /// matches `max_by(field)` projected to one value.
777    pub fn top_k_by_values(
778        &self,
779        field: impl AsRef<str>,
780        take_count: u32,
781    ) -> Result<Vec<OutputValue>, QueryError>
782    where
783        E: EntityValue,
784    {
785        let target_slot = self.resolve_non_paged_slot(field)?;
786
787        self.with_non_paged(|session, query| {
788            session
789                .execute_fluent_top_k_values_by_slot(query, target_slot, take_count)
790                .map(output_values)
791        })
792    }
793
794    /// Execute and return projected values for the bottom `k` rows by `field`
795    /// under deterministic ordering `(field asc, primary_key asc)` over the
796    /// effective response window.
797    ///
798    /// Ranking is applied before projection and does not preserve query
799    /// `order_term(...)` row order in the returned values. For `k = 1`, this
800    /// matches `min_by(field)` projected to one value.
801    pub fn bottom_k_by_values(
802        &self,
803        field: impl AsRef<str>,
804        take_count: u32,
805    ) -> Result<Vec<OutputValue>, QueryError>
806    where
807        E: EntityValue,
808    {
809        let target_slot = self.resolve_non_paged_slot(field)?;
810
811        self.with_non_paged(|session, query| {
812            session
813                .execute_fluent_bottom_k_values_by_slot(query, target_slot, take_count)
814                .map(output_values)
815        })
816    }
817
818    /// Execute and return projected id/value pairs for the top `k` rows by
819    /// `field` under deterministic ordering `(field desc, primary_key asc)`
820    /// over the effective response window.
821    ///
822    /// Ranking is applied before projection and does not preserve query
823    /// `order_term(...)` row order in the returned values. For `k = 1`, this
824    /// matches `max_by(field)` projected to one `(id, value)` pair.
825    pub fn top_k_by_with_ids(
826        &self,
827        field: impl AsRef<str>,
828        take_count: u32,
829    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
830    where
831        E: EntityValue,
832    {
833        let target_slot = self.resolve_non_paged_slot(field)?;
834
835        self.with_non_paged(|session, query| {
836            session
837                .execute_fluent_top_k_values_with_ids_by_slot(query, target_slot, take_count)
838                .map(output_values_with_ids)
839        })
840    }
841
842    /// Execute and return projected id/value pairs for the bottom `k` rows by
843    /// `field` under deterministic ordering `(field asc, primary_key asc)`
844    /// over the effective response window.
845    ///
846    /// Ranking is applied before projection and does not preserve query
847    /// `order_term(...)` row order in the returned values. For `k = 1`, this
848    /// matches `min_by(field)` projected to one `(id, value)` pair.
849    pub fn bottom_k_by_with_ids(
850        &self,
851        field: impl AsRef<str>,
852        take_count: u32,
853    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
854    where
855        E: EntityValue,
856    {
857        let target_slot = self.resolve_non_paged_slot(field)?;
858
859        self.with_non_paged(|session, query| {
860            session
861                .execute_fluent_bottom_k_values_with_ids_by_slot(query, target_slot, take_count)
862                .map(output_values_with_ids)
863        })
864    }
865
866    /// Execute and return distinct projected field values for the effective
867    /// result window, preserving first-observed value order.
868    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
869    where
870        E: EntityValue,
871    {
872        let target_slot = self.resolve_non_paged_slot(field)?;
873
874        self.execute_terminal(DistinctValuesBySlotTerminal::new(target_slot))
875            .map(output_values)
876    }
877
878    /// Explain `distinct_values_by(field)` routing without executing the terminal.
879    pub fn explain_distinct_values_by(
880        &self,
881        field: impl AsRef<str>,
882    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
883    where
884        E: EntityValue,
885    {
886        let target_slot = self.resolve_non_paged_slot(field)?;
887
888        self.explain_terminal(&DistinctValuesBySlotTerminal::new(target_slot))
889    }
890
891    /// Execute and return projected field values paired with row ids for the
892    /// effective result window.
893    pub fn values_by_with_ids(
894        &self,
895        field: impl AsRef<str>,
896    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
897    where
898        E: EntityValue,
899    {
900        let target_slot = self.resolve_non_paged_slot(field)?;
901
902        self.execute_terminal(ValuesBySlotWithIdsTerminal::new(target_slot))
903            .map(output_values_with_ids)
904    }
905
906    /// Execute and return projected id/value pairs for one shared bounded
907    /// projection over the effective response window.
908    pub fn project_values_with_ids<P>(
909        &self,
910        projection: &P,
911    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
912    where
913        E: EntityValue,
914        P: ValueProjectionExpr,
915    {
916        let target_slot = self.resolve_non_paged_slot(projection.field())?;
917        let values = self.execute_terminal(ValuesBySlotWithIdsTerminal::new(target_slot))?;
918
919        Self::project_terminal_items(projection, values, |projection, (id, value)| {
920            Ok((id, projection.apply_value(value)?))
921        })
922        .map(output_values_with_ids)
923    }
924
925    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
926    pub fn explain_values_by_with_ids(
927        &self,
928        field: impl AsRef<str>,
929    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
930    where
931        E: EntityValue,
932    {
933        let target_slot = self.resolve_non_paged_slot(field)?;
934
935        self.explain_terminal(&ValuesBySlotWithIdsTerminal::new(target_slot))
936    }
937
938    /// Execute and return the first projected field value in effective response
939    /// order, if any.
940    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
941    where
942        E: EntityValue,
943    {
944        let target_slot = self.resolve_non_paged_slot(field)?;
945
946        self.execute_terminal(FirstValueBySlotTerminal::new(target_slot))
947            .map(|value| value.map(output))
948    }
949
950    /// Execute and return the first projected value for one shared bounded
951    /// projection in effective response order, if any.
952    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
953    where
954        E: EntityValue,
955        P: ValueProjectionExpr,
956    {
957        let target_slot = self.resolve_non_paged_slot(projection.field())?;
958        let value = self.execute_terminal(FirstValueBySlotTerminal::new(target_slot))?;
959
960        let mut projected =
961            Self::project_terminal_items(projection, value, |projection, value| {
962                projection.apply_value(value)
963            })?;
964
965        Ok(projected.pop().map(output))
966    }
967
968    /// Explain `first_value_by(field)` routing without executing the terminal.
969    pub fn explain_first_value_by(
970        &self,
971        field: impl AsRef<str>,
972    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
973    where
974        E: EntityValue,
975    {
976        let target_slot = self.resolve_non_paged_slot(field)?;
977
978        self.explain_terminal(&FirstValueBySlotTerminal::new(target_slot))
979    }
980
981    /// Execute and return the last projected field value in effective response
982    /// order, if any.
983    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
984    where
985        E: EntityValue,
986    {
987        let target_slot = self.resolve_non_paged_slot(field)?;
988
989        self.execute_terminal(LastValueBySlotTerminal::new(target_slot))
990            .map(|value| value.map(output))
991    }
992
993    /// Execute and return the last projected value for one shared bounded
994    /// projection in effective response order, if any.
995    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
996    where
997        E: EntityValue,
998        P: ValueProjectionExpr,
999    {
1000        let target_slot = self.resolve_non_paged_slot(projection.field())?;
1001        let value = self.execute_terminal(LastValueBySlotTerminal::new(target_slot))?;
1002
1003        let mut projected =
1004            Self::project_terminal_items(projection, value, |projection, value| {
1005                projection.apply_value(value)
1006            })?;
1007
1008        Ok(projected.pop().map(output))
1009    }
1010
1011    /// Explain `last_value_by(field)` routing without executing the terminal.
1012    pub fn explain_last_value_by(
1013        &self,
1014        field: impl AsRef<str>,
1015    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1016    where
1017        E: EntityValue,
1018    {
1019        let target_slot = self.resolve_non_paged_slot(field)?;
1020
1021        self.explain_terminal(&LastValueBySlotTerminal::new(target_slot))
1022    }
1023
1024    /// Execute and return the first matching identifier in response order, if any.
1025    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1026    where
1027        E: EntityValue,
1028    {
1029        self.execute_terminal(FirstIdTerminal::new())
1030    }
1031
1032    /// Explain scalar `first()` routing without executing the terminal.
1033    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1034    where
1035        E: EntityValue,
1036    {
1037        self.explain_terminal(&FirstIdTerminal::new())
1038    }
1039
1040    /// Execute and return the last matching identifier in response order, if any.
1041    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1042    where
1043        E: EntityValue,
1044    {
1045        self.execute_terminal(LastIdTerminal::new())
1046    }
1047
1048    /// Explain scalar `last()` routing without executing the terminal.
1049    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1050    where
1051        E: EntityValue,
1052    {
1053        self.explain_terminal(&LastIdTerminal::new())
1054    }
1055
1056    /// Execute and require exactly one matching row.
1057    pub fn require_one(&self) -> Result<(), QueryError>
1058    where
1059        E: EntityValue,
1060    {
1061        self.execute()?.into_rows()?.require_one()?;
1062        Ok(())
1063    }
1064
1065    /// Execute and require at least one matching row.
1066    pub fn require_some(&self) -> Result<(), QueryError>
1067    where
1068        E: EntityValue,
1069    {
1070        self.execute()?.into_rows()?.require_some()?;
1071        Ok(())
1072    }
1073}