Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        PersistedRow,
9        executor::{
10            LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11            ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12            ScalarTerminalBoundaryRequest,
13        },
14        query::{
15            api::ResponseCardinalityExt,
16            builder::{
17                PreparedFluentAggregateExplainStrategy,
18                PreparedFluentExistingRowsTerminalRuntimeRequest,
19                PreparedFluentExistingRowsTerminalStrategy,
20                PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21                PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22                PreparedFluentOrderSensitiveTerminalStrategy,
23                PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24                PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25                ValueProjectionExpr,
26            },
27            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
28            fluent::load::{FluentLoadQuery, LoadQueryResult},
29            intent::QueryError,
30            plan::AggregateKind,
31        },
32        response::EntityResponse,
33    },
34    error::InternalError,
35    traits::EntityValue,
36    types::{Decimal, Id},
37    value::Value,
38};
39
40type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42impl<E> FluentLoadQuery<'_, E>
43where
44    E: PersistedRow,
45{
46    // ------------------------------------------------------------------
47    // Execution (single semantic boundary)
48    // ------------------------------------------------------------------
49
50    /// Execute this query using the session's policy settings.
51    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
52    where
53        E: EntityValue,
54    {
55        self.ensure_non_paged_mode_ready()?;
56
57        if self.query().has_grouping() {
58            return self
59                .session
60                .execute_grouped(self.query(), self.cursor_token.as_deref())
61                .map(LoadQueryResult::Grouped);
62        }
63
64        self.session
65            .execute_query(self.query())
66            .map(LoadQueryResult::Rows)
67    }
68
69    // Run one scalar terminal through the canonical non-paged fluent policy
70    // gate before handing execution to the session load-query adapter.
71    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
72    where
73        E: EntityValue,
74        F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
75    {
76        self.ensure_non_paged_mode_ready()?;
77
78        self.session.execute_load_query_with(self.query(), execute)
79    }
80
81    // Run one explain-visible aggregate terminal through the canonical
82    // non-paged fluent policy gate using the prepared aggregate strategy as
83    // the single explain projection source.
84    fn explain_prepared_aggregate_non_paged_terminal<S>(
85        &self,
86        strategy: &S,
87    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
88    where
89        E: EntityValue,
90        S: PreparedFluentAggregateExplainStrategy,
91    {
92        self.ensure_non_paged_mode_ready()?;
93
94        self.session
95            .explain_query_prepared_aggregate_terminal_with_visible_indexes(self.query(), strategy)
96    }
97
98    // Run one prepared projection/distinct explain terminal through the
99    // canonical non-paged fluent policy gate using the prepared projection
100    // strategy as the single explain projection source.
101    fn explain_prepared_projection_non_paged_terminal(
102        &self,
103        strategy: &PreparedFluentProjectionStrategy,
104    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
105    where
106        E: EntityValue,
107    {
108        self.ensure_non_paged_mode_ready()?;
109
110        self.session
111            .explain_query_prepared_projection_terminal_with_visible_indexes(self.query(), strategy)
112    }
113
114    // Execute one prepared fluent scalar terminal through the canonical
115    // non-paged fluent policy gate using the prepared runtime request as the
116    // single execution source.
117    fn execute_prepared_scalar_terminal_output(
118        &self,
119        strategy: PreparedFluentScalarTerminalStrategy,
120    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
121    where
122        E: EntityValue,
123    {
124        let runtime_request = strategy.into_runtime_request();
125
126        self.execute_scalar_non_paged_terminal(move |load, plan| {
127            load.execute_scalar_terminal_request(
128                plan,
129                scalar_terminal_boundary_request_from_prepared(runtime_request),
130            )
131        })
132    }
133
134    // Execute one prepared fluent existing-rows terminal through the
135    // canonical non-paged fluent policy gate using the prepared existing-rows
136    // strategy as the single runtime source.
137    fn execute_prepared_existing_rows_terminal_output(
138        &self,
139        strategy: PreparedFluentExistingRowsTerminalStrategy,
140    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
141    where
142        E: EntityValue,
143    {
144        let runtime_request = strategy.into_runtime_request();
145
146        self.execute_scalar_non_paged_terminal(move |load, plan| {
147            load.execute_scalar_terminal_request(
148                plan,
149                existing_rows_terminal_boundary_request_from_prepared(runtime_request),
150            )
151        })
152    }
153
154    // Execute one prepared fluent numeric-field terminal through the canonical
155    // non-paged fluent policy gate using the prepared numeric strategy as the
156    // single runtime source.
157    fn execute_prepared_numeric_field_terminal(
158        &self,
159        strategy: PreparedFluentNumericFieldStrategy,
160    ) -> Result<Option<Decimal>, QueryError>
161    where
162        E: EntityValue,
163    {
164        let (target_field, runtime_request) = strategy.into_runtime_parts();
165
166        self.execute_scalar_non_paged_terminal(move |load, plan| {
167            load.execute_numeric_field_boundary(
168                plan,
169                target_field,
170                numeric_field_boundary_request_from_prepared(runtime_request),
171            )
172        })
173    }
174
175    // Execute one prepared fluent order-sensitive terminal through the
176    // canonical non-paged fluent policy gate using the prepared order-sensitive
177    // strategy as the single runtime source.
178    fn execute_prepared_order_sensitive_terminal_output(
179        &self,
180        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
181    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
182    where
183        E: EntityValue,
184    {
185        let runtime_request = strategy.into_runtime_request();
186
187        self.execute_scalar_non_paged_terminal(move |load, plan| {
188            load.execute_scalar_terminal_request(
189                plan,
190                order_sensitive_terminal_boundary_request_from_prepared(runtime_request),
191            )
192        })
193    }
194
195    // Execute one prepared fluent projection/distinct terminal through the
196    // canonical non-paged fluent policy gate using the prepared projection
197    // strategy as the single runtime source.
198    fn execute_prepared_projection_terminal_output(
199        &self,
200        strategy: PreparedFluentProjectionStrategy,
201    ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
202    where
203        E: EntityValue,
204    {
205        let (target_field, runtime_request) = strategy.into_runtime_parts();
206
207        self.execute_scalar_non_paged_terminal(move |load, plan| {
208            load.execute_scalar_projection_boundary(
209                plan,
210                target_field,
211                projection_boundary_request_from_prepared(runtime_request),
212            )
213        })
214    }
215
216    // Apply one shared bounded value projection to iterator-like terminal
217    // output while preserving source order and cardinality.
218    fn project_terminal_items<P, T, U>(
219        projection: &P,
220        values: impl IntoIterator<Item = T>,
221        mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
222    ) -> Result<Vec<U>, QueryError>
223    where
224        P: ValueProjectionExpr,
225    {
226        values
227            .into_iter()
228            .map(|value| map(projection, value))
229            .collect()
230    }
231
232    // ------------------------------------------------------------------
233    // Execution terminals — semantic only
234    // ------------------------------------------------------------------
235
236    /// Execute and return whether the result set is empty.
237    pub fn is_empty(&self) -> Result<bool, QueryError>
238    where
239        E: EntityValue,
240    {
241        self.not_exists()
242    }
243
244    /// Execute and return whether no matching row exists.
245    pub fn not_exists(&self) -> Result<bool, QueryError>
246    where
247        E: EntityValue,
248    {
249        Ok(!self.exists()?)
250    }
251
252    /// Execute and return whether at least one matching row exists.
253    pub fn exists(&self) -> Result<bool, QueryError>
254    where
255        E: EntityValue,
256    {
257        self.execute_prepared_existing_rows_terminal_output(
258            PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
259        )?
260        .into_exists()
261        .map_err(QueryError::execute)
262    }
263
264    /// Explain scalar `exists()` routing without executing the terminal.
265    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
266    where
267        E: EntityValue,
268    {
269        self.explain_prepared_aggregate_non_paged_terminal(
270            &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
271        )
272    }
273
274    /// Explain scalar `not_exists()` routing without executing the terminal.
275    ///
276    /// This remains an `exists()` execution plan with negated boolean semantics.
277    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
278    where
279        E: EntityValue,
280    {
281        self.explain_exists()
282    }
283
284    /// Explain scalar load execution shape without executing the query.
285    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
286    where
287        E: EntityValue,
288    {
289        self.session
290            .explain_query_execution_with_visible_indexes(self.query())
291    }
292
293    /// Explain scalar load execution shape as deterministic text.
294    pub fn explain_execution_text(&self) -> Result<String, QueryError>
295    where
296        E: EntityValue,
297    {
298        self.session
299            .explain_query_execution_text_with_visible_indexes(self.query())
300    }
301
302    /// Explain scalar load execution shape as canonical JSON.
303    pub fn explain_execution_json(&self) -> Result<String, QueryError>
304    where
305        E: EntityValue,
306    {
307        self.session
308            .explain_query_execution_json_with_visible_indexes(self.query())
309    }
310
311    /// Explain scalar load execution shape as verbose text with diagnostics.
312    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
313    where
314        E: EntityValue,
315    {
316        self.session
317            .explain_query_execution_verbose_with_visible_indexes(self.query())
318    }
319
320    /// Execute and return the number of matching rows.
321    pub fn count(&self) -> Result<u32, QueryError>
322    where
323        E: EntityValue,
324    {
325        self.execute_prepared_existing_rows_terminal_output(
326            PreparedFluentExistingRowsTerminalStrategy::count_rows(),
327        )?
328        .into_count()
329        .map_err(QueryError::execute)
330    }
331
332    /// Execute and return the total persisted payload bytes for the effective
333    /// result window.
334    pub fn bytes(&self) -> Result<u64, QueryError>
335    where
336        E: EntityValue,
337    {
338        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
339    }
340
341    /// Execute and return the total serialized bytes for `field` over the
342    /// effective result window.
343    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
344    where
345        E: EntityValue,
346    {
347        self.ensure_non_paged_mode_ready()?;
348
349        Self::with_slot(field, |target_slot| {
350            self.session
351                .execute_load_query_with(self.query(), move |load, plan| {
352                    load.bytes_by_slot(plan, target_slot)
353                })
354        })
355    }
356
357    /// Explain `bytes_by(field)` routing without executing the terminal.
358    pub fn explain_bytes_by(
359        &self,
360        field: impl AsRef<str>,
361    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
362    where
363        E: EntityValue,
364    {
365        self.ensure_non_paged_mode_ready()?;
366
367        Self::with_slot(field, |target_slot| {
368            self.session
369                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
370        })
371    }
372
373    /// Execute and return the smallest matching identifier, if any.
374    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
375    where
376        E: EntityValue,
377    {
378        self.execute_prepared_scalar_terminal_output(
379            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
380        )?
381        .into_id()
382        .map_err(QueryError::execute)
383    }
384
385    /// Explain scalar `min()` routing without executing the terminal.
386    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
387    where
388        E: EntityValue,
389    {
390        self.explain_prepared_aggregate_non_paged_terminal(
391            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
392        )
393    }
394
395    /// Execute and return the id of the row with the smallest value for `field`.
396    ///
397    /// Ties are deterministic: equal field values resolve by primary key ascending.
398    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
399    where
400        E: EntityValue,
401    {
402        self.ensure_non_paged_mode_ready()?;
403
404        Self::with_slot(field, |target_slot| {
405            self.execute_prepared_scalar_terminal_output(
406                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
407            )?
408            .into_id()
409            .map_err(QueryError::execute)
410        })
411    }
412
413    /// Execute and return the largest matching identifier, if any.
414    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
415    where
416        E: EntityValue,
417    {
418        self.execute_prepared_scalar_terminal_output(
419            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
420        )?
421        .into_id()
422        .map_err(QueryError::execute)
423    }
424
425    /// Explain scalar `max()` routing without executing the terminal.
426    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
427    where
428        E: EntityValue,
429    {
430        self.explain_prepared_aggregate_non_paged_terminal(
431            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
432        )
433    }
434
435    /// Execute and return the id of the row with the largest value for `field`.
436    ///
437    /// Ties are deterministic: equal field values resolve by primary key ascending.
438    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
439    where
440        E: EntityValue,
441    {
442        self.ensure_non_paged_mode_ready()?;
443
444        Self::with_slot(field, |target_slot| {
445            self.execute_prepared_scalar_terminal_output(
446                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
447            )?
448            .into_id()
449            .map_err(QueryError::execute)
450        })
451    }
452
453    /// Execute and return the id at zero-based ordinal `nth` when rows are
454    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
455    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
456    where
457        E: EntityValue,
458    {
459        self.ensure_non_paged_mode_ready()?;
460
461        Self::with_slot(field, |target_slot| {
462            self.execute_prepared_order_sensitive_terminal_output(
463                PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
464            )?
465            .into_id()
466            .map_err(QueryError::execute)
467        })
468    }
469
470    /// Execute and return the sum of `field` over matching rows.
471    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
472    where
473        E: EntityValue,
474    {
475        self.ensure_non_paged_mode_ready()?;
476
477        Self::with_slot(field, |target_slot| {
478            self.execute_prepared_numeric_field_terminal(
479                PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
480            )
481        })
482    }
483
484    /// Explain scalar `sum_by(field)` routing without executing the terminal.
485    pub fn explain_sum_by(
486        &self,
487        field: impl AsRef<str>,
488    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
489    where
490        E: EntityValue,
491    {
492        self.ensure_non_paged_mode_ready()?;
493
494        Self::with_slot(field, |target_slot| {
495            self.explain_prepared_aggregate_non_paged_terminal(
496                &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
497            )
498        })
499    }
500
501    /// Execute and return the sum of distinct `field` values.
502    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
503    where
504        E: EntityValue,
505    {
506        self.ensure_non_paged_mode_ready()?;
507
508        Self::with_slot(field, |target_slot| {
509            self.execute_prepared_numeric_field_terminal(
510                PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
511            )
512        })
513    }
514
515    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
516    pub fn explain_sum_distinct_by(
517        &self,
518        field: impl AsRef<str>,
519    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
520    where
521        E: EntityValue,
522    {
523        self.ensure_non_paged_mode_ready()?;
524
525        Self::with_slot(field, |target_slot| {
526            self.explain_prepared_aggregate_non_paged_terminal(
527                &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
528            )
529        })
530    }
531
532    /// Execute and return the average of `field` over matching rows.
533    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
534    where
535        E: EntityValue,
536    {
537        self.ensure_non_paged_mode_ready()?;
538
539        Self::with_slot(field, |target_slot| {
540            self.execute_prepared_numeric_field_terminal(
541                PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
542            )
543        })
544    }
545
546    /// Explain scalar `avg_by(field)` routing without executing the terminal.
547    pub fn explain_avg_by(
548        &self,
549        field: impl AsRef<str>,
550    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
551    where
552        E: EntityValue,
553    {
554        self.ensure_non_paged_mode_ready()?;
555
556        Self::with_slot(field, |target_slot| {
557            self.explain_prepared_aggregate_non_paged_terminal(
558                &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
559            )
560        })
561    }
562
563    /// Execute and return the average of distinct `field` values.
564    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
565    where
566        E: EntityValue,
567    {
568        self.ensure_non_paged_mode_ready()?;
569
570        Self::with_slot(field, |target_slot| {
571            self.execute_prepared_numeric_field_terminal(
572                PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
573            )
574        })
575    }
576
577    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
578    pub fn explain_avg_distinct_by(
579        &self,
580        field: impl AsRef<str>,
581    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
582    where
583        E: EntityValue,
584    {
585        self.ensure_non_paged_mode_ready()?;
586
587        Self::with_slot(field, |target_slot| {
588            self.explain_prepared_aggregate_non_paged_terminal(
589                &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
590            )
591        })
592    }
593
594    /// Execute and return the median id by `field` using deterministic ordering
595    /// `(field asc, primary key asc)`.
596    ///
597    /// Even-length windows select the lower median.
598    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
599    where
600        E: EntityValue,
601    {
602        self.ensure_non_paged_mode_ready()?;
603
604        Self::with_slot(field, |target_slot| {
605            self.execute_prepared_order_sensitive_terminal_output(
606                PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
607            )?
608            .into_id()
609            .map_err(QueryError::execute)
610        })
611    }
612
613    /// Execute and return the number of distinct values for `field` over the
614    /// effective result window.
615    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
616    where
617        E: EntityValue,
618    {
619        self.ensure_non_paged_mode_ready()?;
620
621        Self::with_slot(field, |target_slot| {
622            self.execute_prepared_projection_terminal_output(
623                PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
624            )?
625            .into_count()
626            .map_err(QueryError::execute)
627        })
628    }
629
630    /// Explain `count_distinct_by(field)` routing without executing the terminal.
631    pub fn explain_count_distinct_by(
632        &self,
633        field: impl AsRef<str>,
634    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
635    where
636        E: EntityValue,
637    {
638        self.ensure_non_paged_mode_ready()?;
639
640        Self::with_slot(field, |target_slot| {
641            self.explain_prepared_projection_non_paged_terminal(
642                &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
643            )
644        })
645    }
646
647    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
648    ///
649    /// Tie handling is deterministic for both extrema: primary key ascending.
650    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
651    where
652        E: EntityValue,
653    {
654        self.ensure_non_paged_mode_ready()?;
655
656        Self::with_slot(field, |target_slot| {
657            self.execute_prepared_order_sensitive_terminal_output(
658                PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
659            )?
660            .into_id_pair()
661            .map_err(QueryError::execute)
662        })
663    }
664
665    /// Execute and return projected field values for the effective result window.
666    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
667    where
668        E: EntityValue,
669    {
670        self.ensure_non_paged_mode_ready()?;
671
672        Self::with_slot(field, |target_slot| {
673            self.execute_prepared_projection_terminal_output(
674                PreparedFluentProjectionStrategy::values_by_slot(target_slot),
675            )?
676            .into_values()
677            .map_err(QueryError::execute)
678        })
679    }
680
681    /// Execute and return projected values for one shared bounded projection
682    /// over the effective response window.
683    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<Value>, QueryError>
684    where
685        E: EntityValue,
686        P: ValueProjectionExpr,
687    {
688        self.ensure_non_paged_mode_ready()?;
689
690        Self::with_slot(projection.field(), |target_slot| {
691            let values = self
692                .execute_prepared_projection_terminal_output(
693                    PreparedFluentProjectionStrategy::values_by_slot(target_slot),
694                )?
695                .into_values()
696                .map_err(QueryError::execute)?;
697
698            Self::project_terminal_items(projection, values, |projection, value| {
699                projection.apply_value(value)
700            })
701        })
702    }
703
704    /// Explain `project_values(projection)` routing without executing it.
705    pub fn explain_project_values<P>(
706        &self,
707        projection: &P,
708    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
709    where
710        E: EntityValue,
711        P: ValueProjectionExpr,
712    {
713        self.ensure_non_paged_mode_ready()?;
714
715        Self::with_slot(projection.field(), |target_slot| {
716            self.explain_prepared_projection_non_paged_terminal(
717                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
718            )
719        })
720    }
721
722    /// Explain `values_by(field)` routing without executing the terminal.
723    pub fn explain_values_by(
724        &self,
725        field: impl AsRef<str>,
726    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
727    where
728        E: EntityValue,
729    {
730        self.ensure_non_paged_mode_ready()?;
731
732        Self::with_slot(field, |target_slot| {
733            self.explain_prepared_projection_non_paged_terminal(
734                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
735            )
736        })
737    }
738
739    /// Execute and return the first `k` rows from the effective response window.
740    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
741    where
742        E: EntityValue,
743    {
744        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
745    }
746
747    /// Execute and return the top `k` rows by `field` under deterministic
748    /// ordering `(field desc, primary_key asc)` over the effective response
749    /// window.
750    ///
751    /// This terminal applies its own ordering and does not preserve query
752    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
753    /// matches `max_by(field)` selection semantics.
754    pub fn top_k_by(
755        &self,
756        field: impl AsRef<str>,
757        take_count: u32,
758    ) -> Result<EntityResponse<E>, QueryError>
759    where
760        E: EntityValue,
761    {
762        self.ensure_non_paged_mode_ready()?;
763
764        Self::with_slot(field, |target_slot| {
765            self.session
766                .execute_load_query_with(self.query(), move |load, plan| {
767                    load.top_k_by_slot(plan, target_slot, take_count)
768                })
769        })
770    }
771
772    /// Execute and return the bottom `k` rows by `field` under deterministic
773    /// ordering `(field asc, primary_key asc)` over the effective response
774    /// window.
775    ///
776    /// This terminal applies its own ordering and does not preserve query
777    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
778    /// matches `min_by(field)` selection semantics.
779    pub fn bottom_k_by(
780        &self,
781        field: impl AsRef<str>,
782        take_count: u32,
783    ) -> Result<EntityResponse<E>, QueryError>
784    where
785        E: EntityValue,
786    {
787        self.ensure_non_paged_mode_ready()?;
788
789        Self::with_slot(field, |target_slot| {
790            self.session
791                .execute_load_query_with(self.query(), move |load, plan| {
792                    load.bottom_k_by_slot(plan, target_slot, take_count)
793                })
794        })
795    }
796
797    /// Execute and return projected values for the top `k` rows by `field`
798    /// under deterministic ordering `(field desc, primary_key asc)` over the
799    /// effective response window.
800    ///
801    /// Ranking is applied before projection and does not preserve query
802    /// `order_by(...)` row order in the returned values. For `k = 1`, this
803    /// matches `max_by(field)` projected to one value.
804    pub fn top_k_by_values(
805        &self,
806        field: impl AsRef<str>,
807        take_count: u32,
808    ) -> Result<Vec<Value>, QueryError>
809    where
810        E: EntityValue,
811    {
812        self.ensure_non_paged_mode_ready()?;
813
814        Self::with_slot(field, |target_slot| {
815            self.session
816                .execute_load_query_with(self.query(), move |load, plan| {
817                    load.top_k_by_values_slot(plan, target_slot, take_count)
818                })
819        })
820    }
821
822    /// Execute and return projected values for the bottom `k` rows by `field`
823    /// under deterministic ordering `(field asc, primary_key asc)` over the
824    /// effective response window.
825    ///
826    /// Ranking is applied before projection and does not preserve query
827    /// `order_by(...)` row order in the returned values. For `k = 1`, this
828    /// matches `min_by(field)` projected to one value.
829    pub fn bottom_k_by_values(
830        &self,
831        field: impl AsRef<str>,
832        take_count: u32,
833    ) -> Result<Vec<Value>, QueryError>
834    where
835        E: EntityValue,
836    {
837        self.ensure_non_paged_mode_ready()?;
838
839        Self::with_slot(field, |target_slot| {
840            self.session
841                .execute_load_query_with(self.query(), move |load, plan| {
842                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
843                })
844        })
845    }
846
847    /// Execute and return projected id/value pairs for the top `k` rows by
848    /// `field` under deterministic ordering `(field desc, primary_key asc)`
849    /// over the effective response window.
850    ///
851    /// Ranking is applied before projection and does not preserve query
852    /// `order_by(...)` row order in the returned values. For `k = 1`, this
853    /// matches `max_by(field)` projected to one `(id, value)` pair.
854    pub fn top_k_by_with_ids(
855        &self,
856        field: impl AsRef<str>,
857        take_count: u32,
858    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
859    where
860        E: EntityValue,
861    {
862        self.ensure_non_paged_mode_ready()?;
863
864        Self::with_slot(field, |target_slot| {
865            self.session
866                .execute_load_query_with(self.query(), move |load, plan| {
867                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
868                })
869        })
870    }
871
872    /// Execute and return projected id/value pairs for the bottom `k` rows by
873    /// `field` under deterministic ordering `(field asc, primary_key asc)`
874    /// over the effective response window.
875    ///
876    /// Ranking is applied before projection and does not preserve query
877    /// `order_by(...)` row order in the returned values. For `k = 1`, this
878    /// matches `min_by(field)` projected to one `(id, value)` pair.
879    pub fn bottom_k_by_with_ids(
880        &self,
881        field: impl AsRef<str>,
882        take_count: u32,
883    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
884    where
885        E: EntityValue,
886    {
887        self.ensure_non_paged_mode_ready()?;
888
889        Self::with_slot(field, |target_slot| {
890            self.session
891                .execute_load_query_with(self.query(), move |load, plan| {
892                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
893                })
894        })
895    }
896
897    /// Execute and return distinct projected field values for the effective
898    /// result window, preserving first-observed value order.
899    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
900    where
901        E: EntityValue,
902    {
903        self.ensure_non_paged_mode_ready()?;
904
905        Self::with_slot(field, |target_slot| {
906            self.execute_prepared_projection_terminal_output(
907                PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
908            )?
909            .into_values()
910            .map_err(QueryError::execute)
911        })
912    }
913
914    /// Explain `distinct_values_by(field)` routing without executing the terminal.
915    pub fn explain_distinct_values_by(
916        &self,
917        field: impl AsRef<str>,
918    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
919    where
920        E: EntityValue,
921    {
922        self.ensure_non_paged_mode_ready()?;
923
924        Self::with_slot(field, |target_slot| {
925            self.explain_prepared_projection_non_paged_terminal(
926                &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
927            )
928        })
929    }
930
931    /// Execute and return projected field values paired with row ids for the
932    /// effective result window.
933    pub fn values_by_with_ids(
934        &self,
935        field: impl AsRef<str>,
936    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
937    where
938        E: EntityValue,
939    {
940        self.ensure_non_paged_mode_ready()?;
941
942        Self::with_slot(field, |target_slot| {
943            self.execute_prepared_projection_terminal_output(
944                PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
945            )?
946            .into_values_with_ids()
947            .map_err(QueryError::execute)
948        })
949    }
950
951    /// Execute and return projected id/value pairs for one shared bounded
952    /// projection over the effective response window.
953    pub fn project_values_with_ids<P>(
954        &self,
955        projection: &P,
956    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
957    where
958        E: EntityValue,
959        P: ValueProjectionExpr,
960    {
961        self.ensure_non_paged_mode_ready()?;
962
963        Self::with_slot(projection.field(), |target_slot| {
964            let values = self
965                .execute_prepared_projection_terminal_output(
966                    PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
967                )?
968                .into_values_with_ids::<E>()
969                .map_err(QueryError::execute)?;
970
971            Self::project_terminal_items(projection, values, |projection, (id, value)| {
972                Ok((id, projection.apply_value(value)?))
973            })
974        })
975    }
976
977    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
978    pub fn explain_values_by_with_ids(
979        &self,
980        field: impl AsRef<str>,
981    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
982    where
983        E: EntityValue,
984    {
985        self.ensure_non_paged_mode_ready()?;
986
987        Self::with_slot(field, |target_slot| {
988            self.explain_prepared_projection_non_paged_terminal(
989                &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
990            )
991        })
992    }
993
994    /// Execute and return the first projected field value in effective response
995    /// order, if any.
996    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
997    where
998        E: EntityValue,
999    {
1000        self.ensure_non_paged_mode_ready()?;
1001
1002        Self::with_slot(field, |target_slot| {
1003            self.execute_prepared_projection_terminal_output(
1004                PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1005            )?
1006            .into_terminal_value()
1007            .map_err(QueryError::execute)
1008        })
1009    }
1010
1011    /// Execute and return the first projected value for one shared bounded
1012    /// projection in effective response order, if any.
1013    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<Value>, QueryError>
1014    where
1015        E: EntityValue,
1016        P: ValueProjectionExpr,
1017    {
1018        self.ensure_non_paged_mode_ready()?;
1019
1020        Self::with_slot(projection.field(), |target_slot| {
1021            let value = self
1022                .execute_prepared_projection_terminal_output(
1023                    PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1024                )?
1025                .into_terminal_value()
1026                .map_err(QueryError::execute)?;
1027
1028            let mut projected =
1029                Self::project_terminal_items(projection, value, |projection, value| {
1030                    projection.apply_value(value)
1031                })?;
1032
1033            Ok(projected.pop())
1034        })
1035    }
1036
1037    /// Explain `first_value_by(field)` routing without executing the terminal.
1038    pub fn explain_first_value_by(
1039        &self,
1040        field: impl AsRef<str>,
1041    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1042    where
1043        E: EntityValue,
1044    {
1045        self.ensure_non_paged_mode_ready()?;
1046
1047        Self::with_slot(field, |target_slot| {
1048            self.explain_prepared_projection_non_paged_terminal(
1049                &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1050            )
1051        })
1052    }
1053
1054    /// Execute and return the last projected field value in effective response
1055    /// order, if any.
1056    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
1057    where
1058        E: EntityValue,
1059    {
1060        self.ensure_non_paged_mode_ready()?;
1061
1062        Self::with_slot(field, |target_slot| {
1063            self.execute_prepared_projection_terminal_output(
1064                PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1065            )?
1066            .into_terminal_value()
1067            .map_err(QueryError::execute)
1068        })
1069    }
1070
1071    /// Execute and return the last projected value for one shared bounded
1072    /// projection in effective response order, if any.
1073    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<Value>, QueryError>
1074    where
1075        E: EntityValue,
1076        P: ValueProjectionExpr,
1077    {
1078        self.ensure_non_paged_mode_ready()?;
1079
1080        Self::with_slot(projection.field(), |target_slot| {
1081            let value = self
1082                .execute_prepared_projection_terminal_output(
1083                    PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1084                )?
1085                .into_terminal_value()
1086                .map_err(QueryError::execute)?;
1087
1088            let mut projected =
1089                Self::project_terminal_items(projection, value, |projection, value| {
1090                    projection.apply_value(value)
1091                })?;
1092
1093            Ok(projected.pop())
1094        })
1095    }
1096
1097    /// Explain `last_value_by(field)` routing without executing the terminal.
1098    pub fn explain_last_value_by(
1099        &self,
1100        field: impl AsRef<str>,
1101    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1102    where
1103        E: EntityValue,
1104    {
1105        self.ensure_non_paged_mode_ready()?;
1106
1107        Self::with_slot(field, |target_slot| {
1108            self.explain_prepared_projection_non_paged_terminal(
1109                &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1110            )
1111        })
1112    }
1113
1114    /// Execute and return the first matching identifier in response order, if any.
1115    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1116    where
1117        E: EntityValue,
1118    {
1119        self.execute_prepared_order_sensitive_terminal_output(
1120            PreparedFluentOrderSensitiveTerminalStrategy::first(),
1121        )?
1122        .into_id()
1123        .map_err(QueryError::execute)
1124    }
1125
1126    /// Explain scalar `first()` routing without executing the terminal.
1127    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1128    where
1129        E: EntityValue,
1130    {
1131        self.explain_prepared_aggregate_non_paged_terminal(
1132            &PreparedFluentOrderSensitiveTerminalStrategy::first(),
1133        )
1134    }
1135
1136    /// Execute and return the last matching identifier in response order, if any.
1137    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1138    where
1139        E: EntityValue,
1140    {
1141        self.execute_prepared_order_sensitive_terminal_output(
1142            PreparedFluentOrderSensitiveTerminalStrategy::last(),
1143        )?
1144        .into_id()
1145        .map_err(QueryError::execute)
1146    }
1147
1148    /// Explain scalar `last()` routing without executing the terminal.
1149    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1150    where
1151        E: EntityValue,
1152    {
1153        self.explain_prepared_aggregate_non_paged_terminal(
1154            &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1155        )
1156    }
1157
1158    /// Execute and require exactly one matching row.
1159    pub fn require_one(&self) -> Result<(), QueryError>
1160    where
1161        E: EntityValue,
1162    {
1163        self.execute()?.into_rows()?.require_one()?;
1164        Ok(())
1165    }
1166
1167    /// Execute and require at least one matching row.
1168    pub fn require_some(&self) -> Result<(), QueryError>
1169    where
1170        E: EntityValue,
1171    {
1172        self.execute()?.into_rows()?.require_some()?;
1173        Ok(())
1174    }
1175}
1176
1177fn scalar_terminal_boundary_request_from_prepared(
1178    request: PreparedFluentScalarTerminalRuntimeRequest,
1179) -> ScalarTerminalBoundaryRequest {
1180    match request {
1181        PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1182            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1183        }
1184        PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1185            ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
1186        }
1187    }
1188}
1189
1190const fn existing_rows_terminal_boundary_request_from_prepared(
1191    request: PreparedFluentExistingRowsTerminalRuntimeRequest,
1192) -> ScalarTerminalBoundaryRequest {
1193    match request {
1194        PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => {
1195            ScalarTerminalBoundaryRequest::Count
1196        }
1197        PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => {
1198            ScalarTerminalBoundaryRequest::Exists
1199        }
1200    }
1201}
1202
1203const fn numeric_field_boundary_request_from_prepared(
1204    request: PreparedFluentNumericFieldRuntimeRequest,
1205) -> ScalarNumericFieldBoundaryRequest {
1206    match request {
1207        PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
1208        PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
1209            ScalarNumericFieldBoundaryRequest::SumDistinct
1210        }
1211        PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
1212        PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
1213            ScalarNumericFieldBoundaryRequest::AvgDistinct
1214        }
1215    }
1216}
1217
1218fn order_sensitive_terminal_boundary_request_from_prepared(
1219    request: PreparedFluentOrderSensitiveTerminalRuntimeRequest,
1220) -> ScalarTerminalBoundaryRequest {
1221    match request {
1222        PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1223            ScalarTerminalBoundaryRequest::IdTerminal { kind }
1224        }
1225        PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1226            ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth }
1227        }
1228        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1229            ScalarTerminalBoundaryRequest::MedianBySlot { target_field }
1230        }
1231        PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1232            ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field }
1233        }
1234    }
1235}
1236
1237const fn projection_boundary_request_from_prepared(
1238    request: PreparedFluentProjectionRuntimeRequest,
1239) -> ScalarProjectionBoundaryRequest {
1240    match request {
1241        PreparedFluentProjectionRuntimeRequest::Values => ScalarProjectionBoundaryRequest::Values,
1242        PreparedFluentProjectionRuntimeRequest::DistinctValues => {
1243            ScalarProjectionBoundaryRequest::DistinctValues
1244        }
1245        PreparedFluentProjectionRuntimeRequest::CountDistinct => {
1246            ScalarProjectionBoundaryRequest::CountDistinct
1247        }
1248        PreparedFluentProjectionRuntimeRequest::ValuesWithIds => {
1249            ScalarProjectionBoundaryRequest::ValuesWithIds
1250        }
1251        PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1252            ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind }
1253        }
1254    }
1255}