Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        PersistedRow,
9        executor::{
10            LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11            ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12            ScalarTerminalBoundaryRequest,
13        },
14        query::{
15            api::ResponseCardinalityExt,
16            builder::{
17                PreparedFluentAggregateExplainStrategy,
18                PreparedFluentExistingRowsTerminalRuntimeRequest,
19                PreparedFluentExistingRowsTerminalStrategy,
20                PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21                PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22                PreparedFluentOrderSensitiveTerminalStrategy,
23                PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24                PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25                TextProjectionExpr,
26            },
27            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
28            fluent::load::{FluentLoadQuery, LoadQueryResult},
29            intent::QueryError,
30            plan::AggregateKind,
31        },
32        response::EntityResponse,
33    },
34    error::InternalError,
35    traits::EntityValue,
36    types::{Decimal, Id},
37    value::Value,
38};
39
// Identifier pair returned by `min_max_by`, ordered `(min_by id, max_by id)`.
// NOTE(review): presumably `None` when no row matches — confirm against the
// executor's id-pair output.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42impl<E> FluentLoadQuery<'_, E>
43where
44    E: PersistedRow,
45{
46    // ------------------------------------------------------------------
47    // Execution (single semantic boundary)
48    // ------------------------------------------------------------------
49
50    /// Execute this query using the session's policy settings.
51    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
52    where
53        E: EntityValue,
54    {
55        self.ensure_non_paged_mode_ready()?;
56
57        if self.query().has_grouping() {
58            return self
59                .session
60                .execute_grouped(self.query(), self.cursor_token.as_deref())
61                .map(LoadQueryResult::Grouped);
62        }
63
64        self.session
65            .execute_query(self.query())
66            .map(LoadQueryResult::Rows)
67    }
68
69    // Run one scalar terminal through the canonical non-paged fluent policy
70    // gate before handing execution to the session load-query adapter.
71    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
72    where
73        E: EntityValue,
74        F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
75    {
76        self.ensure_non_paged_mode_ready()?;
77
78        self.session.execute_load_query_with(self.query(), execute)
79    }
80
81    // Run one explain-visible aggregate terminal through the canonical
82    // non-paged fluent policy gate using the prepared aggregate strategy as
83    // the single explain projection source.
84    fn explain_prepared_aggregate_non_paged_terminal<S>(
85        &self,
86        strategy: &S,
87    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
88    where
89        E: EntityValue,
90        S: PreparedFluentAggregateExplainStrategy,
91    {
92        self.ensure_non_paged_mode_ready()?;
93
94        self.session
95            .explain_query_prepared_aggregate_terminal_with_visible_indexes(self.query(), strategy)
96    }
97
98    // Run one prepared projection/distinct explain terminal through the
99    // canonical non-paged fluent policy gate using the prepared projection
100    // strategy as the single explain projection source.
101    fn explain_prepared_projection_non_paged_terminal(
102        &self,
103        strategy: &PreparedFluentProjectionStrategy,
104    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
105    where
106        E: EntityValue,
107    {
108        self.ensure_non_paged_mode_ready()?;
109
110        self.session
111            .explain_query_prepared_projection_terminal_with_visible_indexes(self.query(), strategy)
112    }
113
114    // Execute one prepared fluent scalar terminal through the canonical
115    // non-paged fluent policy gate using the prepared runtime request as the
116    // single execution source.
117    fn execute_prepared_scalar_terminal_output(
118        &self,
119        strategy: PreparedFluentScalarTerminalStrategy,
120    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
121    where
122        E: EntityValue,
123    {
124        let runtime_request = strategy.into_runtime_request();
125
126        self.execute_scalar_non_paged_terminal(move |load, plan| {
127            load.execute_scalar_terminal_request(
128                plan,
129                scalar_terminal_boundary_request_from_prepared(runtime_request),
130            )
131        })
132    }
133
134    // Execute one prepared fluent existing-rows terminal through the
135    // canonical non-paged fluent policy gate using the prepared existing-rows
136    // strategy as the single runtime source.
137    fn execute_prepared_existing_rows_terminal_output(
138        &self,
139        strategy: PreparedFluentExistingRowsTerminalStrategy,
140    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
141    where
142        E: EntityValue,
143    {
144        let runtime_request = strategy.into_runtime_request();
145
146        self.execute_scalar_non_paged_terminal(move |load, plan| {
147            load.execute_scalar_terminal_request(
148                plan,
149                existing_rows_terminal_boundary_request_from_prepared(runtime_request),
150            )
151        })
152    }
153
154    // Execute one prepared fluent numeric-field terminal through the canonical
155    // non-paged fluent policy gate using the prepared numeric strategy as the
156    // single runtime source.
157    fn execute_prepared_numeric_field_terminal(
158        &self,
159        strategy: PreparedFluentNumericFieldStrategy,
160    ) -> Result<Option<Decimal>, QueryError>
161    where
162        E: EntityValue,
163    {
164        let (target_field, runtime_request) = strategy.into_runtime_parts();
165
166        self.execute_scalar_non_paged_terminal(move |load, plan| {
167            load.execute_numeric_field_boundary(
168                plan,
169                target_field,
170                numeric_field_boundary_request_from_prepared(runtime_request),
171            )
172        })
173    }
174
175    // Execute one prepared fluent order-sensitive terminal through the
176    // canonical non-paged fluent policy gate using the prepared order-sensitive
177    // strategy as the single runtime source.
178    fn execute_prepared_order_sensitive_terminal_output(
179        &self,
180        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
181    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
182    where
183        E: EntityValue,
184    {
185        let runtime_request = strategy.into_runtime_request();
186
187        self.execute_scalar_non_paged_terminal(move |load, plan| {
188            load.execute_scalar_terminal_request(
189                plan,
190                order_sensitive_terminal_boundary_request_from_prepared(runtime_request),
191            )
192        })
193    }
194
195    // Execute one prepared fluent projection/distinct terminal through the
196    // canonical non-paged fluent policy gate using the prepared projection
197    // strategy as the single runtime source.
198    fn execute_prepared_projection_terminal_output(
199        &self,
200        strategy: PreparedFluentProjectionStrategy,
201    ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
202    where
203        E: EntityValue,
204    {
205        let (target_field, runtime_request) = strategy.into_runtime_parts();
206
207        self.execute_scalar_non_paged_terminal(move |load, plan| {
208            load.execute_scalar_projection_boundary(
209                plan,
210                target_field,
211                projection_boundary_request_from_prepared(runtime_request),
212            )
213        })
214    }
215
216    // Apply one shared text projection to a terminal values payload while
217    // preserving row order and cardinality.
218    fn project_terminal_values(
219        projection: &TextProjectionExpr,
220        values: Vec<Value>,
221    ) -> Result<Vec<Value>, QueryError> {
222        values
223            .into_iter()
224            .map(|value| projection.apply_value(value))
225            .collect()
226    }
227
228    // Apply one shared text projection to an optional terminal value.
229    fn project_terminal_optional_value(
230        projection: &TextProjectionExpr,
231        value: Option<Value>,
232    ) -> Result<Option<Value>, QueryError> {
233        value.map(|value| projection.apply_value(value)).transpose()
234    }
235
236    // Apply one shared text projection to `(id, value)` terminal output while
237    // preserving identifier alignment.
238    fn project_terminal_values_with_ids(
239        projection: &TextProjectionExpr,
240        values: Vec<(Id<E>, Value)>,
241    ) -> Result<Vec<(Id<E>, Value)>, QueryError> {
242        values
243            .into_iter()
244            .map(|(id, value)| Ok((id, projection.apply_value(value)?)))
245            .collect()
246    }
247
248    // ------------------------------------------------------------------
249    // Execution terminals — semantic only
250    // ------------------------------------------------------------------
251
252    /// Execute and return whether the result set is empty.
253    pub fn is_empty(&self) -> Result<bool, QueryError>
254    where
255        E: EntityValue,
256    {
257        self.not_exists()
258    }
259
260    /// Execute and return whether no matching row exists.
261    pub fn not_exists(&self) -> Result<bool, QueryError>
262    where
263        E: EntityValue,
264    {
265        Ok(!self.exists()?)
266    }
267
268    /// Execute and return whether at least one matching row exists.
269    pub fn exists(&self) -> Result<bool, QueryError>
270    where
271        E: EntityValue,
272    {
273        self.execute_prepared_existing_rows_terminal_output(
274            PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
275        )?
276        .into_exists()
277        .map_err(QueryError::execute)
278    }
279
280    /// Explain scalar `exists()` routing without executing the terminal.
281    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
282    where
283        E: EntityValue,
284    {
285        self.explain_prepared_aggregate_non_paged_terminal(
286            &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
287        )
288    }
289
290    /// Explain scalar `not_exists()` routing without executing the terminal.
291    ///
292    /// This remains an `exists()` execution plan with negated boolean semantics.
293    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
294    where
295        E: EntityValue,
296    {
297        self.explain_exists()
298    }
299
300    /// Explain scalar load execution shape without executing the query.
301    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
302    where
303        E: EntityValue,
304    {
305        self.session
306            .explain_query_execution_with_visible_indexes(self.query())
307    }
308
309    /// Explain scalar load execution shape as deterministic text.
310    pub fn explain_execution_text(&self) -> Result<String, QueryError>
311    where
312        E: EntityValue,
313    {
314        self.session
315            .explain_query_execution_text_with_visible_indexes(self.query())
316    }
317
318    /// Explain scalar load execution shape as canonical JSON.
319    pub fn explain_execution_json(&self) -> Result<String, QueryError>
320    where
321        E: EntityValue,
322    {
323        self.session
324            .explain_query_execution_json_with_visible_indexes(self.query())
325    }
326
327    /// Explain scalar load execution shape as verbose text with diagnostics.
328    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
329    where
330        E: EntityValue,
331    {
332        self.session
333            .explain_query_execution_verbose_with_visible_indexes(self.query())
334    }
335
336    /// Execute and return the number of matching rows.
337    pub fn count(&self) -> Result<u32, QueryError>
338    where
339        E: EntityValue,
340    {
341        self.execute_prepared_existing_rows_terminal_output(
342            PreparedFluentExistingRowsTerminalStrategy::count_rows(),
343        )?
344        .into_count()
345        .map_err(QueryError::execute)
346    }
347
348    /// Execute and return the total persisted payload bytes for the effective
349    /// result window.
350    pub fn bytes(&self) -> Result<u64, QueryError>
351    where
352        E: EntityValue,
353    {
354        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
355    }
356
357    /// Execute and return the total serialized bytes for `field` over the
358    /// effective result window.
359    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
360    where
361        E: EntityValue,
362    {
363        self.ensure_non_paged_mode_ready()?;
364
365        Self::with_slot(field, |target_slot| {
366            self.session
367                .execute_load_query_with(self.query(), move |load, plan| {
368                    load.bytes_by_slot(plan, target_slot)
369                })
370        })
371    }
372
373    /// Explain `bytes_by(field)` routing without executing the terminal.
374    pub fn explain_bytes_by(
375        &self,
376        field: impl AsRef<str>,
377    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
378    where
379        E: EntityValue,
380    {
381        self.ensure_non_paged_mode_ready()?;
382
383        Self::with_slot(field, |target_slot| {
384            self.session
385                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
386        })
387    }
388
389    /// Execute and return the smallest matching identifier, if any.
390    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
391    where
392        E: EntityValue,
393    {
394        self.execute_prepared_scalar_terminal_output(
395            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
396        )?
397        .into_id()
398        .map_err(QueryError::execute)
399    }
400
401    /// Explain scalar `min()` routing without executing the terminal.
402    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
403    where
404        E: EntityValue,
405    {
406        self.explain_prepared_aggregate_non_paged_terminal(
407            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
408        )
409    }
410
411    /// Execute and return the id of the row with the smallest value for `field`.
412    ///
413    /// Ties are deterministic: equal field values resolve by primary key ascending.
414    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
415    where
416        E: EntityValue,
417    {
418        self.ensure_non_paged_mode_ready()?;
419
420        Self::with_slot(field, |target_slot| {
421            self.execute_prepared_scalar_terminal_output(
422                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
423            )?
424            .into_id()
425            .map_err(QueryError::execute)
426        })
427    }
428
429    /// Execute and return the largest matching identifier, if any.
430    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
431    where
432        E: EntityValue,
433    {
434        self.execute_prepared_scalar_terminal_output(
435            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
436        )?
437        .into_id()
438        .map_err(QueryError::execute)
439    }
440
441    /// Explain scalar `max()` routing without executing the terminal.
442    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
443    where
444        E: EntityValue,
445    {
446        self.explain_prepared_aggregate_non_paged_terminal(
447            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
448        )
449    }
450
451    /// Execute and return the id of the row with the largest value for `field`.
452    ///
453    /// Ties are deterministic: equal field values resolve by primary key ascending.
454    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
455    where
456        E: EntityValue,
457    {
458        self.ensure_non_paged_mode_ready()?;
459
460        Self::with_slot(field, |target_slot| {
461            self.execute_prepared_scalar_terminal_output(
462                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
463            )?
464            .into_id()
465            .map_err(QueryError::execute)
466        })
467    }
468
469    /// Execute and return the id at zero-based ordinal `nth` when rows are
470    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
471    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
472    where
473        E: EntityValue,
474    {
475        self.ensure_non_paged_mode_ready()?;
476
477        Self::with_slot(field, |target_slot| {
478            self.execute_prepared_order_sensitive_terminal_output(
479                PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
480            )?
481            .into_id()
482            .map_err(QueryError::execute)
483        })
484    }
485
486    /// Execute and return the sum of `field` over matching rows.
487    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
488    where
489        E: EntityValue,
490    {
491        self.ensure_non_paged_mode_ready()?;
492
493        Self::with_slot(field, |target_slot| {
494            self.execute_prepared_numeric_field_terminal(
495                PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
496            )
497        })
498    }
499
500    /// Explain scalar `sum_by(field)` routing without executing the terminal.
501    pub fn explain_sum_by(
502        &self,
503        field: impl AsRef<str>,
504    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
505    where
506        E: EntityValue,
507    {
508        self.ensure_non_paged_mode_ready()?;
509
510        Self::with_slot(field, |target_slot| {
511            self.explain_prepared_aggregate_non_paged_terminal(
512                &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
513            )
514        })
515    }
516
517    /// Execute and return the sum of distinct `field` values.
518    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
519    where
520        E: EntityValue,
521    {
522        self.ensure_non_paged_mode_ready()?;
523
524        Self::with_slot(field, |target_slot| {
525            self.execute_prepared_numeric_field_terminal(
526                PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
527            )
528        })
529    }
530
531    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
532    pub fn explain_sum_distinct_by(
533        &self,
534        field: impl AsRef<str>,
535    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
536    where
537        E: EntityValue,
538    {
539        self.ensure_non_paged_mode_ready()?;
540
541        Self::with_slot(field, |target_slot| {
542            self.explain_prepared_aggregate_non_paged_terminal(
543                &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
544            )
545        })
546    }
547
548    /// Execute and return the average of `field` over matching rows.
549    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
550    where
551        E: EntityValue,
552    {
553        self.ensure_non_paged_mode_ready()?;
554
555        Self::with_slot(field, |target_slot| {
556            self.execute_prepared_numeric_field_terminal(
557                PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
558            )
559        })
560    }
561
562    /// Explain scalar `avg_by(field)` routing without executing the terminal.
563    pub fn explain_avg_by(
564        &self,
565        field: impl AsRef<str>,
566    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
567    where
568        E: EntityValue,
569    {
570        self.ensure_non_paged_mode_ready()?;
571
572        Self::with_slot(field, |target_slot| {
573            self.explain_prepared_aggregate_non_paged_terminal(
574                &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
575            )
576        })
577    }
578
579    /// Execute and return the average of distinct `field` values.
580    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
581    where
582        E: EntityValue,
583    {
584        self.ensure_non_paged_mode_ready()?;
585
586        Self::with_slot(field, |target_slot| {
587            self.execute_prepared_numeric_field_terminal(
588                PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
589            )
590        })
591    }
592
593    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
594    pub fn explain_avg_distinct_by(
595        &self,
596        field: impl AsRef<str>,
597    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
598    where
599        E: EntityValue,
600    {
601        self.ensure_non_paged_mode_ready()?;
602
603        Self::with_slot(field, |target_slot| {
604            self.explain_prepared_aggregate_non_paged_terminal(
605                &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
606            )
607        })
608    }
609
610    /// Execute and return the median id by `field` using deterministic ordering
611    /// `(field asc, primary key asc)`.
612    ///
613    /// Even-length windows select the lower median.
614    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
615    where
616        E: EntityValue,
617    {
618        self.ensure_non_paged_mode_ready()?;
619
620        Self::with_slot(field, |target_slot| {
621            self.execute_prepared_order_sensitive_terminal_output(
622                PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
623            )?
624            .into_id()
625            .map_err(QueryError::execute)
626        })
627    }
628
629    /// Execute and return the number of distinct values for `field` over the
630    /// effective result window.
631    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
632    where
633        E: EntityValue,
634    {
635        self.ensure_non_paged_mode_ready()?;
636
637        Self::with_slot(field, |target_slot| {
638            self.execute_prepared_projection_terminal_output(
639                PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
640            )?
641            .into_count()
642            .map_err(QueryError::execute)
643        })
644    }
645
646    /// Explain `count_distinct_by(field)` routing without executing the terminal.
647    pub fn explain_count_distinct_by(
648        &self,
649        field: impl AsRef<str>,
650    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
651    where
652        E: EntityValue,
653    {
654        self.ensure_non_paged_mode_ready()?;
655
656        Self::with_slot(field, |target_slot| {
657            self.explain_prepared_projection_non_paged_terminal(
658                &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
659            )
660        })
661    }
662
663    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
664    ///
665    /// Tie handling is deterministic for both extrema: primary key ascending.
666    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
667    where
668        E: EntityValue,
669    {
670        self.ensure_non_paged_mode_ready()?;
671
672        Self::with_slot(field, |target_slot| {
673            self.execute_prepared_order_sensitive_terminal_output(
674                PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
675            )?
676            .into_id_pair()
677            .map_err(QueryError::execute)
678        })
679    }
680
681    /// Execute and return projected field values for the effective result window.
682    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
683    where
684        E: EntityValue,
685    {
686        self.ensure_non_paged_mode_ready()?;
687
688        Self::with_slot(field, |target_slot| {
689            self.execute_prepared_projection_terminal_output(
690                PreparedFluentProjectionStrategy::values_by_slot(target_slot),
691            )?
692            .into_values()
693            .map_err(QueryError::execute)
694        })
695    }
696
697    /// Execute and return projected values for one shared text projection over
698    /// the effective response window.
699    pub fn project_values(&self, projection: &TextProjectionExpr) -> Result<Vec<Value>, QueryError>
700    where
701        E: EntityValue,
702    {
703        self.ensure_non_paged_mode_ready()?;
704
705        Self::with_slot(projection.field(), |target_slot| {
706            let values = self
707                .execute_prepared_projection_terminal_output(
708                    PreparedFluentProjectionStrategy::values_by_slot(target_slot),
709                )?
710                .into_values()
711                .map_err(QueryError::execute)?;
712
713            Self::project_terminal_values(projection, values)
714        })
715    }
716
717    /// Explain `project_values(projection)` routing without executing it.
718    pub fn explain_project_values(
719        &self,
720        projection: &TextProjectionExpr,
721    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
722    where
723        E: EntityValue,
724    {
725        self.ensure_non_paged_mode_ready()?;
726
727        Self::with_slot(projection.field(), |target_slot| {
728            self.explain_prepared_projection_non_paged_terminal(
729                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
730            )
731        })
732    }
733
734    /// Explain `values_by(field)` routing without executing the terminal.
735    pub fn explain_values_by(
736        &self,
737        field: impl AsRef<str>,
738    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
739    where
740        E: EntityValue,
741    {
742        self.ensure_non_paged_mode_ready()?;
743
744        Self::with_slot(field, |target_slot| {
745            self.explain_prepared_projection_non_paged_terminal(
746                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
747            )
748        })
749    }
750
751    /// Execute and return the first `k` rows from the effective response window.
752    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
753    where
754        E: EntityValue,
755    {
756        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
757    }
758
759    /// Execute and return the top `k` rows by `field` under deterministic
760    /// ordering `(field desc, primary_key asc)` over the effective response
761    /// window.
762    ///
763    /// This terminal applies its own ordering and does not preserve query
764    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
765    /// matches `max_by(field)` selection semantics.
766    pub fn top_k_by(
767        &self,
768        field: impl AsRef<str>,
769        take_count: u32,
770    ) -> Result<EntityResponse<E>, QueryError>
771    where
772        E: EntityValue,
773    {
774        self.ensure_non_paged_mode_ready()?;
775
776        Self::with_slot(field, |target_slot| {
777            self.session
778                .execute_load_query_with(self.query(), move |load, plan| {
779                    load.top_k_by_slot(plan, target_slot, take_count)
780                })
781        })
782    }
783
784    /// Execute and return the bottom `k` rows by `field` under deterministic
785    /// ordering `(field asc, primary_key asc)` over the effective response
786    /// window.
787    ///
788    /// This terminal applies its own ordering and does not preserve query
789    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
790    /// matches `min_by(field)` selection semantics.
791    pub fn bottom_k_by(
792        &self,
793        field: impl AsRef<str>,
794        take_count: u32,
795    ) -> Result<EntityResponse<E>, QueryError>
796    where
797        E: EntityValue,
798    {
799        self.ensure_non_paged_mode_ready()?;
800
801        Self::with_slot(field, |target_slot| {
802            self.session
803                .execute_load_query_with(self.query(), move |load, plan| {
804                    load.bottom_k_by_slot(plan, target_slot, take_count)
805                })
806        })
807    }
808
809    /// Execute and return projected values for the top `k` rows by `field`
810    /// under deterministic ordering `(field desc, primary_key asc)` over the
811    /// effective response window.
812    ///
813    /// Ranking is applied before projection and does not preserve query
814    /// `order_by(...)` row order in the returned values. For `k = 1`, this
815    /// matches `max_by(field)` projected to one value.
816    pub fn top_k_by_values(
817        &self,
818        field: impl AsRef<str>,
819        take_count: u32,
820    ) -> Result<Vec<Value>, QueryError>
821    where
822        E: EntityValue,
823    {
824        self.ensure_non_paged_mode_ready()?;
825
826        Self::with_slot(field, |target_slot| {
827            self.session
828                .execute_load_query_with(self.query(), move |load, plan| {
829                    load.top_k_by_values_slot(plan, target_slot, take_count)
830                })
831        })
832    }
833
834    /// Execute and return projected values for the bottom `k` rows by `field`
835    /// under deterministic ordering `(field asc, primary_key asc)` over the
836    /// effective response window.
837    ///
838    /// Ranking is applied before projection and does not preserve query
839    /// `order_by(...)` row order in the returned values. For `k = 1`, this
840    /// matches `min_by(field)` projected to one value.
841    pub fn bottom_k_by_values(
842        &self,
843        field: impl AsRef<str>,
844        take_count: u32,
845    ) -> Result<Vec<Value>, QueryError>
846    where
847        E: EntityValue,
848    {
849        self.ensure_non_paged_mode_ready()?;
850
851        Self::with_slot(field, |target_slot| {
852            self.session
853                .execute_load_query_with(self.query(), move |load, plan| {
854                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
855                })
856        })
857    }
858
859    /// Execute and return projected id/value pairs for the top `k` rows by
860    /// `field` under deterministic ordering `(field desc, primary_key asc)`
861    /// over the effective response window.
862    ///
863    /// Ranking is applied before projection and does not preserve query
864    /// `order_by(...)` row order in the returned values. For `k = 1`, this
865    /// matches `max_by(field)` projected to one `(id, value)` pair.
866    pub fn top_k_by_with_ids(
867        &self,
868        field: impl AsRef<str>,
869        take_count: u32,
870    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
871    where
872        E: EntityValue,
873    {
874        self.ensure_non_paged_mode_ready()?;
875
876        Self::with_slot(field, |target_slot| {
877            self.session
878                .execute_load_query_with(self.query(), move |load, plan| {
879                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
880                })
881        })
882    }
883
884    /// Execute and return projected id/value pairs for the bottom `k` rows by
885    /// `field` under deterministic ordering `(field asc, primary_key asc)`
886    /// over the effective response window.
887    ///
888    /// Ranking is applied before projection and does not preserve query
889    /// `order_by(...)` row order in the returned values. For `k = 1`, this
890    /// matches `min_by(field)` projected to one `(id, value)` pair.
891    pub fn bottom_k_by_with_ids(
892        &self,
893        field: impl AsRef<str>,
894        take_count: u32,
895    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
896    where
897        E: EntityValue,
898    {
899        self.ensure_non_paged_mode_ready()?;
900
901        Self::with_slot(field, |target_slot| {
902            self.session
903                .execute_load_query_with(self.query(), move |load, plan| {
904                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
905                })
906        })
907    }
908
909    /// Execute and return distinct projected field values for the effective
910    /// result window, preserving first-observed value order.
911    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
912    where
913        E: EntityValue,
914    {
915        self.ensure_non_paged_mode_ready()?;
916
917        Self::with_slot(field, |target_slot| {
918            self.execute_prepared_projection_terminal_output(
919                PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
920            )?
921            .into_values()
922            .map_err(QueryError::execute)
923        })
924    }
925
926    /// Explain `distinct_values_by(field)` routing without executing the terminal.
927    pub fn explain_distinct_values_by(
928        &self,
929        field: impl AsRef<str>,
930    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
931    where
932        E: EntityValue,
933    {
934        self.ensure_non_paged_mode_ready()?;
935
936        Self::with_slot(field, |target_slot| {
937            self.explain_prepared_projection_non_paged_terminal(
938                &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
939            )
940        })
941    }
942
943    /// Execute and return projected field values paired with row ids for the
944    /// effective result window.
945    pub fn values_by_with_ids(
946        &self,
947        field: impl AsRef<str>,
948    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
949    where
950        E: EntityValue,
951    {
952        self.ensure_non_paged_mode_ready()?;
953
954        Self::with_slot(field, |target_slot| {
955            self.execute_prepared_projection_terminal_output(
956                PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
957            )?
958            .into_values_with_ids()
959            .map_err(QueryError::execute)
960        })
961    }
962
963    /// Execute and return projected id/value pairs for one shared text
964    /// projection over the effective response window.
965    pub fn project_values_with_ids(
966        &self,
967        projection: &TextProjectionExpr,
968    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
969    where
970        E: EntityValue,
971    {
972        self.ensure_non_paged_mode_ready()?;
973
974        Self::with_slot(projection.field(), |target_slot| {
975            let values = self
976                .execute_prepared_projection_terminal_output(
977                    PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
978                )?
979                .into_values_with_ids::<E>()
980                .map_err(QueryError::execute)?;
981
982            Self::project_terminal_values_with_ids(projection, values)
983        })
984    }
985
986    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
987    pub fn explain_values_by_with_ids(
988        &self,
989        field: impl AsRef<str>,
990    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
991    where
992        E: EntityValue,
993    {
994        self.ensure_non_paged_mode_ready()?;
995
996        Self::with_slot(field, |target_slot| {
997            self.explain_prepared_projection_non_paged_terminal(
998                &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
999            )
1000        })
1001    }
1002
1003    /// Execute and return the first projected field value in effective response
1004    /// order, if any.
1005    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
1006    where
1007        E: EntityValue,
1008    {
1009        self.ensure_non_paged_mode_ready()?;
1010
1011        Self::with_slot(field, |target_slot| {
1012            self.execute_prepared_projection_terminal_output(
1013                PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1014            )?
1015            .into_terminal_value()
1016            .map_err(QueryError::execute)
1017        })
1018    }
1019
1020    /// Execute and return the first projected value for one shared text
1021    /// projection in effective response order, if any.
1022    pub fn project_first_value(
1023        &self,
1024        projection: &TextProjectionExpr,
1025    ) -> Result<Option<Value>, QueryError>
1026    where
1027        E: EntityValue,
1028    {
1029        self.ensure_non_paged_mode_ready()?;
1030
1031        Self::with_slot(projection.field(), |target_slot| {
1032            let value = self
1033                .execute_prepared_projection_terminal_output(
1034                    PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1035                )?
1036                .into_terminal_value()
1037                .map_err(QueryError::execute)?;
1038
1039            Self::project_terminal_optional_value(projection, value)
1040        })
1041    }
1042
1043    /// Explain `first_value_by(field)` routing without executing the terminal.
1044    pub fn explain_first_value_by(
1045        &self,
1046        field: impl AsRef<str>,
1047    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1048    where
1049        E: EntityValue,
1050    {
1051        self.ensure_non_paged_mode_ready()?;
1052
1053        Self::with_slot(field, |target_slot| {
1054            self.explain_prepared_projection_non_paged_terminal(
1055                &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1056            )
1057        })
1058    }
1059
1060    /// Execute and return the last projected field value in effective response
1061    /// order, if any.
1062    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
1063    where
1064        E: EntityValue,
1065    {
1066        self.ensure_non_paged_mode_ready()?;
1067
1068        Self::with_slot(field, |target_slot| {
1069            self.execute_prepared_projection_terminal_output(
1070                PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1071            )?
1072            .into_terminal_value()
1073            .map_err(QueryError::execute)
1074        })
1075    }
1076
1077    /// Execute and return the last projected value for one shared text
1078    /// projection in effective response order, if any.
1079    pub fn project_last_value(
1080        &self,
1081        projection: &TextProjectionExpr,
1082    ) -> Result<Option<Value>, QueryError>
1083    where
1084        E: EntityValue,
1085    {
1086        self.ensure_non_paged_mode_ready()?;
1087
1088        Self::with_slot(projection.field(), |target_slot| {
1089            let value = self
1090                .execute_prepared_projection_terminal_output(
1091                    PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1092                )?
1093                .into_terminal_value()
1094                .map_err(QueryError::execute)?;
1095
1096            Self::project_terminal_optional_value(projection, value)
1097        })
1098    }
1099
1100    /// Explain `last_value_by(field)` routing without executing the terminal.
1101    pub fn explain_last_value_by(
1102        &self,
1103        field: impl AsRef<str>,
1104    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1105    where
1106        E: EntityValue,
1107    {
1108        self.ensure_non_paged_mode_ready()?;
1109
1110        Self::with_slot(field, |target_slot| {
1111            self.explain_prepared_projection_non_paged_terminal(
1112                &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1113            )
1114        })
1115    }
1116
1117    /// Execute and return the first matching identifier in response order, if any.
1118    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1119    where
1120        E: EntityValue,
1121    {
1122        self.execute_prepared_order_sensitive_terminal_output(
1123            PreparedFluentOrderSensitiveTerminalStrategy::first(),
1124        )?
1125        .into_id()
1126        .map_err(QueryError::execute)
1127    }
1128
1129    /// Explain scalar `first()` routing without executing the terminal.
1130    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1131    where
1132        E: EntityValue,
1133    {
1134        self.explain_prepared_aggregate_non_paged_terminal(
1135            &PreparedFluentOrderSensitiveTerminalStrategy::first(),
1136        )
1137    }
1138
1139    /// Execute and return the last matching identifier in response order, if any.
1140    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1141    where
1142        E: EntityValue,
1143    {
1144        self.execute_prepared_order_sensitive_terminal_output(
1145            PreparedFluentOrderSensitiveTerminalStrategy::last(),
1146        )?
1147        .into_id()
1148        .map_err(QueryError::execute)
1149    }
1150
1151    /// Explain scalar `last()` routing without executing the terminal.
1152    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1153    where
1154        E: EntityValue,
1155    {
1156        self.explain_prepared_aggregate_non_paged_terminal(
1157            &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1158        )
1159    }
1160
1161    /// Execute and require exactly one matching row.
1162    pub fn require_one(&self) -> Result<(), QueryError>
1163    where
1164        E: EntityValue,
1165    {
1166        self.execute()?.into_rows()?.require_one()?;
1167        Ok(())
1168    }
1169
1170    /// Execute and require at least one matching row.
1171    pub fn require_some(&self) -> Result<(), QueryError>
1172    where
1173        E: EntityValue,
1174    {
1175        self.execute()?.into_rows()?.require_some()?;
1176        Ok(())
1177    }
1178}
1179
1180fn scalar_terminal_boundary_request_from_prepared(
1181    request: PreparedFluentScalarTerminalRuntimeRequest,
1182) -> ScalarTerminalBoundaryRequest {
1183    match request {
1184        PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1185            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1186        }
1187        PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1188            ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
1189        }
1190    }
1191}
1192
1193const fn existing_rows_terminal_boundary_request_from_prepared(
1194    request: PreparedFluentExistingRowsTerminalRuntimeRequest,
1195) -> ScalarTerminalBoundaryRequest {
1196    match request {
1197        PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => {
1198            ScalarTerminalBoundaryRequest::Count
1199        }
1200        PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => {
1201            ScalarTerminalBoundaryRequest::Exists
1202        }
1203    }
1204}
1205
1206const fn numeric_field_boundary_request_from_prepared(
1207    request: PreparedFluentNumericFieldRuntimeRequest,
1208) -> ScalarNumericFieldBoundaryRequest {
1209    match request {
1210        PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
1211        PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
1212            ScalarNumericFieldBoundaryRequest::SumDistinct
1213        }
1214        PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
1215        PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
1216            ScalarNumericFieldBoundaryRequest::AvgDistinct
1217        }
1218    }
1219}
1220
1221fn order_sensitive_terminal_boundary_request_from_prepared(
1222    request: PreparedFluentOrderSensitiveTerminalRuntimeRequest,
1223) -> ScalarTerminalBoundaryRequest {
1224    match request {
1225        PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1226            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1227        }
1228        PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1229            ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth }
1230        }
1231        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1232            ScalarTerminalBoundaryRequest::MedianBySlot { target_field }
1233        }
1234        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1235            ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field }
1236        }
1237    }
1238}
1239
1240const fn projection_boundary_request_from_prepared(
1241    request: PreparedFluentProjectionRuntimeRequest,
1242) -> ScalarProjectionBoundaryRequest {
1243    match request {
1244        PreparedFluentProjectionRuntimeRequest::Values => ScalarProjectionBoundaryRequest::Values,
1245        PreparedFluentProjectionRuntimeRequest::DistinctValues => {
1246            ScalarProjectionBoundaryRequest::DistinctValues
1247        }
1248        PreparedFluentProjectionRuntimeRequest::CountDistinct => {
1249            ScalarProjectionBoundaryRequest::CountDistinct
1250        }
1251        PreparedFluentProjectionRuntimeRequest::ValuesWithIds => {
1252            ScalarProjectionBoundaryRequest::ValuesWithIds
1253        }
1254        PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1255            ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind }
1256        }
1257    }
1258}