Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        PersistedRow,
9        executor::{
10            LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11            ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12            ScalarTerminalBoundaryRequest,
13        },
14        query::{
15            api::ResponseCardinalityExt,
16            builder::{
17                PreparedFluentAggregateExplainStrategy,
18                PreparedFluentExistingRowsTerminalRuntimeRequest,
19                PreparedFluentExistingRowsTerminalStrategy,
20                PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21                PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22                PreparedFluentOrderSensitiveTerminalStrategy,
23                PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24                PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25                TextProjectionExpr,
26            },
27            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
28            fluent::load::{FluentLoadQuery, LoadQueryResult},
29            intent::QueryError,
30            plan::AggregateKind,
31        },
32        response::EntityResponse,
33    },
34    error::InternalError,
35    traits::EntityValue,
36    types::{Decimal, Id},
37    value::Value,
38};
39
// Return shape of the `min_max_by` terminal: an optional pair of entity ids,
// documented there as `(min_by(field), max_by(field))`.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42impl<E> FluentLoadQuery<'_, E>
43where
44    E: PersistedRow,
45{
46    // ------------------------------------------------------------------
47    // Execution (single semantic boundary)
48    // ------------------------------------------------------------------
49
50    /// Execute this query using the session's policy settings.
51    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
52    where
53        E: EntityValue,
54    {
55        self.ensure_non_paged_mode_ready()?;
56
57        if self.query().has_grouping() {
58            return self
59                .session
60                .execute_grouped(self.query(), self.cursor_token.as_deref())
61                .map(LoadQueryResult::Grouped);
62        }
63
64        self.session
65            .execute_query(self.query())
66            .map(LoadQueryResult::Rows)
67    }
68
69    // Run one scalar terminal through the canonical non-paged fluent policy
70    // gate before handing execution to the session load-query adapter.
71    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
72    where
73        E: EntityValue,
74        F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
75    {
76        self.ensure_non_paged_mode_ready()?;
77
78        self.session.execute_load_query_with(self.query(), execute)
79    }
80
81    // Run one explain-visible aggregate terminal through the canonical
82    // non-paged fluent policy gate using the prepared aggregate strategy as
83    // the single explain projection source.
84    fn explain_prepared_aggregate_non_paged_terminal<S>(
85        &self,
86        strategy: &S,
87    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
88    where
89        E: EntityValue,
90        S: PreparedFluentAggregateExplainStrategy,
91    {
92        self.ensure_non_paged_mode_ready()?;
93
94        self.session
95            .explain_query_prepared_aggregate_terminal_with_visible_indexes(self.query(), strategy)
96    }
97
98    // Run one prepared projection/distinct explain terminal through the
99    // canonical non-paged fluent policy gate using the prepared projection
100    // strategy as the single explain projection source.
101    fn explain_prepared_projection_non_paged_terminal(
102        &self,
103        strategy: &PreparedFluentProjectionStrategy,
104    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
105    where
106        E: EntityValue,
107    {
108        self.ensure_non_paged_mode_ready()?;
109
110        self.session
111            .explain_query_prepared_projection_terminal_with_visible_indexes(self.query(), strategy)
112    }
113
114    // Execute one prepared fluent scalar terminal through the canonical
115    // non-paged fluent policy gate using the prepared runtime request as the
116    // single execution source.
117    fn execute_prepared_scalar_terminal_output(
118        &self,
119        strategy: PreparedFluentScalarTerminalStrategy,
120    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
121    where
122        E: EntityValue,
123    {
124        let runtime_request = strategy.into_runtime_request();
125
126        self.execute_scalar_non_paged_terminal(move |load, plan| {
127            load.execute_scalar_terminal_request(
128                plan,
129                scalar_terminal_boundary_request_from_prepared(runtime_request),
130            )
131        })
132    }
133
134    // Execute one prepared fluent existing-rows terminal through the
135    // canonical non-paged fluent policy gate using the prepared existing-rows
136    // strategy as the single runtime source.
137    fn execute_prepared_existing_rows_terminal_output(
138        &self,
139        strategy: PreparedFluentExistingRowsTerminalStrategy,
140    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
141    where
142        E: EntityValue,
143    {
144        let runtime_request = strategy.into_runtime_request();
145
146        self.execute_scalar_non_paged_terminal(move |load, plan| {
147            load.execute_scalar_terminal_request(
148                plan,
149                existing_rows_terminal_boundary_request_from_prepared(runtime_request),
150            )
151        })
152    }
153
154    // Execute one prepared fluent numeric-field terminal through the canonical
155    // non-paged fluent policy gate using the prepared numeric strategy as the
156    // single runtime source.
157    fn execute_prepared_numeric_field_terminal(
158        &self,
159        strategy: PreparedFluentNumericFieldStrategy,
160    ) -> Result<Option<Decimal>, QueryError>
161    where
162        E: EntityValue,
163    {
164        let (target_field, runtime_request) = strategy.into_runtime_parts();
165
166        self.execute_scalar_non_paged_terminal(move |load, plan| {
167            load.execute_numeric_field_boundary(
168                plan,
169                target_field,
170                numeric_field_boundary_request_from_prepared(runtime_request),
171            )
172        })
173    }
174
175    // Execute one prepared fluent order-sensitive terminal through the
176    // canonical non-paged fluent policy gate using the prepared order-sensitive
177    // strategy as the single runtime source.
178    fn execute_prepared_order_sensitive_terminal_output(
179        &self,
180        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
181    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
182    where
183        E: EntityValue,
184    {
185        let runtime_request = strategy.into_runtime_request();
186
187        self.execute_scalar_non_paged_terminal(move |load, plan| {
188            load.execute_scalar_terminal_request(
189                plan,
190                order_sensitive_terminal_boundary_request_from_prepared(runtime_request),
191            )
192        })
193    }
194
195    // Execute one prepared fluent projection/distinct terminal through the
196    // canonical non-paged fluent policy gate using the prepared projection
197    // strategy as the single runtime source.
198    fn execute_prepared_projection_terminal_output(
199        &self,
200        strategy: PreparedFluentProjectionStrategy,
201    ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
202    where
203        E: EntityValue,
204    {
205        let (target_field, runtime_request) = strategy.into_runtime_parts();
206
207        self.execute_scalar_non_paged_terminal(move |load, plan| {
208            load.execute_scalar_projection_boundary(
209                plan,
210                target_field,
211                projection_boundary_request_from_prepared(runtime_request),
212            )
213        })
214    }
215
216    // Apply one shared text projection to iterator-like terminal output while
217    // preserving source order and cardinality.
218    fn project_terminal_items<T, U>(
219        projection: &TextProjectionExpr,
220        values: impl IntoIterator<Item = T>,
221        mut map: impl FnMut(&TextProjectionExpr, T) -> Result<U, QueryError>,
222    ) -> Result<Vec<U>, QueryError> {
223        values
224            .into_iter()
225            .map(|value| map(projection, value))
226            .collect()
227    }
228
229    // ------------------------------------------------------------------
230    // Execution terminals — semantic only
231    // ------------------------------------------------------------------
232
233    /// Execute and return whether the result set is empty.
234    pub fn is_empty(&self) -> Result<bool, QueryError>
235    where
236        E: EntityValue,
237    {
238        self.not_exists()
239    }
240
241    /// Execute and return whether no matching row exists.
242    pub fn not_exists(&self) -> Result<bool, QueryError>
243    where
244        E: EntityValue,
245    {
246        Ok(!self.exists()?)
247    }
248
249    /// Execute and return whether at least one matching row exists.
250    pub fn exists(&self) -> Result<bool, QueryError>
251    where
252        E: EntityValue,
253    {
254        self.execute_prepared_existing_rows_terminal_output(
255            PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
256        )?
257        .into_exists()
258        .map_err(QueryError::execute)
259    }
260
261    /// Explain scalar `exists()` routing without executing the terminal.
262    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
263    where
264        E: EntityValue,
265    {
266        self.explain_prepared_aggregate_non_paged_terminal(
267            &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
268        )
269    }
270
271    /// Explain scalar `not_exists()` routing without executing the terminal.
272    ///
273    /// This remains an `exists()` execution plan with negated boolean semantics.
274    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
275    where
276        E: EntityValue,
277    {
278        self.explain_exists()
279    }
280
281    /// Explain scalar load execution shape without executing the query.
282    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
283    where
284        E: EntityValue,
285    {
286        self.session
287            .explain_query_execution_with_visible_indexes(self.query())
288    }
289
290    /// Explain scalar load execution shape as deterministic text.
291    pub fn explain_execution_text(&self) -> Result<String, QueryError>
292    where
293        E: EntityValue,
294    {
295        self.session
296            .explain_query_execution_text_with_visible_indexes(self.query())
297    }
298
299    /// Explain scalar load execution shape as canonical JSON.
300    pub fn explain_execution_json(&self) -> Result<String, QueryError>
301    where
302        E: EntityValue,
303    {
304        self.session
305            .explain_query_execution_json_with_visible_indexes(self.query())
306    }
307
308    /// Explain scalar load execution shape as verbose text with diagnostics.
309    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
310    where
311        E: EntityValue,
312    {
313        self.session
314            .explain_query_execution_verbose_with_visible_indexes(self.query())
315    }
316
317    /// Execute and return the number of matching rows.
318    pub fn count(&self) -> Result<u32, QueryError>
319    where
320        E: EntityValue,
321    {
322        self.execute_prepared_existing_rows_terminal_output(
323            PreparedFluentExistingRowsTerminalStrategy::count_rows(),
324        )?
325        .into_count()
326        .map_err(QueryError::execute)
327    }
328
329    /// Execute and return the total persisted payload bytes for the effective
330    /// result window.
331    pub fn bytes(&self) -> Result<u64, QueryError>
332    where
333        E: EntityValue,
334    {
335        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
336    }
337
338    /// Execute and return the total serialized bytes for `field` over the
339    /// effective result window.
340    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
341    where
342        E: EntityValue,
343    {
344        self.ensure_non_paged_mode_ready()?;
345
346        Self::with_slot(field, |target_slot| {
347            self.session
348                .execute_load_query_with(self.query(), move |load, plan| {
349                    load.bytes_by_slot(plan, target_slot)
350                })
351        })
352    }
353
354    /// Explain `bytes_by(field)` routing without executing the terminal.
355    pub fn explain_bytes_by(
356        &self,
357        field: impl AsRef<str>,
358    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
359    where
360        E: EntityValue,
361    {
362        self.ensure_non_paged_mode_ready()?;
363
364        Self::with_slot(field, |target_slot| {
365            self.session
366                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
367        })
368    }
369
370    /// Execute and return the smallest matching identifier, if any.
371    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
372    where
373        E: EntityValue,
374    {
375        self.execute_prepared_scalar_terminal_output(
376            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
377        )?
378        .into_id()
379        .map_err(QueryError::execute)
380    }
381
382    /// Explain scalar `min()` routing without executing the terminal.
383    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
384    where
385        E: EntityValue,
386    {
387        self.explain_prepared_aggregate_non_paged_terminal(
388            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
389        )
390    }
391
392    /// Execute and return the id of the row with the smallest value for `field`.
393    ///
394    /// Ties are deterministic: equal field values resolve by primary key ascending.
395    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
396    where
397        E: EntityValue,
398    {
399        self.ensure_non_paged_mode_ready()?;
400
401        Self::with_slot(field, |target_slot| {
402            self.execute_prepared_scalar_terminal_output(
403                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
404            )?
405            .into_id()
406            .map_err(QueryError::execute)
407        })
408    }
409
410    /// Execute and return the largest matching identifier, if any.
411    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
412    where
413        E: EntityValue,
414    {
415        self.execute_prepared_scalar_terminal_output(
416            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
417        )?
418        .into_id()
419        .map_err(QueryError::execute)
420    }
421
422    /// Explain scalar `max()` routing without executing the terminal.
423    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
424    where
425        E: EntityValue,
426    {
427        self.explain_prepared_aggregate_non_paged_terminal(
428            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
429        )
430    }
431
432    /// Execute and return the id of the row with the largest value for `field`.
433    ///
434    /// Ties are deterministic: equal field values resolve by primary key ascending.
435    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
436    where
437        E: EntityValue,
438    {
439        self.ensure_non_paged_mode_ready()?;
440
441        Self::with_slot(field, |target_slot| {
442            self.execute_prepared_scalar_terminal_output(
443                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
444            )?
445            .into_id()
446            .map_err(QueryError::execute)
447        })
448    }
449
450    /// Execute and return the id at zero-based ordinal `nth` when rows are
451    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
452    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
453    where
454        E: EntityValue,
455    {
456        self.ensure_non_paged_mode_ready()?;
457
458        Self::with_slot(field, |target_slot| {
459            self.execute_prepared_order_sensitive_terminal_output(
460                PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
461            )?
462            .into_id()
463            .map_err(QueryError::execute)
464        })
465    }
466
467    /// Execute and return the sum of `field` over matching rows.
468    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
469    where
470        E: EntityValue,
471    {
472        self.ensure_non_paged_mode_ready()?;
473
474        Self::with_slot(field, |target_slot| {
475            self.execute_prepared_numeric_field_terminal(
476                PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
477            )
478        })
479    }
480
481    /// Explain scalar `sum_by(field)` routing without executing the terminal.
482    pub fn explain_sum_by(
483        &self,
484        field: impl AsRef<str>,
485    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
486    where
487        E: EntityValue,
488    {
489        self.ensure_non_paged_mode_ready()?;
490
491        Self::with_slot(field, |target_slot| {
492            self.explain_prepared_aggregate_non_paged_terminal(
493                &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
494            )
495        })
496    }
497
498    /// Execute and return the sum of distinct `field` values.
499    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
500    where
501        E: EntityValue,
502    {
503        self.ensure_non_paged_mode_ready()?;
504
505        Self::with_slot(field, |target_slot| {
506            self.execute_prepared_numeric_field_terminal(
507                PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
508            )
509        })
510    }
511
512    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
513    pub fn explain_sum_distinct_by(
514        &self,
515        field: impl AsRef<str>,
516    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
517    where
518        E: EntityValue,
519    {
520        self.ensure_non_paged_mode_ready()?;
521
522        Self::with_slot(field, |target_slot| {
523            self.explain_prepared_aggregate_non_paged_terminal(
524                &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
525            )
526        })
527    }
528
529    /// Execute and return the average of `field` over matching rows.
530    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
531    where
532        E: EntityValue,
533    {
534        self.ensure_non_paged_mode_ready()?;
535
536        Self::with_slot(field, |target_slot| {
537            self.execute_prepared_numeric_field_terminal(
538                PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
539            )
540        })
541    }
542
543    /// Explain scalar `avg_by(field)` routing without executing the terminal.
544    pub fn explain_avg_by(
545        &self,
546        field: impl AsRef<str>,
547    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
548    where
549        E: EntityValue,
550    {
551        self.ensure_non_paged_mode_ready()?;
552
553        Self::with_slot(field, |target_slot| {
554            self.explain_prepared_aggregate_non_paged_terminal(
555                &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
556            )
557        })
558    }
559
560    /// Execute and return the average of distinct `field` values.
561    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
562    where
563        E: EntityValue,
564    {
565        self.ensure_non_paged_mode_ready()?;
566
567        Self::with_slot(field, |target_slot| {
568            self.execute_prepared_numeric_field_terminal(
569                PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
570            )
571        })
572    }
573
574    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
575    pub fn explain_avg_distinct_by(
576        &self,
577        field: impl AsRef<str>,
578    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
579    where
580        E: EntityValue,
581    {
582        self.ensure_non_paged_mode_ready()?;
583
584        Self::with_slot(field, |target_slot| {
585            self.explain_prepared_aggregate_non_paged_terminal(
586                &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
587            )
588        })
589    }
590
591    /// Execute and return the median id by `field` using deterministic ordering
592    /// `(field asc, primary key asc)`.
593    ///
594    /// Even-length windows select the lower median.
595    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
596    where
597        E: EntityValue,
598    {
599        self.ensure_non_paged_mode_ready()?;
600
601        Self::with_slot(field, |target_slot| {
602            self.execute_prepared_order_sensitive_terminal_output(
603                PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
604            )?
605            .into_id()
606            .map_err(QueryError::execute)
607        })
608    }
609
610    /// Execute and return the number of distinct values for `field` over the
611    /// effective result window.
612    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
613    where
614        E: EntityValue,
615    {
616        self.ensure_non_paged_mode_ready()?;
617
618        Self::with_slot(field, |target_slot| {
619            self.execute_prepared_projection_terminal_output(
620                PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
621            )?
622            .into_count()
623            .map_err(QueryError::execute)
624        })
625    }
626
627    /// Explain `count_distinct_by(field)` routing without executing the terminal.
628    pub fn explain_count_distinct_by(
629        &self,
630        field: impl AsRef<str>,
631    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
632    where
633        E: EntityValue,
634    {
635        self.ensure_non_paged_mode_ready()?;
636
637        Self::with_slot(field, |target_slot| {
638            self.explain_prepared_projection_non_paged_terminal(
639                &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
640            )
641        })
642    }
643
644    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
645    ///
646    /// Tie handling is deterministic for both extrema: primary key ascending.
647    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
648    where
649        E: EntityValue,
650    {
651        self.ensure_non_paged_mode_ready()?;
652
653        Self::with_slot(field, |target_slot| {
654            self.execute_prepared_order_sensitive_terminal_output(
655                PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
656            )?
657            .into_id_pair()
658            .map_err(QueryError::execute)
659        })
660    }
661
662    /// Execute and return projected field values for the effective result window.
663    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
664    where
665        E: EntityValue,
666    {
667        self.ensure_non_paged_mode_ready()?;
668
669        Self::with_slot(field, |target_slot| {
670            self.execute_prepared_projection_terminal_output(
671                PreparedFluentProjectionStrategy::values_by_slot(target_slot),
672            )?
673            .into_values()
674            .map_err(QueryError::execute)
675        })
676    }
677
678    /// Execute and return projected values for one shared text projection over
679    /// the effective response window.
680    pub fn project_values(&self, projection: &TextProjectionExpr) -> Result<Vec<Value>, QueryError>
681    where
682        E: EntityValue,
683    {
684        self.ensure_non_paged_mode_ready()?;
685
686        Self::with_slot(projection.field(), |target_slot| {
687            let values = self
688                .execute_prepared_projection_terminal_output(
689                    PreparedFluentProjectionStrategy::values_by_slot(target_slot),
690                )?
691                .into_values()
692                .map_err(QueryError::execute)?;
693
694            Self::project_terminal_items(projection, values, |projection, value| {
695                projection.apply_value(value)
696            })
697        })
698    }
699
700    /// Explain `project_values(projection)` routing without executing it.
701    pub fn explain_project_values(
702        &self,
703        projection: &TextProjectionExpr,
704    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
705    where
706        E: EntityValue,
707    {
708        self.ensure_non_paged_mode_ready()?;
709
710        Self::with_slot(projection.field(), |target_slot| {
711            self.explain_prepared_projection_non_paged_terminal(
712                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
713            )
714        })
715    }
716
717    /// Explain `values_by(field)` routing without executing the terminal.
718    pub fn explain_values_by(
719        &self,
720        field: impl AsRef<str>,
721    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
722    where
723        E: EntityValue,
724    {
725        self.ensure_non_paged_mode_ready()?;
726
727        Self::with_slot(field, |target_slot| {
728            self.explain_prepared_projection_non_paged_terminal(
729                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
730            )
731        })
732    }
733
734    /// Execute and return the first `k` rows from the effective response window.
735    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
736    where
737        E: EntityValue,
738    {
739        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
740    }
741
742    /// Execute and return the top `k` rows by `field` under deterministic
743    /// ordering `(field desc, primary_key asc)` over the effective response
744    /// window.
745    ///
746    /// This terminal applies its own ordering and does not preserve query
747    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
748    /// matches `max_by(field)` selection semantics.
749    pub fn top_k_by(
750        &self,
751        field: impl AsRef<str>,
752        take_count: u32,
753    ) -> Result<EntityResponse<E>, QueryError>
754    where
755        E: EntityValue,
756    {
757        self.ensure_non_paged_mode_ready()?;
758
759        Self::with_slot(field, |target_slot| {
760            self.session
761                .execute_load_query_with(self.query(), move |load, plan| {
762                    load.top_k_by_slot(plan, target_slot, take_count)
763                })
764        })
765    }
766
767    /// Execute and return the bottom `k` rows by `field` under deterministic
768    /// ordering `(field asc, primary_key asc)` over the effective response
769    /// window.
770    ///
771    /// This terminal applies its own ordering and does not preserve query
772    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
773    /// matches `min_by(field)` selection semantics.
774    pub fn bottom_k_by(
775        &self,
776        field: impl AsRef<str>,
777        take_count: u32,
778    ) -> Result<EntityResponse<E>, QueryError>
779    where
780        E: EntityValue,
781    {
782        self.ensure_non_paged_mode_ready()?;
783
784        Self::with_slot(field, |target_slot| {
785            self.session
786                .execute_load_query_with(self.query(), move |load, plan| {
787                    load.bottom_k_by_slot(plan, target_slot, take_count)
788                })
789        })
790    }
791
792    /// Execute and return projected values for the top `k` rows by `field`
793    /// under deterministic ordering `(field desc, primary_key asc)` over the
794    /// effective response window.
795    ///
796    /// Ranking is applied before projection and does not preserve query
797    /// `order_by(...)` row order in the returned values. For `k = 1`, this
798    /// matches `max_by(field)` projected to one value.
799    pub fn top_k_by_values(
800        &self,
801        field: impl AsRef<str>,
802        take_count: u32,
803    ) -> Result<Vec<Value>, QueryError>
804    where
805        E: EntityValue,
806    {
807        self.ensure_non_paged_mode_ready()?;
808
809        Self::with_slot(field, |target_slot| {
810            self.session
811                .execute_load_query_with(self.query(), move |load, plan| {
812                    load.top_k_by_values_slot(plan, target_slot, take_count)
813                })
814        })
815    }
816
817    /// Execute and return projected values for the bottom `k` rows by `field`
818    /// under deterministic ordering `(field asc, primary_key asc)` over the
819    /// effective response window.
820    ///
821    /// Ranking is applied before projection and does not preserve query
822    /// `order_by(...)` row order in the returned values. For `k = 1`, this
823    /// matches `min_by(field)` projected to one value.
824    pub fn bottom_k_by_values(
825        &self,
826        field: impl AsRef<str>,
827        take_count: u32,
828    ) -> Result<Vec<Value>, QueryError>
829    where
830        E: EntityValue,
831    {
832        self.ensure_non_paged_mode_ready()?;
833
834        Self::with_slot(field, |target_slot| {
835            self.session
836                .execute_load_query_with(self.query(), move |load, plan| {
837                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
838                })
839        })
840    }
841
842    /// Execute and return projected id/value pairs for the top `k` rows by
843    /// `field` under deterministic ordering `(field desc, primary_key asc)`
844    /// over the effective response window.
845    ///
846    /// Ranking is applied before projection and does not preserve query
847    /// `order_by(...)` row order in the returned values. For `k = 1`, this
848    /// matches `max_by(field)` projected to one `(id, value)` pair.
849    pub fn top_k_by_with_ids(
850        &self,
851        field: impl AsRef<str>,
852        take_count: u32,
853    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
854    where
855        E: EntityValue,
856    {
857        self.ensure_non_paged_mode_ready()?;
858
859        Self::with_slot(field, |target_slot| {
860            self.session
861                .execute_load_query_with(self.query(), move |load, plan| {
862                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
863                })
864        })
865    }
866
867    /// Execute and return projected id/value pairs for the bottom `k` rows by
868    /// `field` under deterministic ordering `(field asc, primary_key asc)`
869    /// over the effective response window.
870    ///
871    /// Ranking is applied before projection and does not preserve query
872    /// `order_by(...)` row order in the returned values. For `k = 1`, this
873    /// matches `min_by(field)` projected to one `(id, value)` pair.
874    pub fn bottom_k_by_with_ids(
875        &self,
876        field: impl AsRef<str>,
877        take_count: u32,
878    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
879    where
880        E: EntityValue,
881    {
882        self.ensure_non_paged_mode_ready()?;
883
884        Self::with_slot(field, |target_slot| {
885            self.session
886                .execute_load_query_with(self.query(), move |load, plan| {
887                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
888                })
889        })
890    }
891
892    /// Execute and return distinct projected field values for the effective
893    /// result window, preserving first-observed value order.
894    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
895    where
896        E: EntityValue,
897    {
898        self.ensure_non_paged_mode_ready()?;
899
900        Self::with_slot(field, |target_slot| {
901            self.execute_prepared_projection_terminal_output(
902                PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
903            )?
904            .into_values()
905            .map_err(QueryError::execute)
906        })
907    }
908
909    /// Explain `distinct_values_by(field)` routing without executing the terminal.
910    pub fn explain_distinct_values_by(
911        &self,
912        field: impl AsRef<str>,
913    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
914    where
915        E: EntityValue,
916    {
917        self.ensure_non_paged_mode_ready()?;
918
919        Self::with_slot(field, |target_slot| {
920            self.explain_prepared_projection_non_paged_terminal(
921                &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
922            )
923        })
924    }
925
926    /// Execute and return projected field values paired with row ids for the
927    /// effective result window.
928    pub fn values_by_with_ids(
929        &self,
930        field: impl AsRef<str>,
931    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
932    where
933        E: EntityValue,
934    {
935        self.ensure_non_paged_mode_ready()?;
936
937        Self::with_slot(field, |target_slot| {
938            self.execute_prepared_projection_terminal_output(
939                PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
940            )?
941            .into_values_with_ids()
942            .map_err(QueryError::execute)
943        })
944    }
945
946    /// Execute and return projected id/value pairs for one shared text
947    /// projection over the effective response window.
948    pub fn project_values_with_ids(
949        &self,
950        projection: &TextProjectionExpr,
951    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
952    where
953        E: EntityValue,
954    {
955        self.ensure_non_paged_mode_ready()?;
956
957        Self::with_slot(projection.field(), |target_slot| {
958            let values = self
959                .execute_prepared_projection_terminal_output(
960                    PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
961                )?
962                .into_values_with_ids::<E>()
963                .map_err(QueryError::execute)?;
964
965            Self::project_terminal_items(projection, values, |projection, (id, value)| {
966                Ok((id, projection.apply_value(value)?))
967            })
968        })
969    }
970
971    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
972    pub fn explain_values_by_with_ids(
973        &self,
974        field: impl AsRef<str>,
975    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
976    where
977        E: EntityValue,
978    {
979        self.ensure_non_paged_mode_ready()?;
980
981        Self::with_slot(field, |target_slot| {
982            self.explain_prepared_projection_non_paged_terminal(
983                &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
984            )
985        })
986    }
987
988    /// Execute and return the first projected field value in effective response
989    /// order, if any.
990    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
991    where
992        E: EntityValue,
993    {
994        self.ensure_non_paged_mode_ready()?;
995
996        Self::with_slot(field, |target_slot| {
997            self.execute_prepared_projection_terminal_output(
998                PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
999            )?
1000            .into_terminal_value()
1001            .map_err(QueryError::execute)
1002        })
1003    }
1004
1005    /// Execute and return the first projected value for one shared text
1006    /// projection in effective response order, if any.
1007    pub fn project_first_value(
1008        &self,
1009        projection: &TextProjectionExpr,
1010    ) -> Result<Option<Value>, QueryError>
1011    where
1012        E: EntityValue,
1013    {
1014        self.ensure_non_paged_mode_ready()?;
1015
1016        Self::with_slot(projection.field(), |target_slot| {
1017            let value = self
1018                .execute_prepared_projection_terminal_output(
1019                    PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1020                )?
1021                .into_terminal_value()
1022                .map_err(QueryError::execute)?;
1023
1024            let mut projected =
1025                Self::project_terminal_items(projection, value, |projection, value| {
1026                    projection.apply_value(value)
1027                })?;
1028
1029            Ok(projected.pop())
1030        })
1031    }
1032
1033    /// Explain `first_value_by(field)` routing without executing the terminal.
1034    pub fn explain_first_value_by(
1035        &self,
1036        field: impl AsRef<str>,
1037    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1038    where
1039        E: EntityValue,
1040    {
1041        self.ensure_non_paged_mode_ready()?;
1042
1043        Self::with_slot(field, |target_slot| {
1044            self.explain_prepared_projection_non_paged_terminal(
1045                &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1046            )
1047        })
1048    }
1049
1050    /// Execute and return the last projected field value in effective response
1051    /// order, if any.
1052    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
1053    where
1054        E: EntityValue,
1055    {
1056        self.ensure_non_paged_mode_ready()?;
1057
1058        Self::with_slot(field, |target_slot| {
1059            self.execute_prepared_projection_terminal_output(
1060                PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1061            )?
1062            .into_terminal_value()
1063            .map_err(QueryError::execute)
1064        })
1065    }
1066
1067    /// Execute and return the last projected value for one shared text
1068    /// projection in effective response order, if any.
1069    pub fn project_last_value(
1070        &self,
1071        projection: &TextProjectionExpr,
1072    ) -> Result<Option<Value>, QueryError>
1073    where
1074        E: EntityValue,
1075    {
1076        self.ensure_non_paged_mode_ready()?;
1077
1078        Self::with_slot(projection.field(), |target_slot| {
1079            let value = self
1080                .execute_prepared_projection_terminal_output(
1081                    PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1082                )?
1083                .into_terminal_value()
1084                .map_err(QueryError::execute)?;
1085
1086            let mut projected =
1087                Self::project_terminal_items(projection, value, |projection, value| {
1088                    projection.apply_value(value)
1089                })?;
1090
1091            Ok(projected.pop())
1092        })
1093    }
1094
1095    /// Explain `last_value_by(field)` routing without executing the terminal.
1096    pub fn explain_last_value_by(
1097        &self,
1098        field: impl AsRef<str>,
1099    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1100    where
1101        E: EntityValue,
1102    {
1103        self.ensure_non_paged_mode_ready()?;
1104
1105        Self::with_slot(field, |target_slot| {
1106            self.explain_prepared_projection_non_paged_terminal(
1107                &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1108            )
1109        })
1110    }
1111
1112    /// Execute and return the first matching identifier in response order, if any.
1113    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1114    where
1115        E: EntityValue,
1116    {
1117        self.execute_prepared_order_sensitive_terminal_output(
1118            PreparedFluentOrderSensitiveTerminalStrategy::first(),
1119        )?
1120        .into_id()
1121        .map_err(QueryError::execute)
1122    }
1123
1124    /// Explain scalar `first()` routing without executing the terminal.
1125    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1126    where
1127        E: EntityValue,
1128    {
1129        self.explain_prepared_aggregate_non_paged_terminal(
1130            &PreparedFluentOrderSensitiveTerminalStrategy::first(),
1131        )
1132    }
1133
1134    /// Execute and return the last matching identifier in response order, if any.
1135    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1136    where
1137        E: EntityValue,
1138    {
1139        self.execute_prepared_order_sensitive_terminal_output(
1140            PreparedFluentOrderSensitiveTerminalStrategy::last(),
1141        )?
1142        .into_id()
1143        .map_err(QueryError::execute)
1144    }
1145
1146    /// Explain scalar `last()` routing without executing the terminal.
1147    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1148    where
1149        E: EntityValue,
1150    {
1151        self.explain_prepared_aggregate_non_paged_terminal(
1152            &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1153        )
1154    }
1155
1156    /// Execute and require exactly one matching row.
1157    pub fn require_one(&self) -> Result<(), QueryError>
1158    where
1159        E: EntityValue,
1160    {
1161        self.execute()?.into_rows()?.require_one()?;
1162        Ok(())
1163    }
1164
1165    /// Execute and require at least one matching row.
1166    pub fn require_some(&self) -> Result<(), QueryError>
1167    where
1168        E: EntityValue,
1169    {
1170        self.execute()?.into_rows()?.require_some()?;
1171        Ok(())
1172    }
1173}
1174
1175fn scalar_terminal_boundary_request_from_prepared(
1176    request: PreparedFluentScalarTerminalRuntimeRequest,
1177) -> ScalarTerminalBoundaryRequest {
1178    match request {
1179        PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1180            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1181        }
1182        PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1183            ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
1184        }
1185    }
1186}
1187
1188const fn existing_rows_terminal_boundary_request_from_prepared(
1189    request: PreparedFluentExistingRowsTerminalRuntimeRequest,
1190) -> ScalarTerminalBoundaryRequest {
1191    match request {
1192        PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => {
1193            ScalarTerminalBoundaryRequest::Count
1194        }
1195        PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => {
1196            ScalarTerminalBoundaryRequest::Exists
1197        }
1198    }
1199}
1200
1201const fn numeric_field_boundary_request_from_prepared(
1202    request: PreparedFluentNumericFieldRuntimeRequest,
1203) -> ScalarNumericFieldBoundaryRequest {
1204    match request {
1205        PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
1206        PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
1207            ScalarNumericFieldBoundaryRequest::SumDistinct
1208        }
1209        PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
1210        PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
1211            ScalarNumericFieldBoundaryRequest::AvgDistinct
1212        }
1213    }
1214}
1215
1216fn order_sensitive_terminal_boundary_request_from_prepared(
1217    request: PreparedFluentOrderSensitiveTerminalRuntimeRequest,
1218) -> ScalarTerminalBoundaryRequest {
1219    match request {
1220        PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1221            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1222        }
1223        PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1224            ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth }
1225        }
1226        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1227            ScalarTerminalBoundaryRequest::MedianBySlot { target_field }
1228        }
1229        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1230            ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field }
1231        }
1232    }
1233}
1234
1235const fn projection_boundary_request_from_prepared(
1236    request: PreparedFluentProjectionRuntimeRequest,
1237) -> ScalarProjectionBoundaryRequest {
1238    match request {
1239        PreparedFluentProjectionRuntimeRequest::Values => ScalarProjectionBoundaryRequest::Values,
1240        PreparedFluentProjectionRuntimeRequest::DistinctValues => {
1241            ScalarProjectionBoundaryRequest::DistinctValues
1242        }
1243        PreparedFluentProjectionRuntimeRequest::CountDistinct => {
1244            ScalarProjectionBoundaryRequest::CountDistinct
1245        }
1246        PreparedFluentProjectionRuntimeRequest::ValuesWithIds => {
1247            ScalarProjectionBoundaryRequest::ValuesWithIds
1248        }
1249        PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1250            ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind }
1251        }
1252    }
1253}