Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        DbSession, PersistedRow, Query,
9        query::{
10            api::ResponseCardinalityExt,
11            builder::{
12                ExistingRowsTerminalStrategy, NumericFieldStrategy, OrderSensitiveTerminalStrategy,
13                ProjectionStrategy, ScalarTerminalStrategy, ValueProjectionExpr,
14            },
15            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
16            fluent::load::{
17                FluentLoadQuery, FluentProjectionTerminalOutput, FluentScalarTerminalOutput,
18                LoadQueryResult,
19            },
20            intent::QueryError,
21            plan::AggregateKind,
22        },
23        response::EntityResponse,
24    },
25    error::InternalError,
26    traits::EntityValue,
27    types::{Decimal, Id},
28    value::{OutputValue, Value},
29};
30
// Optional `(min, max)` id pair; used as the return payload of `min_max_by`.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
32
33///
34/// TerminalStrategyDriver
35///
36/// TerminalStrategyDriver is the fluent terminal wiring adapter between a
37/// query-owned strategy object and the session-owned execution/explain
38/// boundary. Implementations are deliberately thin: they only choose the
39/// matching `DbSession` method for an existing strategy type.
40///
41
42trait TerminalStrategyDriver<E: PersistedRow + EntityValue> {
43    type Output;
44    type ExplainOutput;
45
46    fn execute(
47        &self,
48        session: &DbSession<E::Canister>,
49        query: &Query<E>,
50    ) -> Result<Self::Output, QueryError>;
51
52    fn explain(
53        &self,
54        session: &DbSession<E::Canister>,
55        query: &Query<E>,
56    ) -> Result<Self::ExplainOutput, QueryError>;
57}
58
59impl<E> TerminalStrategyDriver<E> for ExistingRowsTerminalStrategy
60where
61    E: PersistedRow + EntityValue,
62{
63    type Output = FluentScalarTerminalOutput<E>;
64    type ExplainOutput = ExplainAggregateTerminalPlan;
65
66    fn execute(
67        &self,
68        session: &DbSession<E::Canister>,
69        query: &Query<E>,
70    ) -> Result<Self::Output, QueryError> {
71        session.execute_fluent_existing_rows_terminal(query, self.clone())
72    }
73
74    fn explain(
75        &self,
76        session: &DbSession<E::Canister>,
77        query: &Query<E>,
78    ) -> Result<Self::ExplainOutput, QueryError> {
79        session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
80    }
81}
82
83impl<E> TerminalStrategyDriver<E> for ScalarTerminalStrategy
84where
85    E: PersistedRow + EntityValue,
86{
87    type Output = FluentScalarTerminalOutput<E>;
88    type ExplainOutput = ExplainAggregateTerminalPlan;
89
90    fn execute(
91        &self,
92        session: &DbSession<E::Canister>,
93        query: &Query<E>,
94    ) -> Result<Self::Output, QueryError> {
95        session.execute_fluent_scalar_terminal(query, self.clone())
96    }
97
98    fn explain(
99        &self,
100        session: &DbSession<E::Canister>,
101        query: &Query<E>,
102    ) -> Result<Self::ExplainOutput, QueryError> {
103        session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
104    }
105}
106
107impl<E> TerminalStrategyDriver<E> for NumericFieldStrategy
108where
109    E: PersistedRow + EntityValue,
110{
111    type Output = Option<Decimal>;
112    type ExplainOutput = ExplainAggregateTerminalPlan;
113
114    fn execute(
115        &self,
116        session: &DbSession<E::Canister>,
117        query: &Query<E>,
118    ) -> Result<Self::Output, QueryError> {
119        session.execute_fluent_numeric_field_terminal(query, self.clone())
120    }
121
122    fn explain(
123        &self,
124        session: &DbSession<E::Canister>,
125        query: &Query<E>,
126    ) -> Result<Self::ExplainOutput, QueryError> {
127        session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
128    }
129}
130
131impl<E> TerminalStrategyDriver<E> for OrderSensitiveTerminalStrategy
132where
133    E: PersistedRow + EntityValue,
134{
135    type Output = FluentScalarTerminalOutput<E>;
136    type ExplainOutput = ExplainAggregateTerminalPlan;
137
138    fn execute(
139        &self,
140        session: &DbSession<E::Canister>,
141        query: &Query<E>,
142    ) -> Result<Self::Output, QueryError> {
143        session.execute_fluent_order_sensitive_terminal(query, self.clone())
144    }
145
146    fn explain(
147        &self,
148        session: &DbSession<E::Canister>,
149        query: &Query<E>,
150    ) -> Result<Self::ExplainOutput, QueryError> {
151        session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, self)
152    }
153}
154
155impl<E> TerminalStrategyDriver<E> for ProjectionStrategy
156where
157    E: PersistedRow + EntityValue,
158{
159    type Output = FluentProjectionTerminalOutput<E>;
160    type ExplainOutput = ExplainExecutionNodeDescriptor;
161
162    fn execute(
163        &self,
164        session: &DbSession<E::Canister>,
165        query: &Query<E>,
166    ) -> Result<Self::Output, QueryError> {
167        session.execute_fluent_projection_terminal(query, self.clone())
168    }
169
170    fn explain(
171        &self,
172        session: &DbSession<E::Canister>,
173        query: &Query<E>,
174    ) -> Result<Self::ExplainOutput, QueryError> {
175        session.explain_query_prepared_projection_terminal_with_visible_indexes(query, self)
176    }
177}
178
179// Convert one runtime projection value into the public output boundary type.
180fn output(value: Value) -> OutputValue {
181    OutputValue::from(value)
182}
183
184// Convert one ordered runtime projection vector into the public output form.
185fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
186    values.into_iter().map(output).collect()
187}
188
189// Convert one ordered runtime `(id, value)` projection vector into the public output form.
190fn output_values_with_ids<E: PersistedRow>(
191    values: Vec<(Id<E>, Value)>,
192) -> Vec<(Id<E>, OutputValue)> {
193    values
194        .into_iter()
195        .map(|(id, value)| (id, output(value)))
196        .collect()
197}
198
199impl<E> FluentLoadQuery<'_, E>
200where
201    E: PersistedRow,
202{
203    // ------------------------------------------------------------------
204    // Execution (single semantic boundary)
205    // ------------------------------------------------------------------
206
207    /// Execute this query using the session's policy settings.
208    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
209    where
210        E: EntityValue,
211    {
212        self.with_non_paged(DbSession::execute_query_result)
213    }
214
215    // Run one terminal operation through the canonical non-paged fluent policy
216    // gate so execution and explain helpers cannot drift on readiness checks.
217    fn with_non_paged<T>(
218        &self,
219        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
220    ) -> Result<T, QueryError>
221    where
222        E: EntityValue,
223    {
224        self.ensure_non_paged_mode_ready()?;
225        map(self.session, self.query())
226    }
227
228    // Resolve the structural execution descriptor for this fluent load query
229    // through the session-owned visible-index explain path once.
230    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
231    where
232        E: EntityValue,
233    {
234        self.with_non_paged(DbSession::explain_query_execution_with_visible_indexes)
235    }
236
237    // Render one descriptor-derived execution surface so text/json explain
238    // terminals do not each forward the same session explain call ad hoc.
239    fn render_execution_descriptor(
240        &self,
241        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
242    ) -> Result<String, QueryError>
243    where
244        E: EntityValue,
245    {
246        let descriptor = self.explain_execution_descriptor()?;
247
248        Ok(render(descriptor))
249    }
250
251    // Execute one prepared terminal strategy and decode its output through the
252    // single fluent mismatch/error-mapping lane.
253    fn execute_terminal<S, T>(
254        &self,
255        strategy: S,
256        map: impl FnOnce(S::Output) -> Result<T, InternalError>,
257    ) -> Result<T, QueryError>
258    where
259        E: EntityValue,
260        S: TerminalStrategyDriver<E>,
261    {
262        self.with_non_paged(|session, query| {
263            let output = strategy.execute(session, query)?;
264
265            map(output).map_err(QueryError::execute)
266        })
267    }
268
269    // Explain one prepared terminal strategy through the same non-paged fluent
270    // policy gate used by execution.
271    fn explain_terminal<S>(&self, strategy: &S) -> Result<S::ExplainOutput, QueryError>
272    where
273        E: EntityValue,
274        S: TerminalStrategyDriver<E>,
275    {
276        self.with_non_paged(|session, query| strategy.explain(session, query))
277    }
278
279    // Apply one shared bounded value projection to iterator-like terminal
280    // output while preserving source order and cardinality.
281    fn project_terminal_items<P, T, U>(
282        projection: &P,
283        values: impl IntoIterator<Item = T>,
284        mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
285    ) -> Result<Vec<U>, QueryError>
286    where
287        P: ValueProjectionExpr,
288    {
289        values
290            .into_iter()
291            .map(|value| map(projection, value))
292            .collect()
293    }
294
295    // ------------------------------------------------------------------
296    // Execution terminals — semantic only
297    // ------------------------------------------------------------------
298
299    /// Execute and return whether the result set is empty.
300    pub fn is_empty(&self) -> Result<bool, QueryError>
301    where
302        E: EntityValue,
303    {
304        self.not_exists()
305    }
306
307    /// Execute and return whether no matching row exists.
308    pub fn not_exists(&self) -> Result<bool, QueryError>
309    where
310        E: EntityValue,
311    {
312        Ok(!self.exists()?)
313    }
314
315    /// Execute and return whether at least one matching row exists.
316    pub fn exists(&self) -> Result<bool, QueryError>
317    where
318        E: EntityValue,
319    {
320        self.execute_terminal(
321            ExistingRowsTerminalStrategy::exists_rows(),
322            FluentScalarTerminalOutput::into_exists,
323        )
324    }
325
326    /// Explain scalar `exists()` routing without executing the terminal.
327    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
328    where
329        E: EntityValue,
330    {
331        self.explain_terminal(&ExistingRowsTerminalStrategy::exists_rows())
332    }
333
334    /// Explain scalar `not_exists()` routing without executing the terminal.
335    ///
336    /// This remains an `exists()` execution plan with negated boolean semantics.
337    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
338    where
339        E: EntityValue,
340    {
341        self.explain_exists()
342    }
343
344    /// Explain scalar load execution shape without executing the query.
345    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
346    where
347        E: EntityValue,
348    {
349        self.explain_execution_descriptor()
350    }
351
352    /// Explain scalar load execution shape as deterministic text.
353    pub fn explain_execution_text(&self) -> Result<String, QueryError>
354    where
355        E: EntityValue,
356    {
357        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
358    }
359
360    /// Explain scalar load execution shape as canonical JSON.
361    pub fn explain_execution_json(&self) -> Result<String, QueryError>
362    where
363        E: EntityValue,
364    {
365        self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
366    }
367
368    /// Explain scalar load execution shape as verbose text with diagnostics.
369    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
370    where
371        E: EntityValue,
372    {
373        self.with_non_paged(DbSession::explain_query_execution_verbose_with_visible_indexes)
374    }
375
376    /// Execute and return the number of matching rows.
377    pub fn count(&self) -> Result<u32, QueryError>
378    where
379        E: EntityValue,
380    {
381        self.execute_terminal(
382            ExistingRowsTerminalStrategy::count_rows(),
383            FluentScalarTerminalOutput::into_count,
384        )
385    }
386
387    /// Execute and return the total persisted payload bytes for the effective
388    /// result window.
389    pub fn bytes(&self) -> Result<u64, QueryError>
390    where
391        E: EntityValue,
392    {
393        self.with_non_paged(DbSession::execute_fluent_bytes)
394    }
395
396    /// Execute and return the total serialized bytes for `field` over the
397    /// effective result window.
398    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
399    where
400        E: EntityValue,
401    {
402        let target_slot = self.resolve_non_paged_slot(field)?;
403
404        self.with_non_paged(|session, query| {
405            session.execute_fluent_bytes_by_slot(query, target_slot)
406        })
407    }
408
409    /// Explain `bytes_by(field)` routing without executing the terminal.
410    pub fn explain_bytes_by(
411        &self,
412        field: impl AsRef<str>,
413    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
414    where
415        E: EntityValue,
416    {
417        let target_slot = self.resolve_non_paged_slot(field)?;
418
419        self.with_non_paged(|session, query| {
420            session.explain_query_bytes_by_with_visible_indexes(query, target_slot.field())
421        })
422    }
423
424    /// Execute and return the smallest matching identifier, if any.
425    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
426    where
427        E: EntityValue,
428    {
429        self.execute_terminal(
430            ScalarTerminalStrategy::id_terminal(AggregateKind::Min),
431            FluentScalarTerminalOutput::into_id,
432        )
433    }
434
435    /// Explain scalar `min()` routing without executing the terminal.
436    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
437    where
438        E: EntityValue,
439    {
440        self.explain_terminal(&ScalarTerminalStrategy::id_terminal(AggregateKind::Min))
441    }
442
443    /// Execute and return the id of the row with the smallest value for `field`.
444    ///
445    /// Ties are deterministic: equal field values resolve by primary key ascending.
446    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
447    where
448        E: EntityValue,
449    {
450        let target_slot = self.resolve_non_paged_slot(field)?;
451
452        self.execute_terminal(
453            ScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
454            FluentScalarTerminalOutput::into_id,
455        )
456    }
457
458    /// Execute and return the largest matching identifier, if any.
459    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
460    where
461        E: EntityValue,
462    {
463        self.execute_terminal(
464            ScalarTerminalStrategy::id_terminal(AggregateKind::Max),
465            FluentScalarTerminalOutput::into_id,
466        )
467    }
468
469    /// Explain scalar `max()` routing without executing the terminal.
470    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
471    where
472        E: EntityValue,
473    {
474        self.explain_terminal(&ScalarTerminalStrategy::id_terminal(AggregateKind::Max))
475    }
476
477    /// Execute and return the id of the row with the largest value for `field`.
478    ///
479    /// Ties are deterministic: equal field values resolve by primary key ascending.
480    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
481    where
482        E: EntityValue,
483    {
484        let target_slot = self.resolve_non_paged_slot(field)?;
485
486        self.execute_terminal(
487            ScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
488            FluentScalarTerminalOutput::into_id,
489        )
490    }
491
492    /// Execute and return the id at zero-based ordinal `nth` when rows are
493    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
494    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
495    where
496        E: EntityValue,
497    {
498        let target_slot = self.resolve_non_paged_slot(field)?;
499
500        self.execute_terminal(
501            OrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
502            FluentScalarTerminalOutput::into_id,
503        )
504    }
505
506    /// Execute and return the sum of `field` over matching rows.
507    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
508    where
509        E: EntityValue,
510    {
511        let target_slot = self.resolve_non_paged_slot(field)?;
512
513        self.execute_terminal(NumericFieldStrategy::sum_by_slot(target_slot), Ok)
514    }
515
516    /// Explain scalar `sum_by(field)` routing without executing the terminal.
517    pub fn explain_sum_by(
518        &self,
519        field: impl AsRef<str>,
520    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
521    where
522        E: EntityValue,
523    {
524        let target_slot = self.resolve_non_paged_slot(field)?;
525
526        self.explain_terminal(&NumericFieldStrategy::sum_by_slot(target_slot))
527    }
528
529    /// Execute and return the sum of distinct `field` values.
530    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
531    where
532        E: EntityValue,
533    {
534        let target_slot = self.resolve_non_paged_slot(field)?;
535
536        self.execute_terminal(NumericFieldStrategy::sum_distinct_by_slot(target_slot), Ok)
537    }
538
539    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
540    pub fn explain_sum_distinct_by(
541        &self,
542        field: impl AsRef<str>,
543    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
544    where
545        E: EntityValue,
546    {
547        let target_slot = self.resolve_non_paged_slot(field)?;
548
549        self.explain_terminal(&NumericFieldStrategy::sum_distinct_by_slot(target_slot))
550    }
551
552    /// Execute and return the average of `field` over matching rows.
553    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
554    where
555        E: EntityValue,
556    {
557        let target_slot = self.resolve_non_paged_slot(field)?;
558
559        self.execute_terminal(NumericFieldStrategy::avg_by_slot(target_slot), Ok)
560    }
561
562    /// Explain scalar `avg_by(field)` routing without executing the terminal.
563    pub fn explain_avg_by(
564        &self,
565        field: impl AsRef<str>,
566    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
567    where
568        E: EntityValue,
569    {
570        let target_slot = self.resolve_non_paged_slot(field)?;
571
572        self.explain_terminal(&NumericFieldStrategy::avg_by_slot(target_slot))
573    }
574
575    /// Execute and return the average of distinct `field` values.
576    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
577    where
578        E: EntityValue,
579    {
580        let target_slot = self.resolve_non_paged_slot(field)?;
581
582        self.execute_terminal(NumericFieldStrategy::avg_distinct_by_slot(target_slot), Ok)
583    }
584
585    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
586    pub fn explain_avg_distinct_by(
587        &self,
588        field: impl AsRef<str>,
589    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
590    where
591        E: EntityValue,
592    {
593        let target_slot = self.resolve_non_paged_slot(field)?;
594
595        self.explain_terminal(&NumericFieldStrategy::avg_distinct_by_slot(target_slot))
596    }
597
598    /// Execute and return the median id by `field` using deterministic ordering
599    /// `(field asc, primary key asc)`.
600    ///
601    /// Even-length windows select the lower median.
602    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
603    where
604        E: EntityValue,
605    {
606        let target_slot = self.resolve_non_paged_slot(field)?;
607
608        self.execute_terminal(
609            OrderSensitiveTerminalStrategy::median_by_slot(target_slot),
610            FluentScalarTerminalOutput::into_id,
611        )
612    }
613
614    /// Execute and return the number of distinct values for `field` over the
615    /// effective result window.
616    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
617    where
618        E: EntityValue,
619    {
620        let target_slot = self.resolve_non_paged_slot(field)?;
621
622        self.execute_terminal(
623            ProjectionStrategy::count_distinct_by_slot(target_slot),
624            FluentProjectionTerminalOutput::into_count,
625        )
626    }
627
628    /// Explain `count_distinct_by(field)` routing without executing the terminal.
629    pub fn explain_count_distinct_by(
630        &self,
631        field: impl AsRef<str>,
632    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
633    where
634        E: EntityValue,
635    {
636        let target_slot = self.resolve_non_paged_slot(field)?;
637
638        self.explain_terminal(&ProjectionStrategy::count_distinct_by_slot(target_slot))
639    }
640
641    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
642    ///
643    /// Tie handling is deterministic for both extrema: primary key ascending.
644    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
645    where
646        E: EntityValue,
647    {
648        let target_slot = self.resolve_non_paged_slot(field)?;
649
650        self.execute_terminal(
651            OrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
652            FluentScalarTerminalOutput::into_id_pair,
653        )
654    }
655
656    /// Execute and return projected field values for the effective result window.
657    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
658    where
659        E: EntityValue,
660    {
661        let target_slot = self.resolve_non_paged_slot(field)?;
662
663        self.execute_terminal(
664            ProjectionStrategy::values_by_slot(target_slot),
665            FluentProjectionTerminalOutput::into_values,
666        )
667        .map(output_values)
668    }
669
670    /// Execute and return projected values for one shared bounded projection
671    /// over the effective response window.
672    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
673    where
674        E: EntityValue,
675        P: ValueProjectionExpr,
676    {
677        let target_slot = self.resolve_non_paged_slot(projection.field())?;
678        let values = self
679            .execute_terminal(ProjectionStrategy::values_by_slot(target_slot), Ok)?
680            .into_values()
681            .map_err(QueryError::execute)?;
682
683        Self::project_terminal_items(projection, values, |projection, value| {
684            projection.apply_value(value)
685        })
686        .map(output_values)
687    }
688
689    /// Explain `project_values(projection)` routing without executing it.
690    pub fn explain_project_values<P>(
691        &self,
692        projection: &P,
693    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
694    where
695        E: EntityValue,
696        P: ValueProjectionExpr,
697    {
698        let target_slot = self.resolve_non_paged_slot(projection.field())?;
699
700        self.explain_terminal(&ProjectionStrategy::values_by_slot(target_slot))
701    }
702
703    /// Explain `values_by(field)` routing without executing the terminal.
704    pub fn explain_values_by(
705        &self,
706        field: impl AsRef<str>,
707    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
708    where
709        E: EntityValue,
710    {
711        let target_slot = self.resolve_non_paged_slot(field)?;
712
713        self.explain_terminal(&ProjectionStrategy::values_by_slot(target_slot))
714    }
715
716    /// Execute and return the first `k` rows from the effective response window.
717    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
718    where
719        E: EntityValue,
720    {
721        self.with_non_paged(|session, query| session.execute_fluent_take(query, take_count))
722    }
723
724    /// Execute and return the top `k` rows by `field` under deterministic
725    /// ordering `(field desc, primary_key asc)` over the effective response
726    /// window.
727    ///
728    /// This terminal applies its own ordering and does not preserve query
729    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
730    /// matches `max_by(field)` selection semantics.
731    pub fn top_k_by(
732        &self,
733        field: impl AsRef<str>,
734        take_count: u32,
735    ) -> Result<EntityResponse<E>, QueryError>
736    where
737        E: EntityValue,
738    {
739        let target_slot = self.resolve_non_paged_slot(field)?;
740
741        self.with_non_paged(|session, query| {
742            session.execute_fluent_ranked_rows_by_slot(query, target_slot, take_count, true)
743        })
744    }
745
746    /// Execute and return the bottom `k` rows by `field` under deterministic
747    /// ordering `(field asc, primary_key asc)` over the effective response
748    /// window.
749    ///
750    /// This terminal applies its own ordering and does not preserve query
751    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
752    /// matches `min_by(field)` selection semantics.
753    pub fn bottom_k_by(
754        &self,
755        field: impl AsRef<str>,
756        take_count: u32,
757    ) -> Result<EntityResponse<E>, QueryError>
758    where
759        E: EntityValue,
760    {
761        let target_slot = self.resolve_non_paged_slot(field)?;
762
763        self.with_non_paged(|session, query| {
764            session.execute_fluent_ranked_rows_by_slot(query, target_slot, take_count, false)
765        })
766    }
767
768    /// Execute and return projected values for the top `k` rows by `field`
769    /// under deterministic ordering `(field desc, primary_key asc)` over the
770    /// effective response window.
771    ///
772    /// Ranking is applied before projection and does not preserve query
773    /// `order_term(...)` row order in the returned values. For `k = 1`, this
774    /// matches `max_by(field)` projected to one value.
775    pub fn top_k_by_values(
776        &self,
777        field: impl AsRef<str>,
778        take_count: u32,
779    ) -> Result<Vec<OutputValue>, QueryError>
780    where
781        E: EntityValue,
782    {
783        let target_slot = self.resolve_non_paged_slot(field)?;
784
785        self.with_non_paged(|session, query| {
786            session
787                .execute_fluent_ranked_values_by_slot(query, target_slot, take_count, true)
788                .map(output_values)
789        })
790    }
791
792    /// Execute and return projected values for the bottom `k` rows by `field`
793    /// under deterministic ordering `(field asc, primary_key asc)` over the
794    /// effective response window.
795    ///
796    /// Ranking is applied before projection and does not preserve query
797    /// `order_term(...)` row order in the returned values. For `k = 1`, this
798    /// matches `min_by(field)` projected to one value.
799    pub fn bottom_k_by_values(
800        &self,
801        field: impl AsRef<str>,
802        take_count: u32,
803    ) -> Result<Vec<OutputValue>, QueryError>
804    where
805        E: EntityValue,
806    {
807        let target_slot = self.resolve_non_paged_slot(field)?;
808
809        self.with_non_paged(|session, query| {
810            session
811                .execute_fluent_ranked_values_by_slot(query, target_slot, take_count, false)
812                .map(output_values)
813        })
814    }
815
816    /// Execute and return projected id/value pairs for the top `k` rows by
817    /// `field` under deterministic ordering `(field desc, primary_key asc)`
818    /// over the effective response window.
819    ///
820    /// Ranking is applied before projection and does not preserve query
821    /// `order_term(...)` row order in the returned values. For `k = 1`, this
822    /// matches `max_by(field)` projected to one `(id, value)` pair.
823    pub fn top_k_by_with_ids(
824        &self,
825        field: impl AsRef<str>,
826        take_count: u32,
827    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
828    where
829        E: EntityValue,
830    {
831        let target_slot = self.resolve_non_paged_slot(field)?;
832
833        self.with_non_paged(|session, query| {
834            session
835                .execute_fluent_ranked_values_with_ids_by_slot(query, target_slot, take_count, true)
836                .map(output_values_with_ids)
837        })
838    }
839
840    /// Execute and return projected id/value pairs for the bottom `k` rows by
841    /// `field` under deterministic ordering `(field asc, primary_key asc)`
842    /// over the effective response window.
843    ///
844    /// Ranking is applied before projection and does not preserve query
845    /// `order_term(...)` row order in the returned values. For `k = 1`, this
846    /// matches `min_by(field)` projected to one `(id, value)` pair.
847    pub fn bottom_k_by_with_ids(
848        &self,
849        field: impl AsRef<str>,
850        take_count: u32,
851    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
852    where
853        E: EntityValue,
854    {
855        let target_slot = self.resolve_non_paged_slot(field)?;
856
857        self.with_non_paged(|session, query| {
858            session
859                .execute_fluent_ranked_values_with_ids_by_slot(
860                    query,
861                    target_slot,
862                    take_count,
863                    false,
864                )
865                .map(output_values_with_ids)
866        })
867    }
868
869    /// Execute and return distinct projected field values for the effective
870    /// result window, preserving first-observed value order.
871    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
872    where
873        E: EntityValue,
874    {
875        let target_slot = self.resolve_non_paged_slot(field)?;
876
877        self.execute_terminal(
878            ProjectionStrategy::distinct_values_by_slot(target_slot),
879            FluentProjectionTerminalOutput::into_values,
880        )
881        .map(output_values)
882    }
883
884    /// Explain `distinct_values_by(field)` routing without executing the terminal.
885    pub fn explain_distinct_values_by(
886        &self,
887        field: impl AsRef<str>,
888    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
889    where
890        E: EntityValue,
891    {
892        let target_slot = self.resolve_non_paged_slot(field)?;
893
894        self.explain_terminal(&ProjectionStrategy::distinct_values_by_slot(target_slot))
895    }
896
897    /// Execute and return projected field values paired with row ids for the
898    /// effective result window.
899    pub fn values_by_with_ids(
900        &self,
901        field: impl AsRef<str>,
902    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
903    where
904        E: EntityValue,
905    {
906        let target_slot = self.resolve_non_paged_slot(field)?;
907
908        self.execute_terminal(
909            ProjectionStrategy::values_by_with_ids_slot(target_slot),
910            FluentProjectionTerminalOutput::into_values_with_ids,
911        )
912        .map(output_values_with_ids)
913    }
914
915    /// Execute and return projected id/value pairs for one shared bounded
916    /// projection over the effective response window.
917    pub fn project_values_with_ids<P>(
918        &self,
919        projection: &P,
920    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
921    where
922        E: EntityValue,
923        P: ValueProjectionExpr,
924    {
925        let target_slot = self.resolve_non_paged_slot(projection.field())?;
926        let values = self
927            .execute_terminal(ProjectionStrategy::values_by_with_ids_slot(target_slot), Ok)?
928            .into_values_with_ids()
929            .map_err(QueryError::execute)?;
930
931        Self::project_terminal_items(projection, values, |projection, (id, value)| {
932            Ok((id, projection.apply_value(value)?))
933        })
934        .map(output_values_with_ids)
935    }
936
937    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
938    pub fn explain_values_by_with_ids(
939        &self,
940        field: impl AsRef<str>,
941    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
942    where
943        E: EntityValue,
944    {
945        let target_slot = self.resolve_non_paged_slot(field)?;
946
947        self.explain_terminal(&ProjectionStrategy::values_by_with_ids_slot(target_slot))
948    }
949
950    /// Execute and return the first projected field value in effective response
951    /// order, if any.
952    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
953    where
954        E: EntityValue,
955    {
956        let target_slot = self.resolve_non_paged_slot(field)?;
957
958        self.execute_terminal(
959            ProjectionStrategy::first_value_by_slot(target_slot),
960            FluentProjectionTerminalOutput::into_terminal_value,
961        )
962        .map(|value| value.map(output))
963    }
964
965    /// Execute and return the first projected value for one shared bounded
966    /// projection in effective response order, if any.
967    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
968    where
969        E: EntityValue,
970        P: ValueProjectionExpr,
971    {
972        let target_slot = self.resolve_non_paged_slot(projection.field())?;
973        let value = self
974            .execute_terminal(ProjectionStrategy::first_value_by_slot(target_slot), Ok)?
975            .into_terminal_value()
976            .map_err(QueryError::execute)?;
977
978        let mut projected =
979            Self::project_terminal_items(projection, value, |projection, value| {
980                projection.apply_value(value)
981            })?;
982
983        Ok(projected.pop().map(output))
984    }
985
986    /// Explain `first_value_by(field)` routing without executing the terminal.
987    pub fn explain_first_value_by(
988        &self,
989        field: impl AsRef<str>,
990    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
991    where
992        E: EntityValue,
993    {
994        let target_slot = self.resolve_non_paged_slot(field)?;
995
996        self.explain_terminal(&ProjectionStrategy::first_value_by_slot(target_slot))
997    }
998
999    /// Execute and return the last projected field value in effective response
1000    /// order, if any.
1001    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1002    where
1003        E: EntityValue,
1004    {
1005        let target_slot = self.resolve_non_paged_slot(field)?;
1006
1007        self.execute_terminal(
1008            ProjectionStrategy::last_value_by_slot(target_slot),
1009            FluentProjectionTerminalOutput::into_terminal_value,
1010        )
1011        .map(|value| value.map(output))
1012    }
1013
1014    /// Execute and return the last projected value for one shared bounded
1015    /// projection in effective response order, if any.
1016    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1017    where
1018        E: EntityValue,
1019        P: ValueProjectionExpr,
1020    {
1021        let target_slot = self.resolve_non_paged_slot(projection.field())?;
1022        let value = self
1023            .execute_terminal(ProjectionStrategy::last_value_by_slot(target_slot), Ok)?
1024            .into_terminal_value()
1025            .map_err(QueryError::execute)?;
1026
1027        let mut projected =
1028            Self::project_terminal_items(projection, value, |projection, value| {
1029                projection.apply_value(value)
1030            })?;
1031
1032        Ok(projected.pop().map(output))
1033    }
1034
1035    /// Explain `last_value_by(field)` routing without executing the terminal.
1036    pub fn explain_last_value_by(
1037        &self,
1038        field: impl AsRef<str>,
1039    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1040    where
1041        E: EntityValue,
1042    {
1043        let target_slot = self.resolve_non_paged_slot(field)?;
1044
1045        self.explain_terminal(&ProjectionStrategy::last_value_by_slot(target_slot))
1046    }
1047
1048    /// Execute and return the first matching identifier in response order, if any.
1049    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1050    where
1051        E: EntityValue,
1052    {
1053        self.execute_terminal(
1054            OrderSensitiveTerminalStrategy::first(),
1055            FluentScalarTerminalOutput::into_id,
1056        )
1057    }
1058
1059    /// Explain scalar `first()` routing without executing the terminal.
1060    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1061    where
1062        E: EntityValue,
1063    {
1064        self.explain_terminal(&OrderSensitiveTerminalStrategy::first())
1065    }
1066
1067    /// Execute and return the last matching identifier in response order, if any.
1068    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1069    where
1070        E: EntityValue,
1071    {
1072        self.execute_terminal(
1073            OrderSensitiveTerminalStrategy::last(),
1074            FluentScalarTerminalOutput::into_id,
1075        )
1076    }
1077
1078    /// Explain scalar `last()` routing without executing the terminal.
1079    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1080    where
1081        E: EntityValue,
1082    {
1083        self.explain_terminal(&OrderSensitiveTerminalStrategy::last())
1084    }
1085
1086    /// Execute and require exactly one matching row.
1087    pub fn require_one(&self) -> Result<(), QueryError>
1088    where
1089        E: EntityValue,
1090    {
1091        self.execute()?.into_rows()?.require_one()?;
1092        Ok(())
1093    }
1094
1095    /// Execute and require at least one matching row.
1096    pub fn require_some(&self) -> Result<(), QueryError>
1097    where
1098        E: EntityValue,
1099    {
1100        self.execute()?.into_rows()?.require_some()?;
1101        Ok(())
1102    }
1103}