Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5//!
6//! Terminal Execution Model
7//!
8//! Fluent terminals are concrete descriptors with a 1:1 mapping to session
9//! execution and explain entrypoints. This module may orchestrate descriptor
10//! construction, non-paged gating, and public output shaping, but it must not
11//! carry terminal-kind enums, transport output enums, or match-based execution
12//! dispatch. Adding a new terminal means adding a new descriptor type and one
13//! direct `TerminalStrategyDriver` implementation for that descriptor.
14
15#[cfg(feature = "diagnostics")]
16use crate::db::FluentTerminalExecutionAttribution;
17use crate::{
18    db::{
19        DbSession, PersistedRow, Query,
20        query::{
21            api::ResponseCardinalityExt,
22            builder::{
23                AvgBySlotTerminal, AvgDistinctBySlotTerminal, CountDistinctBySlotTerminal,
24                CountRowsTerminal, DistinctValuesBySlotTerminal, ExistsRowsTerminal,
25                FirstIdTerminal, FirstValueBySlotTerminal, LastIdTerminal, LastValueBySlotTerminal,
26                MaxIdBySlotTerminal, MaxIdTerminal, MedianIdBySlotTerminal, MinIdBySlotTerminal,
27                MinIdTerminal, MinMaxIdBySlotTerminal, NthIdBySlotTerminal, SumBySlotTerminal,
28                SumDistinctBySlotTerminal, ValueProjectionExpr, ValuesBySlotTerminal,
29                ValuesBySlotWithIdsTerminal,
30            },
31            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
32            fluent::load::{FluentLoadQuery, LoadQueryResult},
33            intent::QueryError,
34            plan::AggregateKind,
35        },
36        response::EntityResponse,
37    },
38    traits::EntityValue,
39    types::{Decimal, Id},
40    value::{OutputValue, Value},
41};
42
43type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
44
45///
46/// TerminalStrategyDriver
47///
48/// TerminalStrategyDriver is the fluent terminal wiring adapter between a
49/// query-owned strategy object and the session-owned execution/explain
50/// boundary. Implementations are deliberately thin: they only choose the
51/// matching `DbSession` method for an existing strategy type.
52///
53
54trait TerminalStrategyDriver<E: PersistedRow + EntityValue> {
55    type Output;
56    type ExplainOutput;
57
58    fn execute(
59        self,
60        session: &DbSession<E::Canister>,
61        query: &Query<E>,
62    ) -> Result<Self::Output, QueryError>;
63
64    fn explain(
65        &self,
66        session: &DbSession<E::Canister>,
67        query: &Query<E>,
68    ) -> Result<Self::ExplainOutput, QueryError>;
69}
70
// Generate the `TerminalStrategyDriver` implementation for one aggregate-style
// terminal descriptor. Only the descriptor type, the output type, and the
// session execute method vary per invocation; the explain wiring is the same
// for every aggregate terminal, so it lives here once.
macro_rules! impl_aggregate_terminal_driver {
    ($descriptor:ty, $result:ty, $session_method:ident) => {
        impl<E: PersistedRow + EntityValue> TerminalStrategyDriver<E> for $descriptor {
            type Output = $result;
            type ExplainOutput = ExplainAggregateTerminalPlan;

            // Forward execution to the session method named by the invocation.
            fn execute(
                self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::Output, QueryError> {
                DbSession::$session_method(session, query, self)
            }

            // Forward explain to the shared aggregate-terminal explain entrypoint.
            fn explain(
                &self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::ExplainOutput, QueryError> {
                DbSession::explain_query_prepared_aggregate_terminal_with_visible_indexes(
                    session, query, self,
                )
            }
        }
    };
}
101
// Generate the `TerminalStrategyDriver` implementation for one projection-style
// terminal descriptor. The execute/explain shape matches the aggregate macro,
// but explain yields an execution node descriptor instead of an
// aggregate-terminal plan.
macro_rules! impl_projection_terminal_driver {
    ($descriptor:ty, $result:ty, $session_method:ident) => {
        impl<E: PersistedRow + EntityValue> TerminalStrategyDriver<E> for $descriptor {
            type Output = $result;
            type ExplainOutput = ExplainExecutionNodeDescriptor;

            // Forward execution to the session method named by the invocation.
            fn execute(
                self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::Output, QueryError> {
                DbSession::$session_method(session, query, self)
            }

            // Forward explain to the shared projection-terminal explain entrypoint.
            fn explain(
                &self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::ExplainOutput, QueryError> {
                DbSession::explain_query_prepared_projection_terminal_with_visible_indexes(
                    session, query, self,
                )
            }
        }
    };
}
132
// Aggregate-style terminal drivers. Each invocation lists, in order: the
// terminal descriptor type, the execution output type, and the `DbSession`
// method that executes it. Explain output for all of these is
// `ExplainAggregateTerminalPlan` (fixed by the macro).

// Row-level terminals without a field slot.
impl_aggregate_terminal_driver!(CountRowsTerminal, u32, execute_fluent_count_rows_terminal);
impl_aggregate_terminal_driver!(
    ExistsRowsTerminal,
    bool,
    execute_fluent_exists_rows_terminal
);
impl_aggregate_terminal_driver!(MinIdTerminal, Option<Id<E>>, execute_fluent_min_id_terminal);
impl_aggregate_terminal_driver!(MaxIdTerminal, Option<Id<E>>, execute_fluent_max_id_terminal);
// Id-returning extrema selected by a field slot.
impl_aggregate_terminal_driver!(
    MinIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_min_id_by_slot
);
impl_aggregate_terminal_driver!(
    MaxIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_max_id_by_slot
);
// Numeric reductions over a field slot.
impl_aggregate_terminal_driver!(
    SumBySlotTerminal,
    Option<Decimal>,
    execute_fluent_sum_by_slot
);
impl_aggregate_terminal_driver!(
    SumDistinctBySlotTerminal,
    Option<Decimal>,
    execute_fluent_sum_distinct_by_slot
);
impl_aggregate_terminal_driver!(
    AvgBySlotTerminal,
    Option<Decimal>,
    execute_fluent_avg_by_slot
);
impl_aggregate_terminal_driver!(
    AvgDistinctBySlotTerminal,
    Option<Decimal>,
    execute_fluent_avg_distinct_by_slot
);
// Positional id terminals.
impl_aggregate_terminal_driver!(
    FirstIdTerminal,
    Option<Id<E>>,
    execute_fluent_first_id_terminal
);
impl_aggregate_terminal_driver!(
    LastIdTerminal,
    Option<Id<E>>,
    execute_fluent_last_id_terminal
);
impl_aggregate_terminal_driver!(
    NthIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_nth_id_by_slot
);
impl_aggregate_terminal_driver!(
    MedianIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_median_id_by_slot
);
// Combined extrema: output is the `MinMaxByIds<E>` alias, i.e. an optional
// pair of ids.
impl_aggregate_terminal_driver!(
    MinMaxIdBySlotTerminal,
    MinMaxByIds<E>,
    execute_fluent_min_max_id_by_slot
);
196
// Projection-style terminal drivers. Each invocation lists, in order: the
// terminal descriptor type, the execution output type, and the `DbSession`
// method that executes it. Explain output for all of these is
// `ExplainExecutionNodeDescriptor` (fixed by the macro). Outputs here are
// runtime `Value`s; the public fluent methods convert them to `OutputValue`.
impl_projection_terminal_driver!(
    ValuesBySlotTerminal,
    Vec<Value>,
    execute_fluent_values_by_slot
);
impl_projection_terminal_driver!(
    DistinctValuesBySlotTerminal,
    Vec<Value>,
    execute_fluent_distinct_values_by_slot
);
impl_projection_terminal_driver!(
    CountDistinctBySlotTerminal,
    u32,
    execute_fluent_count_distinct_by_slot
);
impl_projection_terminal_driver!(
    ValuesBySlotWithIdsTerminal,
    Vec<(Id<E>, Value)>,
    execute_fluent_values_by_with_ids_slot
);
impl_projection_terminal_driver!(
    FirstValueBySlotTerminal,
    Option<Value>,
    execute_fluent_first_value_by_slot
);
impl_projection_terminal_driver!(
    LastValueBySlotTerminal,
    Option<Value>,
    execute_fluent_last_value_by_slot
);
227
228// Convert one runtime projection value into the public output boundary type.
229fn output(value: Value) -> OutputValue {
230    OutputValue::from(value)
231}
232
233// Convert one ordered runtime projection vector into the public output form.
234fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
235    values.into_iter().map(output).collect()
236}
237
238// Convert one ordered runtime `(id, value)` projection vector into the public output form.
239fn output_values_with_ids<E: PersistedRow>(
240    values: Vec<(Id<E>, Value)>,
241) -> Vec<(Id<E>, OutputValue)> {
242    values
243        .into_iter()
244        .map(|(id, value)| (id, output(value)))
245        .collect()
246}
247
248impl<E> FluentLoadQuery<'_, E>
249where
250    E: PersistedRow,
251{
252    // ------------------------------------------------------------------
253    // Execution (single semantic boundary)
254    // ------------------------------------------------------------------
255
256    /// Execute this query using the session's policy settings.
257    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
258    where
259        E: EntityValue,
260    {
261        self.with_non_paged(DbSession::execute_query_result)
262    }
263
264    /// Execute this query through the scalar rows-only session boundary.
265    pub fn execute_rows(&self) -> Result<EntityResponse<E>, QueryError>
266    where
267        E: EntityValue,
268    {
269        self.with_non_paged(DbSession::execute_scalar_query_rows)
270    }
271
272    // Run one terminal operation through the canonical non-paged fluent policy
273    // gate so execution and explain helpers cannot drift on readiness checks.
274    fn with_non_paged<T>(
275        &self,
276        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
277    ) -> Result<T, QueryError>
278    where
279        E: EntityValue,
280    {
281        self.ensure_non_paged_mode_ready()?;
282        map(self.session, self.query())
283    }
284
285    // Resolve the structural execution descriptor for this fluent load query
286    // through the session-owned visible-index explain path once.
287    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
288    where
289        E: EntityValue,
290    {
291        self.with_non_paged(DbSession::explain_query_execution_with_visible_indexes)
292    }
293
294    // Render one descriptor-derived execution surface so text/json explain
295    // terminals do not each forward the same session explain call ad hoc.
296    fn render_execution_descriptor(
297        &self,
298        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
299    ) -> Result<String, QueryError>
300    where
301        E: EntityValue,
302    {
303        let descriptor = self.explain_execution_descriptor()?;
304
305        Ok(render(descriptor))
306    }
307
308    // Execute one prepared terminal descriptor through the canonical
309    // non-paged fluent policy gate.
310    fn execute_terminal<S>(&self, strategy: S) -> Result<S::Output, QueryError>
311    where
312        E: EntityValue,
313        S: TerminalStrategyDriver<E>,
314    {
315        self.with_non_paged(|session, query| strategy.execute(session, query))
316    }
317
318    // Explain one prepared terminal strategy through the same non-paged fluent
319    // policy gate used by execution.
320    fn explain_terminal<S>(&self, strategy: &S) -> Result<S::ExplainOutput, QueryError>
321    where
322        E: EntityValue,
323        S: TerminalStrategyDriver<E>,
324    {
325        self.with_non_paged(|session, query| strategy.explain(session, query))
326    }
327
328    // ------------------------------------------------------------------
329    // Execution terminals — semantic only
330    // ------------------------------------------------------------------
331
332    /// Execute and return whether the result set is empty.
333    pub fn is_empty(&self) -> Result<bool, QueryError>
334    where
335        E: EntityValue,
336    {
337        self.not_exists()
338    }
339
340    /// Execute and return whether no matching row exists.
341    pub fn not_exists(&self) -> Result<bool, QueryError>
342    where
343        E: EntityValue,
344    {
345        Ok(!self.exists()?)
346    }
347
348    /// Execute and return whether at least one matching row exists.
349    pub fn exists(&self) -> Result<bool, QueryError>
350    where
351        E: EntityValue,
352    {
353        self.execute_terminal(ExistsRowsTerminal::new())
354    }
355
356    /// Execute and return whether at least one matching row exists with
357    /// terminal attribution.
358    #[cfg(feature = "diagnostics")]
359    #[doc(hidden)]
360    pub fn exists_with_attribution(
361        &self,
362    ) -> Result<(bool, FluentTerminalExecutionAttribution), QueryError>
363    where
364        E: EntityValue,
365    {
366        self.with_non_paged(|session, query| {
367            session.execute_fluent_exists_rows_terminal_with_attribution(
368                query,
369                ExistsRowsTerminal::new(),
370            )
371        })
372    }
373
374    /// Explain scalar `exists()` routing without executing the terminal.
375    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
376    where
377        E: EntityValue,
378    {
379        self.explain_terminal(&ExistsRowsTerminal::new())
380    }
381
382    /// Explain scalar `not_exists()` routing without executing the terminal.
383    ///
384    /// This remains an `exists()` execution plan with negated boolean semantics.
385    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
386    where
387        E: EntityValue,
388    {
389        self.explain_exists()
390    }
391
392    /// Explain scalar load execution shape without executing the query.
393    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
394    where
395        E: EntityValue,
396    {
397        self.explain_execution_descriptor()
398    }
399
400    /// Explain scalar load execution shape as deterministic text.
401    pub fn explain_execution_text(&self) -> Result<String, QueryError>
402    where
403        E: EntityValue,
404    {
405        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
406    }
407
408    /// Explain scalar load execution shape as canonical JSON.
409    pub fn explain_execution_json(&self) -> Result<String, QueryError>
410    where
411        E: EntityValue,
412    {
413        self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
414    }
415
416    /// Explain scalar load execution shape as verbose text with diagnostics.
417    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
418    where
419        E: EntityValue,
420    {
421        self.with_non_paged(DbSession::explain_query_execution_verbose_with_visible_indexes)
422    }
423
424    /// Execute and return the number of matching rows.
425    pub fn count(&self) -> Result<u32, QueryError>
426    where
427        E: EntityValue,
428    {
429        self.execute_terminal(CountRowsTerminal::new())
430    }
431
432    /// Execute and return the number of matching rows with terminal attribution.
433    #[cfg(feature = "diagnostics")]
434    #[doc(hidden)]
435    pub fn count_with_attribution(
436        &self,
437    ) -> Result<(u32, FluentTerminalExecutionAttribution), QueryError>
438    where
439        E: EntityValue,
440    {
441        self.with_non_paged(|session, query| {
442            session.execute_fluent_count_rows_terminal_with_attribution(
443                query,
444                CountRowsTerminal::new(),
445            )
446        })
447    }
448
449    /// Execute and return the total persisted payload bytes for the effective
450    /// result window.
451    pub fn bytes(&self) -> Result<u64, QueryError>
452    where
453        E: EntityValue,
454    {
455        self.with_non_paged(DbSession::execute_fluent_bytes)
456    }
457
458    /// Execute and return the total serialized bytes for `field` over the
459    /// effective result window.
460    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
461    where
462        E: EntityValue,
463    {
464        let target_slot = self.resolve_non_paged_slot(field)?;
465
466        self.with_non_paged(|session, query| {
467            session.execute_fluent_bytes_by_slot(query, target_slot)
468        })
469    }
470
471    /// Explain `bytes_by(field)` routing without executing the terminal.
472    pub fn explain_bytes_by(
473        &self,
474        field: impl AsRef<str>,
475    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
476    where
477        E: EntityValue,
478    {
479        let target_slot = self.resolve_non_paged_slot(field)?;
480
481        self.with_non_paged(|session, query| {
482            session.explain_query_bytes_by_with_visible_indexes(query, target_slot.field())
483        })
484    }
485
486    /// Execute and return the smallest matching identifier, if any.
487    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
488    where
489        E: EntityValue,
490    {
491        self.execute_terminal(MinIdTerminal::new())
492    }
493
494    /// Explain scalar `min()` routing without executing the terminal.
495    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
496    where
497        E: EntityValue,
498    {
499        self.explain_terminal(&MinIdTerminal::new())
500    }
501
502    /// Execute and return the id of the row with the smallest value for `field`.
503    ///
504    /// Ties are deterministic: equal field values resolve by primary key ascending.
505    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
506    where
507        E: EntityValue,
508    {
509        let target_slot = self.resolve_non_paged_slot(field)?;
510
511        self.execute_terminal(MinIdBySlotTerminal::new(target_slot))
512    }
513
514    /// Execute and return the largest matching identifier, if any.
515    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
516    where
517        E: EntityValue,
518    {
519        self.execute_terminal(MaxIdTerminal::new())
520    }
521
522    /// Explain scalar `max()` routing without executing the terminal.
523    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
524    where
525        E: EntityValue,
526    {
527        self.explain_terminal(&MaxIdTerminal::new())
528    }
529
530    /// Execute and return the id of the row with the largest value for `field`.
531    ///
532    /// Ties are deterministic: equal field values resolve by primary key ascending.
533    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
534    where
535        E: EntityValue,
536    {
537        let target_slot = self.resolve_non_paged_slot(field)?;
538
539        self.execute_terminal(MaxIdBySlotTerminal::new(target_slot))
540    }
541
542    /// Execute and return the id at zero-based ordinal `nth` when rows are
543    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
544    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
545    where
546        E: EntityValue,
547    {
548        let target_slot = self.resolve_non_paged_slot(field)?;
549
550        self.execute_terminal(NthIdBySlotTerminal::new(target_slot, nth))
551    }
552
553    /// Execute and return the sum of `field` over matching rows.
554    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
555    where
556        E: EntityValue,
557    {
558        let target_slot = self.resolve_non_paged_slot(field)?;
559
560        self.execute_terminal(SumBySlotTerminal::new(target_slot))
561    }
562
563    /// Explain scalar `sum_by(field)` routing without executing the terminal.
564    pub fn explain_sum_by(
565        &self,
566        field: impl AsRef<str>,
567    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
568    where
569        E: EntityValue,
570    {
571        let target_slot = self.resolve_non_paged_slot(field)?;
572
573        self.explain_terminal(&SumBySlotTerminal::new(target_slot))
574    }
575
576    /// Execute and return the sum of distinct `field` values.
577    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
578    where
579        E: EntityValue,
580    {
581        let target_slot = self.resolve_non_paged_slot(field)?;
582
583        self.execute_terminal(SumDistinctBySlotTerminal::new(target_slot))
584    }
585
586    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
587    pub fn explain_sum_distinct_by(
588        &self,
589        field: impl AsRef<str>,
590    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
591    where
592        E: EntityValue,
593    {
594        let target_slot = self.resolve_non_paged_slot(field)?;
595
596        self.explain_terminal(&SumDistinctBySlotTerminal::new(target_slot))
597    }
598
599    /// Execute and return the average of `field` over matching rows.
600    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
601    where
602        E: EntityValue,
603    {
604        let target_slot = self.resolve_non_paged_slot(field)?;
605
606        self.execute_terminal(AvgBySlotTerminal::new(target_slot))
607    }
608
609    /// Explain scalar `avg_by(field)` routing without executing the terminal.
610    pub fn explain_avg_by(
611        &self,
612        field: impl AsRef<str>,
613    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
614    where
615        E: EntityValue,
616    {
617        let target_slot = self.resolve_non_paged_slot(field)?;
618
619        self.explain_terminal(&AvgBySlotTerminal::new(target_slot))
620    }
621
622    /// Execute and return the average of distinct `field` values.
623    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
624    where
625        E: EntityValue,
626    {
627        let target_slot = self.resolve_non_paged_slot(field)?;
628
629        self.execute_terminal(AvgDistinctBySlotTerminal::new(target_slot))
630    }
631
632    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
633    pub fn explain_avg_distinct_by(
634        &self,
635        field: impl AsRef<str>,
636    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
637    where
638        E: EntityValue,
639    {
640        let target_slot = self.resolve_non_paged_slot(field)?;
641
642        self.explain_terminal(&AvgDistinctBySlotTerminal::new(target_slot))
643    }
644
645    /// Execute and return the median id by `field` using deterministic ordering
646    /// `(field asc, primary key asc)`.
647    ///
648    /// Even-length windows select the lower median.
649    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
650    where
651        E: EntityValue,
652    {
653        let target_slot = self.resolve_non_paged_slot(field)?;
654
655        self.execute_terminal(MedianIdBySlotTerminal::new(target_slot))
656    }
657
658    /// Execute and return the number of distinct values for `field` over the
659    /// effective result window.
660    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
661    where
662        E: EntityValue,
663    {
664        let target_slot = self.resolve_non_paged_slot(field)?;
665
666        self.execute_terminal(CountDistinctBySlotTerminal::new(target_slot))
667    }
668
669    /// Explain `count_distinct_by(field)` routing without executing the terminal.
670    pub fn explain_count_distinct_by(
671        &self,
672        field: impl AsRef<str>,
673    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
674    where
675        E: EntityValue,
676    {
677        let target_slot = self.resolve_non_paged_slot(field)?;
678
679        self.explain_terminal(&CountDistinctBySlotTerminal::new(target_slot))
680    }
681
682    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
683    ///
684    /// Tie handling is deterministic for both extrema: primary key ascending.
685    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
686    where
687        E: EntityValue,
688    {
689        let target_slot = self.resolve_non_paged_slot(field)?;
690
691        self.execute_terminal(MinMaxIdBySlotTerminal::new(target_slot))
692    }
693
694    /// Execute and return projected field values for the effective result window.
695    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
696    where
697        E: EntityValue,
698    {
699        let target_slot = self.resolve_non_paged_slot(field)?;
700
701        self.execute_terminal(ValuesBySlotTerminal::new(target_slot))
702            .map(output_values)
703    }
704
705    /// Execute and return projected values for one shared bounded projection
706    /// over the effective response window.
707    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
708    where
709        E: EntityValue,
710        P: ValueProjectionExpr,
711    {
712        let target_slot = self.resolve_non_paged_slot(projection.field())?;
713
714        self.with_non_paged(|session, query| {
715            session.execute_fluent_project_values_by_slot(
716                query,
717                target_slot,
718                projection.projection_plan().into_expr(),
719            )
720        })
721        .map(output_values)
722    }
723
724    /// Explain `project_values(projection)` routing without executing it.
725    pub fn explain_project_values<P>(
726        &self,
727        projection: &P,
728    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
729    where
730        E: EntityValue,
731        P: ValueProjectionExpr,
732    {
733        let target_slot = self.resolve_non_paged_slot(projection.field())?;
734
735        self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
736    }
737
738    /// Explain `values_by(field)` routing without executing the terminal.
739    pub fn explain_values_by(
740        &self,
741        field: impl AsRef<str>,
742    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
743    where
744        E: EntityValue,
745    {
746        let target_slot = self.resolve_non_paged_slot(field)?;
747
748        self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
749    }
750
751    /// Execute and return the first `k` rows from the effective response window.
752    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
753    where
754        E: EntityValue,
755    {
756        self.with_non_paged(|session, query| session.execute_fluent_take(query, take_count))
757    }
758
759    /// Execute and return the top `k` rows by `field` under deterministic
760    /// ordering `(field desc, primary_key asc)` over the effective response
761    /// window.
762    ///
763    /// This terminal applies its own ordering and does not preserve query
764    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
765    /// matches `max_by(field)` selection semantics.
766    pub fn top_k_by(
767        &self,
768        field: impl AsRef<str>,
769        take_count: u32,
770    ) -> Result<EntityResponse<E>, QueryError>
771    where
772        E: EntityValue,
773    {
774        let target_slot = self.resolve_non_paged_slot(field)?;
775
776        self.with_non_paged(|session, query| {
777            session.execute_fluent_top_k_rows_by_slot(query, target_slot, take_count)
778        })
779    }
780
781    /// Execute and return the bottom `k` rows by `field` under deterministic
782    /// ordering `(field asc, primary_key asc)` over the effective response
783    /// window.
784    ///
785    /// This terminal applies its own ordering and does not preserve query
786    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
787    /// matches `min_by(field)` selection semantics.
788    pub fn bottom_k_by(
789        &self,
790        field: impl AsRef<str>,
791        take_count: u32,
792    ) -> Result<EntityResponse<E>, QueryError>
793    where
794        E: EntityValue,
795    {
796        let target_slot = self.resolve_non_paged_slot(field)?;
797
798        self.with_non_paged(|session, query| {
799            session.execute_fluent_bottom_k_rows_by_slot(query, target_slot, take_count)
800        })
801    }
802
803    /// Execute and return projected values for the top `k` rows by `field`
804    /// under deterministic ordering `(field desc, primary_key asc)` over the
805    /// effective response window.
806    ///
807    /// Ranking is applied before projection and does not preserve query
808    /// `order_term(...)` row order in the returned values. For `k = 1`, this
809    /// matches `max_by(field)` projected to one value.
810    pub fn top_k_by_values(
811        &self,
812        field: impl AsRef<str>,
813        take_count: u32,
814    ) -> Result<Vec<OutputValue>, QueryError>
815    where
816        E: EntityValue,
817    {
818        let target_slot = self.resolve_non_paged_slot(field)?;
819
820        self.with_non_paged(|session, query| {
821            session
822                .execute_fluent_top_k_values_by_slot(query, target_slot, take_count)
823                .map(output_values)
824        })
825    }
826
827    /// Execute and return projected values for the bottom `k` rows by `field`
828    /// under deterministic ordering `(field asc, primary_key asc)` over the
829    /// effective response window.
830    ///
831    /// Ranking is applied before projection and does not preserve query
832    /// `order_term(...)` row order in the returned values. For `k = 1`, this
833    /// matches `min_by(field)` projected to one value.
834    pub fn bottom_k_by_values(
835        &self,
836        field: impl AsRef<str>,
837        take_count: u32,
838    ) -> Result<Vec<OutputValue>, QueryError>
839    where
840        E: EntityValue,
841    {
842        let target_slot = self.resolve_non_paged_slot(field)?;
843
844        self.with_non_paged(|session, query| {
845            session
846                .execute_fluent_bottom_k_values_by_slot(query, target_slot, take_count)
847                .map(output_values)
848        })
849    }
850
851    /// Execute and return projected id/value pairs for the top `k` rows by
852    /// `field` under deterministic ordering `(field desc, primary_key asc)`
853    /// over the effective response window.
854    ///
855    /// Ranking is applied before projection and does not preserve query
856    /// `order_term(...)` row order in the returned values. For `k = 1`, this
857    /// matches `max_by(field)` projected to one `(id, value)` pair.
858    pub fn top_k_by_with_ids(
859        &self,
860        field: impl AsRef<str>,
861        take_count: u32,
862    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
863    where
864        E: EntityValue,
865    {
866        let target_slot = self.resolve_non_paged_slot(field)?;
867
868        self.with_non_paged(|session, query| {
869            session
870                .execute_fluent_top_k_values_with_ids_by_slot(query, target_slot, take_count)
871                .map(output_values_with_ids)
872        })
873    }
874
875    /// Execute and return projected id/value pairs for the bottom `k` rows by
876    /// `field` under deterministic ordering `(field asc, primary_key asc)`
877    /// over the effective response window.
878    ///
879    /// Ranking is applied before projection and does not preserve query
880    /// `order_term(...)` row order in the returned values. For `k = 1`, this
881    /// matches `min_by(field)` projected to one `(id, value)` pair.
882    pub fn bottom_k_by_with_ids(
883        &self,
884        field: impl AsRef<str>,
885        take_count: u32,
886    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
887    where
888        E: EntityValue,
889    {
890        let target_slot = self.resolve_non_paged_slot(field)?;
891
892        self.with_non_paged(|session, query| {
893            session
894                .execute_fluent_bottom_k_values_with_ids_by_slot(query, target_slot, take_count)
895                .map(output_values_with_ids)
896        })
897    }
898
899    /// Execute and return distinct projected field values for the effective
900    /// result window, preserving first-observed value order.
901    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
902    where
903        E: EntityValue,
904    {
905        let target_slot = self.resolve_non_paged_slot(field)?;
906
907        self.execute_terminal(DistinctValuesBySlotTerminal::new(target_slot))
908            .map(output_values)
909    }
910
911    /// Explain `distinct_values_by(field)` routing without executing the terminal.
912    pub fn explain_distinct_values_by(
913        &self,
914        field: impl AsRef<str>,
915    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
916    where
917        E: EntityValue,
918    {
919        let target_slot = self.resolve_non_paged_slot(field)?;
920
921        self.explain_terminal(&DistinctValuesBySlotTerminal::new(target_slot))
922    }
923
924    /// Execute and return projected field values paired with row ids for the
925    /// effective result window.
926    pub fn values_by_with_ids(
927        &self,
928        field: impl AsRef<str>,
929    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
930    where
931        E: EntityValue,
932    {
933        let target_slot = self.resolve_non_paged_slot(field)?;
934
935        self.execute_terminal(ValuesBySlotWithIdsTerminal::new(target_slot))
936            .map(output_values_with_ids)
937    }
938
939    /// Execute and return projected id/value pairs for one shared bounded
940    /// projection over the effective response window.
941    pub fn project_values_with_ids<P>(
942        &self,
943        projection: &P,
944    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
945    where
946        E: EntityValue,
947        P: ValueProjectionExpr,
948    {
949        let target_slot = self.resolve_non_paged_slot(projection.field())?;
950
951        self.with_non_paged(|session, query| {
952            session.execute_fluent_project_values_with_ids_by_slot(
953                query,
954                target_slot,
955                projection.projection_plan().into_expr(),
956            )
957        })
958        .map(output_values_with_ids)
959    }
960
961    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
962    pub fn explain_values_by_with_ids(
963        &self,
964        field: impl AsRef<str>,
965    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
966    where
967        E: EntityValue,
968    {
969        let target_slot = self.resolve_non_paged_slot(field)?;
970
971        self.explain_terminal(&ValuesBySlotWithIdsTerminal::new(target_slot))
972    }
973
974    /// Execute and return the first projected field value in effective response
975    /// order, if any.
976    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
977    where
978        E: EntityValue,
979    {
980        let target_slot = self.resolve_non_paged_slot(field)?;
981
982        self.execute_terminal(FirstValueBySlotTerminal::new(target_slot))
983            .map(|value| value.map(output))
984    }
985
986    /// Execute and return the first projected value for one shared bounded
987    /// projection in effective response order, if any.
988    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
989    where
990        E: EntityValue,
991        P: ValueProjectionExpr,
992    {
993        let target_slot = self.resolve_non_paged_slot(projection.field())?;
994
995        self.with_non_paged(|session, query| {
996            session.execute_fluent_project_terminal_value_by_slot(
997                query,
998                target_slot,
999                AggregateKind::First,
1000                projection.projection_plan().into_expr(),
1001            )
1002        })
1003        .map(|value| value.map(output))
1004    }
1005
1006    /// Explain `first_value_by(field)` routing without executing the terminal.
1007    pub fn explain_first_value_by(
1008        &self,
1009        field: impl AsRef<str>,
1010    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1011    where
1012        E: EntityValue,
1013    {
1014        let target_slot = self.resolve_non_paged_slot(field)?;
1015
1016        self.explain_terminal(&FirstValueBySlotTerminal::new(target_slot))
1017    }
1018
1019    /// Execute and return the last projected field value in effective response
1020    /// order, if any.
1021    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1022    where
1023        E: EntityValue,
1024    {
1025        let target_slot = self.resolve_non_paged_slot(field)?;
1026
1027        self.execute_terminal(LastValueBySlotTerminal::new(target_slot))
1028            .map(|value| value.map(output))
1029    }
1030
1031    /// Execute and return the last projected value for one shared bounded
1032    /// projection in effective response order, if any.
1033    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1034    where
1035        E: EntityValue,
1036        P: ValueProjectionExpr,
1037    {
1038        let target_slot = self.resolve_non_paged_slot(projection.field())?;
1039
1040        self.with_non_paged(|session, query| {
1041            session.execute_fluent_project_terminal_value_by_slot(
1042                query,
1043                target_slot,
1044                AggregateKind::Last,
1045                projection.projection_plan().into_expr(),
1046            )
1047        })
1048        .map(|value| value.map(output))
1049    }
1050
1051    /// Explain `last_value_by(field)` routing without executing the terminal.
1052    pub fn explain_last_value_by(
1053        &self,
1054        field: impl AsRef<str>,
1055    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1056    where
1057        E: EntityValue,
1058    {
1059        let target_slot = self.resolve_non_paged_slot(field)?;
1060
1061        self.explain_terminal(&LastValueBySlotTerminal::new(target_slot))
1062    }
1063
1064    /// Execute and return the first matching identifier in response order, if any.
1065    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1066    where
1067        E: EntityValue,
1068    {
1069        self.execute_terminal(FirstIdTerminal::new())
1070    }
1071
1072    /// Explain scalar `first()` routing without executing the terminal.
1073    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1074    where
1075        E: EntityValue,
1076    {
1077        self.explain_terminal(&FirstIdTerminal::new())
1078    }
1079
1080    /// Execute and return the last matching identifier in response order, if any.
1081    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1082    where
1083        E: EntityValue,
1084    {
1085        self.execute_terminal(LastIdTerminal::new())
1086    }
1087
1088    /// Explain scalar `last()` routing without executing the terminal.
1089    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1090    where
1091        E: EntityValue,
1092    {
1093        self.explain_terminal(&LastIdTerminal::new())
1094    }
1095
1096    /// Execute and require exactly one matching row.
1097    pub fn require_one(&self) -> Result<(), QueryError>
1098    where
1099        E: EntityValue,
1100    {
1101        self.execute_rows()?.require_one()?;
1102        Ok(())
1103    }
1104
1105    /// Execute and require at least one matching row.
1106    pub fn require_some(&self) -> Result<(), QueryError>
1107    where
1108        E: EntityValue,
1109    {
1110        self.execute_rows()?.require_some()?;
1111        Ok(())
1112    }
1113}