Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5//!
6//! Terminal Execution Model
7//!
8//! Fluent terminals are concrete descriptors with a 1:1 mapping to session
9//! execution and explain entrypoints. This module may orchestrate descriptor
10//! construction, non-paged gating, and public output shaping, but it must not
11//! carry terminal-kind enums, transport output enums, or match-based execution
12//! dispatch. Adding a new terminal means adding a new descriptor type and one
13//! direct `TerminalStrategyDriver` implementation for that descriptor.
14
15use crate::{
16    db::{
17        DbSession, PersistedRow, Query,
18        query::{
19            api::ResponseCardinalityExt,
20            builder::{
21                AvgBySlotTerminal, AvgDistinctBySlotTerminal, CountDistinctBySlotTerminal,
22                CountRowsTerminal, DistinctValuesBySlotTerminal, ExistsRowsTerminal,
23                FirstIdTerminal, FirstValueBySlotTerminal, LastIdTerminal, LastValueBySlotTerminal,
24                MaxIdBySlotTerminal, MaxIdTerminal, MedianIdBySlotTerminal, MinIdBySlotTerminal,
25                MinIdTerminal, MinMaxIdBySlotTerminal, NthIdBySlotTerminal, SumBySlotTerminal,
26                SumDistinctBySlotTerminal, ValueProjectionExpr, ValuesBySlotTerminal,
27                ValuesBySlotWithIdsTerminal,
28            },
29            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
30            fluent::load::{FluentLoadQuery, LoadQueryResult},
31            intent::QueryError,
32            plan::AggregateKind,
33        },
34        response::EntityResponse,
35    },
36    traits::EntityValue,
37    types::{Decimal, Id},
38    value::{OutputValue, Value},
39};
40
// Result shape of the `min_max_by` terminal: the `(min, max)` id pair for one
// field, wrapped in `Option`.
// NOTE(review): `None` presumably means no row matched — confirm against the
// session executor.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
42
43///
44/// TerminalStrategyDriver
45///
46/// TerminalStrategyDriver is the fluent terminal wiring adapter between a
47/// query-owned strategy object and the session-owned execution/explain
48/// boundary. Implementations are deliberately thin: they only choose the
49/// matching `DbSession` method for an existing strategy type.
50///
51
52trait TerminalStrategyDriver<E: PersistedRow + EntityValue> {
53    type Output;
54    type ExplainOutput;
55
56    fn execute(
57        self,
58        session: &DbSession<E::Canister>,
59        query: &Query<E>,
60    ) -> Result<Self::Output, QueryError>;
61
62    fn explain(
63        &self,
64        session: &DbSession<E::Canister>,
65        query: &Query<E>,
66    ) -> Result<Self::ExplainOutput, QueryError>;
67}
68
// Define one aggregate-style terminal driver implementation. The macro keeps
// terminal descriptors 1:1 with session methods while removing repeated explain
// wiring that is identical for every aggregate terminal.
//
// Arguments, in order:
// - `$terminal`: descriptor type that receives the `TerminalStrategyDriver` impl.
// - `$output`: type returned by `execute`. It may mention `E`, which refers to
//   the entity parameter introduced by the generated `impl<E>` header.
// - `$execute`: name of the `DbSession` method that runs the descriptor.
macro_rules! impl_aggregate_terminal_driver {
    ($terminal:ty, $output:ty, $execute:ident) => {
        impl<E> TerminalStrategyDriver<E> for $terminal
        where
            E: PersistedRow + EntityValue,
        {
            type Output = $output;
            type ExplainOutput = ExplainAggregateTerminalPlan;

            // Forward to the descriptor-specific session method, handing the
            // descriptor over by value.
            fn execute(
                self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::Output, QueryError> {
                session.$execute(query, self)
            }

            // Every aggregate terminal shares this one session explain
            // entrypoint; only the borrowed descriptor differs.
            fn explain(
                &self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::ExplainOutput, QueryError> {
                session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
            }
        }
    };
}
99
// Define one projection-style terminal driver implementation. Projection
// terminals share the execution/explain shape but return execution descriptors
// rather than aggregate-terminal plans.
//
// Arguments, in order:
// - `$terminal`: descriptor type that receives the `TerminalStrategyDriver` impl.
// - `$output`: type returned by `execute`. It may mention `E`, which refers to
//   the entity parameter introduced by the generated `impl<E>` header.
// - `$execute`: name of the `DbSession` method that runs the descriptor.
macro_rules! impl_projection_terminal_driver {
    ($terminal:ty, $output:ty, $execute:ident) => {
        impl<E> TerminalStrategyDriver<E> for $terminal
        where
            E: PersistedRow + EntityValue,
        {
            type Output = $output;
            type ExplainOutput = ExplainExecutionNodeDescriptor;

            // Forward to the descriptor-specific session method, handing the
            // descriptor over by value.
            fn execute(
                self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::Output, QueryError> {
                session.$execute(query, self)
            }

            // Every projection terminal shares this one session explain
            // entrypoint; only the borrowed descriptor differs.
            fn explain(
                &self,
                session: &DbSession<E::Canister>,
                query: &Query<E>,
            ) -> Result<Self::ExplainOutput, QueryError> {
                session.explain_query_prepared_projection_terminal_with_visible_indexes(query, self)
            }
        }
    };
}
130
// Aggregate terminal wiring table. Each invocation pairs one descriptor type
// with the output type of its `execute` and the `DbSession` method that runs
// it; explain output for all of them is `ExplainAggregateTerminalPlan`.
impl_aggregate_terminal_driver!(CountRowsTerminal, u32, execute_fluent_count_rows_terminal);
impl_aggregate_terminal_driver!(
    ExistsRowsTerminal,
    bool,
    execute_fluent_exists_rows_terminal
);
impl_aggregate_terminal_driver!(MinIdTerminal, Option<Id<E>>, execute_fluent_min_id_terminal);
impl_aggregate_terminal_driver!(MaxIdTerminal, Option<Id<E>>, execute_fluent_max_id_terminal);
impl_aggregate_terminal_driver!(
    MinIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_min_id_by_slot
);
impl_aggregate_terminal_driver!(
    MaxIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_max_id_by_slot
);
impl_aggregate_terminal_driver!(
    SumBySlotTerminal,
    Option<Decimal>,
    execute_fluent_sum_by_slot
);
impl_aggregate_terminal_driver!(
    SumDistinctBySlotTerminal,
    Option<Decimal>,
    execute_fluent_sum_distinct_by_slot
);
impl_aggregate_terminal_driver!(
    AvgBySlotTerminal,
    Option<Decimal>,
    execute_fluent_avg_by_slot
);
impl_aggregate_terminal_driver!(
    AvgDistinctBySlotTerminal,
    Option<Decimal>,
    execute_fluent_avg_distinct_by_slot
);
impl_aggregate_terminal_driver!(
    FirstIdTerminal,
    Option<Id<E>>,
    execute_fluent_first_id_terminal
);
impl_aggregate_terminal_driver!(
    LastIdTerminal,
    Option<Id<E>>,
    execute_fluent_last_id_terminal
);
impl_aggregate_terminal_driver!(
    NthIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_nth_id_by_slot
);
impl_aggregate_terminal_driver!(
    MedianIdBySlotTerminal,
    Option<Id<E>>,
    execute_fluent_median_id_by_slot
);
impl_aggregate_terminal_driver!(
    MinMaxIdBySlotTerminal,
    MinMaxByIds<E>,
    execute_fluent_min_max_id_by_slot
);
194
// Projection terminal wiring table. Each invocation pairs one descriptor type
// with the raw (pre-`OutputValue`) output type of its `execute` and the
// `DbSession` method that runs it; explain output for all of them is
// `ExplainExecutionNodeDescriptor`.
impl_projection_terminal_driver!(
    ValuesBySlotTerminal,
    Vec<Value>,
    execute_fluent_values_by_slot
);
impl_projection_terminal_driver!(
    DistinctValuesBySlotTerminal,
    Vec<Value>,
    execute_fluent_distinct_values_by_slot
);
impl_projection_terminal_driver!(
    CountDistinctBySlotTerminal,
    u32,
    execute_fluent_count_distinct_by_slot
);
// NOTE(review): this session method name ends in `..._by_with_ids_slot`, unlike
// the `..._by_slot` suffix used by the neighbouring entries — confirm it is
// the intended `DbSession` method name.
impl_projection_terminal_driver!(
    ValuesBySlotWithIdsTerminal,
    Vec<(Id<E>, Value)>,
    execute_fluent_values_by_with_ids_slot
);
impl_projection_terminal_driver!(
    FirstValueBySlotTerminal,
    Option<Value>,
    execute_fluent_first_value_by_slot
);
impl_projection_terminal_driver!(
    LastValueBySlotTerminal,
    Option<Value>,
    execute_fluent_last_value_by_slot
);
225
// Convert one runtime projection value into the public output boundary type.
// Consumes `value`; the conversion itself is whatever `OutputValue::from`
// defines for `Value`.
fn output(value: Value) -> OutputValue {
    OutputValue::from(value)
}
230
231// Convert one ordered runtime projection vector into the public output form.
232fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
233    values.into_iter().map(output).collect()
234}
235
236// Convert one ordered runtime `(id, value)` projection vector into the public output form.
237fn output_values_with_ids<E: PersistedRow>(
238    values: Vec<(Id<E>, Value)>,
239) -> Vec<(Id<E>, OutputValue)> {
240    values
241        .into_iter()
242        .map(|(id, value)| (id, output(value)))
243        .collect()
244}
245
246impl<E> FluentLoadQuery<'_, E>
247where
248    E: PersistedRow,
249{
250    // ------------------------------------------------------------------
251    // Execution (single semantic boundary)
252    // ------------------------------------------------------------------
253
254    /// Execute this query using the session's policy settings.
255    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
256    where
257        E: EntityValue,
258    {
259        self.with_non_paged(DbSession::execute_query_result)
260    }
261
262    // Run one terminal operation through the canonical non-paged fluent policy
263    // gate so execution and explain helpers cannot drift on readiness checks.
264    fn with_non_paged<T>(
265        &self,
266        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
267    ) -> Result<T, QueryError>
268    where
269        E: EntityValue,
270    {
271        self.ensure_non_paged_mode_ready()?;
272        map(self.session, self.query())
273    }
274
275    // Resolve the structural execution descriptor for this fluent load query
276    // through the session-owned visible-index explain path once.
277    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
278    where
279        E: EntityValue,
280    {
281        self.with_non_paged(DbSession::explain_query_execution_with_visible_indexes)
282    }
283
284    // Render one descriptor-derived execution surface so text/json explain
285    // terminals do not each forward the same session explain call ad hoc.
286    fn render_execution_descriptor(
287        &self,
288        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
289    ) -> Result<String, QueryError>
290    where
291        E: EntityValue,
292    {
293        let descriptor = self.explain_execution_descriptor()?;
294
295        Ok(render(descriptor))
296    }
297
298    // Execute one prepared terminal descriptor through the canonical
299    // non-paged fluent policy gate.
300    fn execute_terminal<S>(&self, strategy: S) -> Result<S::Output, QueryError>
301    where
302        E: EntityValue,
303        S: TerminalStrategyDriver<E>,
304    {
305        self.with_non_paged(|session, query| strategy.execute(session, query))
306    }
307
308    // Explain one prepared terminal strategy through the same non-paged fluent
309    // policy gate used by execution.
310    fn explain_terminal<S>(&self, strategy: &S) -> Result<S::ExplainOutput, QueryError>
311    where
312        E: EntityValue,
313        S: TerminalStrategyDriver<E>,
314    {
315        self.with_non_paged(|session, query| strategy.explain(session, query))
316    }
317
318    // ------------------------------------------------------------------
319    // Execution terminals — semantic only
320    // ------------------------------------------------------------------
321
322    /// Execute and return whether the result set is empty.
323    pub fn is_empty(&self) -> Result<bool, QueryError>
324    where
325        E: EntityValue,
326    {
327        self.not_exists()
328    }
329
330    /// Execute and return whether no matching row exists.
331    pub fn not_exists(&self) -> Result<bool, QueryError>
332    where
333        E: EntityValue,
334    {
335        Ok(!self.exists()?)
336    }
337
338    /// Execute and return whether at least one matching row exists.
339    pub fn exists(&self) -> Result<bool, QueryError>
340    where
341        E: EntityValue,
342    {
343        self.execute_terminal(ExistsRowsTerminal::new())
344    }
345
346    /// Explain scalar `exists()` routing without executing the terminal.
347    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
348    where
349        E: EntityValue,
350    {
351        self.explain_terminal(&ExistsRowsTerminal::new())
352    }
353
354    /// Explain scalar `not_exists()` routing without executing the terminal.
355    ///
356    /// This remains an `exists()` execution plan with negated boolean semantics.
357    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
358    where
359        E: EntityValue,
360    {
361        self.explain_exists()
362    }
363
364    /// Explain scalar load execution shape without executing the query.
365    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
366    where
367        E: EntityValue,
368    {
369        self.explain_execution_descriptor()
370    }
371
372    /// Explain scalar load execution shape as deterministic text.
373    pub fn explain_execution_text(&self) -> Result<String, QueryError>
374    where
375        E: EntityValue,
376    {
377        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
378    }
379
380    /// Explain scalar load execution shape as canonical JSON.
381    pub fn explain_execution_json(&self) -> Result<String, QueryError>
382    where
383        E: EntityValue,
384    {
385        self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
386    }
387
388    /// Explain scalar load execution shape as verbose text with diagnostics.
389    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
390    where
391        E: EntityValue,
392    {
393        self.with_non_paged(DbSession::explain_query_execution_verbose_with_visible_indexes)
394    }
395
396    /// Execute and return the number of matching rows.
397    pub fn count(&self) -> Result<u32, QueryError>
398    where
399        E: EntityValue,
400    {
401        self.execute_terminal(CountRowsTerminal::new())
402    }
403
404    /// Execute and return the total persisted payload bytes for the effective
405    /// result window.
406    pub fn bytes(&self) -> Result<u64, QueryError>
407    where
408        E: EntityValue,
409    {
410        self.with_non_paged(DbSession::execute_fluent_bytes)
411    }
412
413    /// Execute and return the total serialized bytes for `field` over the
414    /// effective result window.
415    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
416    where
417        E: EntityValue,
418    {
419        let target_slot = self.resolve_non_paged_slot(field)?;
420
421        self.with_non_paged(|session, query| {
422            session.execute_fluent_bytes_by_slot(query, target_slot)
423        })
424    }
425
426    /// Explain `bytes_by(field)` routing without executing the terminal.
427    pub fn explain_bytes_by(
428        &self,
429        field: impl AsRef<str>,
430    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
431    where
432        E: EntityValue,
433    {
434        let target_slot = self.resolve_non_paged_slot(field)?;
435
436        self.with_non_paged(|session, query| {
437            session.explain_query_bytes_by_with_visible_indexes(query, target_slot.field())
438        })
439    }
440
441    /// Execute and return the smallest matching identifier, if any.
442    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
443    where
444        E: EntityValue,
445    {
446        self.execute_terminal(MinIdTerminal::new())
447    }
448
449    /// Explain scalar `min()` routing without executing the terminal.
450    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
451    where
452        E: EntityValue,
453    {
454        self.explain_terminal(&MinIdTerminal::new())
455    }
456
457    /// Execute and return the id of the row with the smallest value for `field`.
458    ///
459    /// Ties are deterministic: equal field values resolve by primary key ascending.
460    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
461    where
462        E: EntityValue,
463    {
464        let target_slot = self.resolve_non_paged_slot(field)?;
465
466        self.execute_terminal(MinIdBySlotTerminal::new(target_slot))
467    }
468
469    /// Execute and return the largest matching identifier, if any.
470    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
471    where
472        E: EntityValue,
473    {
474        self.execute_terminal(MaxIdTerminal::new())
475    }
476
477    /// Explain scalar `max()` routing without executing the terminal.
478    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
479    where
480        E: EntityValue,
481    {
482        self.explain_terminal(&MaxIdTerminal::new())
483    }
484
485    /// Execute and return the id of the row with the largest value for `field`.
486    ///
487    /// Ties are deterministic: equal field values resolve by primary key ascending.
488    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
489    where
490        E: EntityValue,
491    {
492        let target_slot = self.resolve_non_paged_slot(field)?;
493
494        self.execute_terminal(MaxIdBySlotTerminal::new(target_slot))
495    }
496
497    /// Execute and return the id at zero-based ordinal `nth` when rows are
498    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
499    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
500    where
501        E: EntityValue,
502    {
503        let target_slot = self.resolve_non_paged_slot(field)?;
504
505        self.execute_terminal(NthIdBySlotTerminal::new(target_slot, nth))
506    }
507
508    /// Execute and return the sum of `field` over matching rows.
509    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
510    where
511        E: EntityValue,
512    {
513        let target_slot = self.resolve_non_paged_slot(field)?;
514
515        self.execute_terminal(SumBySlotTerminal::new(target_slot))
516    }
517
518    /// Explain scalar `sum_by(field)` routing without executing the terminal.
519    pub fn explain_sum_by(
520        &self,
521        field: impl AsRef<str>,
522    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
523    where
524        E: EntityValue,
525    {
526        let target_slot = self.resolve_non_paged_slot(field)?;
527
528        self.explain_terminal(&SumBySlotTerminal::new(target_slot))
529    }
530
531    /// Execute and return the sum of distinct `field` values.
532    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
533    where
534        E: EntityValue,
535    {
536        let target_slot = self.resolve_non_paged_slot(field)?;
537
538        self.execute_terminal(SumDistinctBySlotTerminal::new(target_slot))
539    }
540
541    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
542    pub fn explain_sum_distinct_by(
543        &self,
544        field: impl AsRef<str>,
545    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
546    where
547        E: EntityValue,
548    {
549        let target_slot = self.resolve_non_paged_slot(field)?;
550
551        self.explain_terminal(&SumDistinctBySlotTerminal::new(target_slot))
552    }
553
554    /// Execute and return the average of `field` over matching rows.
555    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
556    where
557        E: EntityValue,
558    {
559        let target_slot = self.resolve_non_paged_slot(field)?;
560
561        self.execute_terminal(AvgBySlotTerminal::new(target_slot))
562    }
563
564    /// Explain scalar `avg_by(field)` routing without executing the terminal.
565    pub fn explain_avg_by(
566        &self,
567        field: impl AsRef<str>,
568    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
569    where
570        E: EntityValue,
571    {
572        let target_slot = self.resolve_non_paged_slot(field)?;
573
574        self.explain_terminal(&AvgBySlotTerminal::new(target_slot))
575    }
576
577    /// Execute and return the average of distinct `field` values.
578    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
579    where
580        E: EntityValue,
581    {
582        let target_slot = self.resolve_non_paged_slot(field)?;
583
584        self.execute_terminal(AvgDistinctBySlotTerminal::new(target_slot))
585    }
586
587    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
588    pub fn explain_avg_distinct_by(
589        &self,
590        field: impl AsRef<str>,
591    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
592    where
593        E: EntityValue,
594    {
595        let target_slot = self.resolve_non_paged_slot(field)?;
596
597        self.explain_terminal(&AvgDistinctBySlotTerminal::new(target_slot))
598    }
599
600    /// Execute and return the median id by `field` using deterministic ordering
601    /// `(field asc, primary key asc)`.
602    ///
603    /// Even-length windows select the lower median.
604    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
605    where
606        E: EntityValue,
607    {
608        let target_slot = self.resolve_non_paged_slot(field)?;
609
610        self.execute_terminal(MedianIdBySlotTerminal::new(target_slot))
611    }
612
613    /// Execute and return the number of distinct values for `field` over the
614    /// effective result window.
615    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
616    where
617        E: EntityValue,
618    {
619        let target_slot = self.resolve_non_paged_slot(field)?;
620
621        self.execute_terminal(CountDistinctBySlotTerminal::new(target_slot))
622    }
623
624    /// Explain `count_distinct_by(field)` routing without executing the terminal.
625    pub fn explain_count_distinct_by(
626        &self,
627        field: impl AsRef<str>,
628    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
629    where
630        E: EntityValue,
631    {
632        let target_slot = self.resolve_non_paged_slot(field)?;
633
634        self.explain_terminal(&CountDistinctBySlotTerminal::new(target_slot))
635    }
636
637    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
638    ///
639    /// Tie handling is deterministic for both extrema: primary key ascending.
640    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
641    where
642        E: EntityValue,
643    {
644        let target_slot = self.resolve_non_paged_slot(field)?;
645
646        self.execute_terminal(MinMaxIdBySlotTerminal::new(target_slot))
647    }
648
649    /// Execute and return projected field values for the effective result window.
650    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
651    where
652        E: EntityValue,
653    {
654        let target_slot = self.resolve_non_paged_slot(field)?;
655
656        self.execute_terminal(ValuesBySlotTerminal::new(target_slot))
657            .map(output_values)
658    }
659
660    /// Execute and return projected values for one shared bounded projection
661    /// over the effective response window.
662    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
663    where
664        E: EntityValue,
665        P: ValueProjectionExpr,
666    {
667        let target_slot = self.resolve_non_paged_slot(projection.field())?;
668
669        self.with_non_paged(|session, query| {
670            session.execute_fluent_project_values_by_slot(
671                query,
672                target_slot,
673                projection.projection_plan().into_expr(),
674            )
675        })
676        .map(output_values)
677    }
678
679    /// Explain `project_values(projection)` routing without executing it.
680    pub fn explain_project_values<P>(
681        &self,
682        projection: &P,
683    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
684    where
685        E: EntityValue,
686        P: ValueProjectionExpr,
687    {
688        let target_slot = self.resolve_non_paged_slot(projection.field())?;
689
690        self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
691    }
692
693    /// Explain `values_by(field)` routing without executing the terminal.
694    pub fn explain_values_by(
695        &self,
696        field: impl AsRef<str>,
697    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
698    where
699        E: EntityValue,
700    {
701        let target_slot = self.resolve_non_paged_slot(field)?;
702
703        self.explain_terminal(&ValuesBySlotTerminal::new(target_slot))
704    }
705
706    /// Execute and return the first `k` rows from the effective response window.
707    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
708    where
709        E: EntityValue,
710    {
711        self.with_non_paged(|session, query| session.execute_fluent_take(query, take_count))
712    }
713
714    /// Execute and return the top `k` rows by `field` under deterministic
715    /// ordering `(field desc, primary_key asc)` over the effective response
716    /// window.
717    ///
718    /// This terminal applies its own ordering and does not preserve query
719    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
720    /// matches `max_by(field)` selection semantics.
721    pub fn top_k_by(
722        &self,
723        field: impl AsRef<str>,
724        take_count: u32,
725    ) -> Result<EntityResponse<E>, QueryError>
726    where
727        E: EntityValue,
728    {
729        let target_slot = self.resolve_non_paged_slot(field)?;
730
731        self.with_non_paged(|session, query| {
732            session.execute_fluent_top_k_rows_by_slot(query, target_slot, take_count)
733        })
734    }
735
736    /// Execute and return the bottom `k` rows by `field` under deterministic
737    /// ordering `(field asc, primary_key asc)` over the effective response
738    /// window.
739    ///
740    /// This terminal applies its own ordering and does not preserve query
741    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
742    /// matches `min_by(field)` selection semantics.
743    pub fn bottom_k_by(
744        &self,
745        field: impl AsRef<str>,
746        take_count: u32,
747    ) -> Result<EntityResponse<E>, QueryError>
748    where
749        E: EntityValue,
750    {
751        let target_slot = self.resolve_non_paged_slot(field)?;
752
753        self.with_non_paged(|session, query| {
754            session.execute_fluent_bottom_k_rows_by_slot(query, target_slot, take_count)
755        })
756    }
757
758    /// Execute and return projected values for the top `k` rows by `field`
759    /// under deterministic ordering `(field desc, primary_key asc)` over the
760    /// effective response window.
761    ///
762    /// Ranking is applied before projection and does not preserve query
763    /// `order_term(...)` row order in the returned values. For `k = 1`, this
764    /// matches `max_by(field)` projected to one value.
765    pub fn top_k_by_values(
766        &self,
767        field: impl AsRef<str>,
768        take_count: u32,
769    ) -> Result<Vec<OutputValue>, QueryError>
770    where
771        E: EntityValue,
772    {
773        let target_slot = self.resolve_non_paged_slot(field)?;
774
775        self.with_non_paged(|session, query| {
776            session
777                .execute_fluent_top_k_values_by_slot(query, target_slot, take_count)
778                .map(output_values)
779        })
780    }
781
782    /// Execute and return projected values for the bottom `k` rows by `field`
783    /// under deterministic ordering `(field asc, primary_key asc)` over the
784    /// effective response window.
785    ///
786    /// Ranking is applied before projection and does not preserve query
787    /// `order_term(...)` row order in the returned values. For `k = 1`, this
788    /// matches `min_by(field)` projected to one value.
789    pub fn bottom_k_by_values(
790        &self,
791        field: impl AsRef<str>,
792        take_count: u32,
793    ) -> Result<Vec<OutputValue>, QueryError>
794    where
795        E: EntityValue,
796    {
797        let target_slot = self.resolve_non_paged_slot(field)?;
798
799        self.with_non_paged(|session, query| {
800            session
801                .execute_fluent_bottom_k_values_by_slot(query, target_slot, take_count)
802                .map(output_values)
803        })
804    }
805
806    /// Execute and return projected id/value pairs for the top `k` rows by
807    /// `field` under deterministic ordering `(field desc, primary_key asc)`
808    /// over the effective response window.
809    ///
810    /// Ranking is applied before projection and does not preserve query
811    /// `order_term(...)` row order in the returned values. For `k = 1`, this
812    /// matches `max_by(field)` projected to one `(id, value)` pair.
813    pub fn top_k_by_with_ids(
814        &self,
815        field: impl AsRef<str>,
816        take_count: u32,
817    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
818    where
819        E: EntityValue,
820    {
821        let target_slot = self.resolve_non_paged_slot(field)?;
822
823        self.with_non_paged(|session, query| {
824            session
825                .execute_fluent_top_k_values_with_ids_by_slot(query, target_slot, take_count)
826                .map(output_values_with_ids)
827        })
828    }
829
830    /// Execute and return projected id/value pairs for the bottom `k` rows by
831    /// `field` under deterministic ordering `(field asc, primary_key asc)`
832    /// over the effective response window.
833    ///
834    /// Ranking is applied before projection and does not preserve query
835    /// `order_term(...)` row order in the returned values. For `k = 1`, this
836    /// matches `min_by(field)` projected to one `(id, value)` pair.
837    pub fn bottom_k_by_with_ids(
838        &self,
839        field: impl AsRef<str>,
840        take_count: u32,
841    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
842    where
843        E: EntityValue,
844    {
845        let target_slot = self.resolve_non_paged_slot(field)?;
846
847        self.with_non_paged(|session, query| {
848            session
849                .execute_fluent_bottom_k_values_with_ids_by_slot(query, target_slot, take_count)
850                .map(output_values_with_ids)
851        })
852    }
853
854    /// Execute and return distinct projected field values for the effective
855    /// result window, preserving first-observed value order.
856    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
857    where
858        E: EntityValue,
859    {
860        let target_slot = self.resolve_non_paged_slot(field)?;
861
862        self.execute_terminal(DistinctValuesBySlotTerminal::new(target_slot))
863            .map(output_values)
864    }
865
866    /// Explain `distinct_values_by(field)` routing without executing the terminal.
867    pub fn explain_distinct_values_by(
868        &self,
869        field: impl AsRef<str>,
870    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
871    where
872        E: EntityValue,
873    {
874        let target_slot = self.resolve_non_paged_slot(field)?;
875
876        self.explain_terminal(&DistinctValuesBySlotTerminal::new(target_slot))
877    }
878
879    /// Execute and return projected field values paired with row ids for the
880    /// effective result window.
881    pub fn values_by_with_ids(
882        &self,
883        field: impl AsRef<str>,
884    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
885    where
886        E: EntityValue,
887    {
888        let target_slot = self.resolve_non_paged_slot(field)?;
889
890        self.execute_terminal(ValuesBySlotWithIdsTerminal::new(target_slot))
891            .map(output_values_with_ids)
892    }
893
894    /// Execute and return projected id/value pairs for one shared bounded
895    /// projection over the effective response window.
896    pub fn project_values_with_ids<P>(
897        &self,
898        projection: &P,
899    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
900    where
901        E: EntityValue,
902        P: ValueProjectionExpr,
903    {
904        let target_slot = self.resolve_non_paged_slot(projection.field())?;
905
906        self.with_non_paged(|session, query| {
907            session.execute_fluent_project_values_with_ids_by_slot(
908                query,
909                target_slot,
910                projection.projection_plan().into_expr(),
911            )
912        })
913        .map(output_values_with_ids)
914    }
915
916    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
917    pub fn explain_values_by_with_ids(
918        &self,
919        field: impl AsRef<str>,
920    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
921    where
922        E: EntityValue,
923    {
924        let target_slot = self.resolve_non_paged_slot(field)?;
925
926        self.explain_terminal(&ValuesBySlotWithIdsTerminal::new(target_slot))
927    }
928
929    /// Execute and return the first projected field value in effective response
930    /// order, if any.
931    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
932    where
933        E: EntityValue,
934    {
935        let target_slot = self.resolve_non_paged_slot(field)?;
936
937        self.execute_terminal(FirstValueBySlotTerminal::new(target_slot))
938            .map(|value| value.map(output))
939    }
940
941    /// Execute and return the first projected value for one shared bounded
942    /// projection in effective response order, if any.
943    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
944    where
945        E: EntityValue,
946        P: ValueProjectionExpr,
947    {
948        let target_slot = self.resolve_non_paged_slot(projection.field())?;
949
950        self.with_non_paged(|session, query| {
951            session.execute_fluent_project_terminal_value_by_slot(
952                query,
953                target_slot,
954                AggregateKind::First,
955                projection.projection_plan().into_expr(),
956            )
957        })
958        .map(|value| value.map(output))
959    }
960
961    /// Explain `first_value_by(field)` routing without executing the terminal.
962    pub fn explain_first_value_by(
963        &self,
964        field: impl AsRef<str>,
965    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
966    where
967        E: EntityValue,
968    {
969        let target_slot = self.resolve_non_paged_slot(field)?;
970
971        self.explain_terminal(&FirstValueBySlotTerminal::new(target_slot))
972    }
973
974    /// Execute and return the last projected field value in effective response
975    /// order, if any.
976    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
977    where
978        E: EntityValue,
979    {
980        let target_slot = self.resolve_non_paged_slot(field)?;
981
982        self.execute_terminal(LastValueBySlotTerminal::new(target_slot))
983            .map(|value| value.map(output))
984    }
985
986    /// Execute and return the last projected value for one shared bounded
987    /// projection in effective response order, if any.
988    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
989    where
990        E: EntityValue,
991        P: ValueProjectionExpr,
992    {
993        let target_slot = self.resolve_non_paged_slot(projection.field())?;
994
995        self.with_non_paged(|session, query| {
996            session.execute_fluent_project_terminal_value_by_slot(
997                query,
998                target_slot,
999                AggregateKind::Last,
1000                projection.projection_plan().into_expr(),
1001            )
1002        })
1003        .map(|value| value.map(output))
1004    }
1005
1006    /// Explain `last_value_by(field)` routing without executing the terminal.
1007    pub fn explain_last_value_by(
1008        &self,
1009        field: impl AsRef<str>,
1010    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1011    where
1012        E: EntityValue,
1013    {
1014        let target_slot = self.resolve_non_paged_slot(field)?;
1015
1016        self.explain_terminal(&LastValueBySlotTerminal::new(target_slot))
1017    }
1018
1019    /// Execute and return the first matching identifier in response order, if any.
1020    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1021    where
1022        E: EntityValue,
1023    {
1024        self.execute_terminal(FirstIdTerminal::new())
1025    }
1026
1027    /// Explain scalar `first()` routing without executing the terminal.
1028    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1029    where
1030        E: EntityValue,
1031    {
1032        self.explain_terminal(&FirstIdTerminal::new())
1033    }
1034
1035    /// Execute and return the last matching identifier in response order, if any.
1036    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1037    where
1038        E: EntityValue,
1039    {
1040        self.execute_terminal(LastIdTerminal::new())
1041    }
1042
1043    /// Explain scalar `last()` routing without executing the terminal.
1044    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1045    where
1046        E: EntityValue,
1047    {
1048        self.explain_terminal(&LastIdTerminal::new())
1049    }
1050
1051    /// Execute and require exactly one matching row.
1052    pub fn require_one(&self) -> Result<(), QueryError>
1053    where
1054        E: EntityValue,
1055    {
1056        self.execute()?.into_rows()?.require_one()?;
1057        Ok(())
1058    }
1059
1060    /// Execute and require at least one matching row.
1061    pub fn require_some(&self) -> Result<(), QueryError>
1062    where
1063        E: EntityValue,
1064    {
1065        self.execute()?.into_rows()?.require_some()?;
1066        Ok(())
1067    }
1068}