Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        DbSession, PersistedRow, Query,
9        executor::{
10            LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11            ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12            ScalarTerminalBoundaryRequest,
13        },
14        query::{
15            api::ResponseCardinalityExt,
16            builder::{
17                PreparedFluentAggregateExplainStrategy,
18                PreparedFluentExistingRowsTerminalRuntimeRequest,
19                PreparedFluentExistingRowsTerminalStrategy,
20                PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21                PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22                PreparedFluentOrderSensitiveTerminalStrategy,
23                PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24                PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25                ValueProjectionExpr,
26            },
27            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
28            fluent::load::{FluentLoadQuery, LoadQueryResult},
29            intent::QueryError,
30            plan::AggregateKind,
31        },
32        response::EntityResponse,
33    },
34    error::InternalError,
35    traits::EntityValue,
36    types::{Decimal, Id},
37    value::{OutputValue, Value},
38};
39
// Result shape of the `min_max_by` terminal: a `(min id, max id)` pair.
// NOTE(review): `None` presumably means no row matched — confirm against
// the executor's `into_id_pair` decoding.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42// Convert one runtime projection value into the public output boundary type.
43fn output(value: Value) -> OutputValue {
44    OutputValue::from(value)
45}
46
47// Convert one ordered runtime projection vector into the public output form.
48fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
49    values.into_iter().map(output).collect()
50}
51
52// Convert one ordered runtime `(id, value)` projection vector into the public output form.
53fn output_values_with_ids<E: PersistedRow>(
54    values: Vec<(Id<E>, Value)>,
55) -> Vec<(Id<E>, OutputValue)> {
56    values
57        .into_iter()
58        .map(|(id, value)| (id, output(value)))
59        .collect()
60}
61
62impl<E> FluentLoadQuery<'_, E>
63where
64    E: PersistedRow,
65{
66    // ------------------------------------------------------------------
67    // Execution (single semantic boundary)
68    // ------------------------------------------------------------------
69
70    /// Execute this query using the session's policy settings.
71    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
72    where
73        E: EntityValue,
74    {
75        self.ensure_non_paged_mode_ready()?;
76        self.session.execute_query_result(self.query())
77    }
78
79    // Run one scalar terminal through the canonical non-paged fluent policy
80    // gate before handing execution to the session load-query adapter.
81    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
82    where
83        E: EntityValue,
84        F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
85    {
86        self.ensure_non_paged_mode_ready()?;
87
88        self.session.execute_load_query_with(self.query(), execute)
89    }
90
91    // Run one read-only query/session projection through the canonical
92    // non-paged fluent policy gate so explain-style helpers do not each
93    // repeat the same readiness check and session handoff shell.
94    fn map_non_paged_query_output<T>(
95        &self,
96        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
97    ) -> Result<T, QueryError>
98    where
99        E: EntityValue,
100    {
101        self.ensure_non_paged_mode_ready()?;
102        map(self.session, self.query())
103    }
104
105    // Run one explain-visible aggregate terminal through the canonical
106    // non-paged fluent policy gate using the prepared aggregate strategy as
107    // the single explain projection source.
108    fn explain_prepared_aggregate_non_paged_terminal<S>(
109        &self,
110        strategy: &S,
111    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
112    where
113        E: EntityValue,
114        S: PreparedFluentAggregateExplainStrategy,
115    {
116        self.map_non_paged_query_output(|session, query| {
117            session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, strategy)
118        })
119    }
120
121    // Run one prepared projection/distinct explain terminal through the
122    // canonical non-paged fluent policy gate using the prepared projection
123    // strategy as the single explain projection source.
124    fn explain_prepared_projection_non_paged_terminal(
125        &self,
126        strategy: &PreparedFluentProjectionStrategy,
127    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
128    where
129        E: EntityValue,
130    {
131        self.map_non_paged_query_output(|session, query| {
132            session.explain_query_prepared_projection_terminal_with_visible_indexes(query, strategy)
133        })
134    }
135
136    // Resolve the structural execution descriptor for this fluent load query
137    // through the session-owned visible-index explain path once.
138    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
139    where
140        E: EntityValue,
141    {
142        self.map_non_paged_query_output(DbSession::explain_query_execution_with_visible_indexes)
143    }
144
145    // Render one descriptor-derived execution surface so text/json explain
146    // terminals do not each forward the same session explain call ad hoc.
147    fn render_execution_descriptor(
148        &self,
149        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
150    ) -> Result<String, QueryError>
151    where
152        E: EntityValue,
153    {
154        let descriptor = self.explain_execution_descriptor()?;
155
156        Ok(render(descriptor))
157    }
158
159    // Execute one prepared existing-rows terminal and decode the typed output
160    // through one shared mismatch/error-mapping lane.
161    fn map_prepared_existing_rows_terminal_output<T>(
162        &self,
163        strategy: PreparedFluentExistingRowsTerminalStrategy,
164        map: impl FnOnce(ScalarTerminalBoundaryOutput) -> Result<T, InternalError>,
165    ) -> Result<T, QueryError>
166    where
167        E: EntityValue,
168    {
169        let output =
170            self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())?;
171
172        map(output).map_err(QueryError::execute)
173    }
174
175    // Execute one prepared fluent numeric-field terminal through the canonical
176    // non-paged fluent policy gate using the prepared numeric strategy as the
177    // single runtime source.
178    fn execute_prepared_numeric_field_terminal(
179        &self,
180        strategy: PreparedFluentNumericFieldStrategy,
181    ) -> Result<Option<Decimal>, QueryError>
182    where
183        E: EntityValue,
184    {
185        let (target_field, runtime_request) = strategy.into_runtime_parts();
186
187        self.execute_scalar_non_paged_terminal(move |load, plan| {
188            load.execute_numeric_field_boundary(plan, target_field, runtime_request.into())
189        })
190    }
191
192    // Execute one prepared order-sensitive terminal and decode the typed
193    // output through one shared mismatch/error-mapping lane.
194    fn map_prepared_order_sensitive_terminal_output<T>(
195        &self,
196        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
197        map: impl FnOnce(ScalarTerminalBoundaryOutput) -> Result<T, InternalError>,
198    ) -> Result<T, QueryError>
199    where
200        E: EntityValue,
201    {
202        let output =
203            self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())?;
204
205        map(output).map_err(QueryError::execute)
206    }
207
208    // Execute one prepared fluent projection/distinct terminal through the
209    // canonical non-paged fluent policy gate using the prepared projection
210    // strategy as the single runtime source.
211    fn execute_prepared_projection_terminal_output(
212        &self,
213        strategy: PreparedFluentProjectionStrategy,
214    ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
215    where
216        E: EntityValue,
217    {
218        let (target_field, runtime_request) = strategy.into_runtime_parts();
219
220        self.execute_scalar_non_paged_terminal(move |load, plan| {
221            load.execute_scalar_projection_boundary(plan, target_field, runtime_request.into())
222        })
223    }
224
225    // Execute one prepared projection terminal and decode the typed output
226    // through one shared mismatch/error-mapping lane.
227    fn map_prepared_projection_terminal_output<T>(
228        &self,
229        strategy: PreparedFluentProjectionStrategy,
230        map: impl FnOnce(
231            crate::db::executor::ScalarProjectionBoundaryOutput,
232        ) -> Result<T, InternalError>,
233    ) -> Result<T, QueryError>
234    where
235        E: EntityValue,
236    {
237        let output = self.execute_prepared_projection_terminal_output(strategy)?;
238
239        map(output).map_err(QueryError::execute)
240    }
241
242    // Execute one already-lowered scalar terminal boundary request through
243    // the canonical non-paged fluent policy gate so terminal families that
244    // share the same scalar executor contract do not each rebuild it.
245    fn execute_scalar_terminal_boundary_output<R>(
246        &self,
247        runtime_request: R,
248    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
249    where
250        E: EntityValue,
251        R: Into<ScalarTerminalBoundaryRequest>,
252    {
253        self.execute_scalar_non_paged_terminal(move |load, plan| {
254            load.execute_scalar_terminal_request(plan, runtime_request.into())
255        })
256    }
257
258    // Execute one prepared scalar terminal and decode the typed output through
259    // one shared mismatch/error-mapping lane.
260    fn map_prepared_scalar_terminal_output<T>(
261        &self,
262        strategy: PreparedFluentScalarTerminalStrategy,
263        map: impl FnOnce(ScalarTerminalBoundaryOutput) -> Result<T, InternalError>,
264    ) -> Result<T, QueryError>
265    where
266        E: EntityValue,
267    {
268        let output =
269            self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())?;
270
271        map(output).map_err(QueryError::execute)
272    }
273
274    // Apply one shared bounded value projection to iterator-like terminal
275    // output while preserving source order and cardinality.
276    fn project_terminal_items<P, T, U>(
277        projection: &P,
278        values: impl IntoIterator<Item = T>,
279        mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
280    ) -> Result<Vec<U>, QueryError>
281    where
282        P: ValueProjectionExpr,
283    {
284        values
285            .into_iter()
286            .map(|value| map(projection, value))
287            .collect()
288    }
289
290    // ------------------------------------------------------------------
291    // Execution terminals — semantic only
292    // ------------------------------------------------------------------
293
294    /// Execute and return whether the result set is empty.
295    pub fn is_empty(&self) -> Result<bool, QueryError>
296    where
297        E: EntityValue,
298    {
299        self.not_exists()
300    }
301
302    /// Execute and return whether no matching row exists.
303    pub fn not_exists(&self) -> Result<bool, QueryError>
304    where
305        E: EntityValue,
306    {
307        Ok(!self.exists()?)
308    }
309
310    /// Execute and return whether at least one matching row exists.
311    pub fn exists(&self) -> Result<bool, QueryError>
312    where
313        E: EntityValue,
314    {
315        self.map_prepared_existing_rows_terminal_output(
316            PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
317            ScalarTerminalBoundaryOutput::into_exists,
318        )
319    }
320
321    /// Explain scalar `exists()` routing without executing the terminal.
322    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
323    where
324        E: EntityValue,
325    {
326        self.explain_prepared_aggregate_non_paged_terminal(
327            &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
328        )
329    }
330
331    /// Explain scalar `not_exists()` routing without executing the terminal.
332    ///
333    /// This remains an `exists()` execution plan with negated boolean semantics.
334    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
335    where
336        E: EntityValue,
337    {
338        self.explain_exists()
339    }
340
341    /// Explain scalar load execution shape without executing the query.
342    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
343    where
344        E: EntityValue,
345    {
346        self.explain_execution_descriptor()
347    }
348
349    /// Explain scalar load execution shape as deterministic text.
350    pub fn explain_execution_text(&self) -> Result<String, QueryError>
351    where
352        E: EntityValue,
353    {
354        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
355    }
356
357    /// Explain scalar load execution shape as canonical JSON.
358    pub fn explain_execution_json(&self) -> Result<String, QueryError>
359    where
360        E: EntityValue,
361    {
362        self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
363    }
364
365    /// Explain scalar load execution shape as verbose text with diagnostics.
366    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
367    where
368        E: EntityValue,
369    {
370        self.map_non_paged_query_output(
371            DbSession::explain_query_execution_verbose_with_visible_indexes,
372        )
373    }
374
375    /// Execute and return the number of matching rows.
376    pub fn count(&self) -> Result<u32, QueryError>
377    where
378        E: EntityValue,
379    {
380        self.map_prepared_existing_rows_terminal_output(
381            PreparedFluentExistingRowsTerminalStrategy::count_rows(),
382            ScalarTerminalBoundaryOutput::into_count,
383        )
384    }
385
386    /// Execute and return the total persisted payload bytes for the effective
387    /// result window.
388    pub fn bytes(&self) -> Result<u64, QueryError>
389    where
390        E: EntityValue,
391    {
392        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
393    }
394
395    /// Execute and return the total serialized bytes for `field` over the
396    /// effective result window.
397    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
398    where
399        E: EntityValue,
400    {
401        self.with_non_paged_slot(field, |target_slot| {
402            self.session
403                .execute_load_query_with(self.query(), move |load, plan| {
404                    load.bytes_by_slot(plan, target_slot)
405                })
406        })
407    }
408
409    /// Explain `bytes_by(field)` routing without executing the terminal.
410    pub fn explain_bytes_by(
411        &self,
412        field: impl AsRef<str>,
413    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
414    where
415        E: EntityValue,
416    {
417        self.with_non_paged_slot(field, |target_slot| {
418            self.session
419                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
420        })
421    }
422
423    /// Execute and return the smallest matching identifier, if any.
424    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
425    where
426        E: EntityValue,
427    {
428        self.map_prepared_scalar_terminal_output(
429            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
430            ScalarTerminalBoundaryOutput::into_id,
431        )
432    }
433
434    /// Explain scalar `min()` routing without executing the terminal.
435    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
436    where
437        E: EntityValue,
438    {
439        self.explain_prepared_aggregate_non_paged_terminal(
440            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
441        )
442    }
443
444    /// Execute and return the id of the row with the smallest value for `field`.
445    ///
446    /// Ties are deterministic: equal field values resolve by primary key ascending.
447    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
448    where
449        E: EntityValue,
450    {
451        self.with_non_paged_slot(field, |target_slot| {
452            self.map_prepared_scalar_terminal_output(
453                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
454                ScalarTerminalBoundaryOutput::into_id,
455            )
456        })
457    }
458
459    /// Execute and return the largest matching identifier, if any.
460    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
461    where
462        E: EntityValue,
463    {
464        self.map_prepared_scalar_terminal_output(
465            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
466            ScalarTerminalBoundaryOutput::into_id,
467        )
468    }
469
470    /// Explain scalar `max()` routing without executing the terminal.
471    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
472    where
473        E: EntityValue,
474    {
475        self.explain_prepared_aggregate_non_paged_terminal(
476            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
477        )
478    }
479
480    /// Execute and return the id of the row with the largest value for `field`.
481    ///
482    /// Ties are deterministic: equal field values resolve by primary key ascending.
483    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
484    where
485        E: EntityValue,
486    {
487        self.with_non_paged_slot(field, |target_slot| {
488            self.map_prepared_scalar_terminal_output(
489                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
490                ScalarTerminalBoundaryOutput::into_id,
491            )
492        })
493    }
494
495    /// Execute and return the id at zero-based ordinal `nth` when rows are
496    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
497    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
498    where
499        E: EntityValue,
500    {
501        self.with_non_paged_slot(field, |target_slot| {
502            self.map_prepared_order_sensitive_terminal_output(
503                PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
504                ScalarTerminalBoundaryOutput::into_id,
505            )
506        })
507    }
508
509    /// Execute and return the sum of `field` over matching rows.
510    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
511    where
512        E: EntityValue,
513    {
514        self.with_non_paged_slot(field, |target_slot| {
515            self.execute_prepared_numeric_field_terminal(
516                PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
517            )
518        })
519    }
520
521    /// Explain scalar `sum_by(field)` routing without executing the terminal.
522    pub fn explain_sum_by(
523        &self,
524        field: impl AsRef<str>,
525    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
526    where
527        E: EntityValue,
528    {
529        self.with_non_paged_slot(field, |target_slot| {
530            self.explain_prepared_aggregate_non_paged_terminal(
531                &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
532            )
533        })
534    }
535
536    /// Execute and return the sum of distinct `field` values.
537    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
538    where
539        E: EntityValue,
540    {
541        self.with_non_paged_slot(field, |target_slot| {
542            self.execute_prepared_numeric_field_terminal(
543                PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
544            )
545        })
546    }
547
548    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
549    pub fn explain_sum_distinct_by(
550        &self,
551        field: impl AsRef<str>,
552    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
553    where
554        E: EntityValue,
555    {
556        self.with_non_paged_slot(field, |target_slot| {
557            self.explain_prepared_aggregate_non_paged_terminal(
558                &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
559            )
560        })
561    }
562
563    /// Execute and return the average of `field` over matching rows.
564    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
565    where
566        E: EntityValue,
567    {
568        self.with_non_paged_slot(field, |target_slot| {
569            self.execute_prepared_numeric_field_terminal(
570                PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
571            )
572        })
573    }
574
575    /// Explain scalar `avg_by(field)` routing without executing the terminal.
576    pub fn explain_avg_by(
577        &self,
578        field: impl AsRef<str>,
579    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
580    where
581        E: EntityValue,
582    {
583        self.with_non_paged_slot(field, |target_slot| {
584            self.explain_prepared_aggregate_non_paged_terminal(
585                &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
586            )
587        })
588    }
589
590    /// Execute and return the average of distinct `field` values.
591    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
592    where
593        E: EntityValue,
594    {
595        self.with_non_paged_slot(field, |target_slot| {
596            self.execute_prepared_numeric_field_terminal(
597                PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
598            )
599        })
600    }
601
602    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
603    pub fn explain_avg_distinct_by(
604        &self,
605        field: impl AsRef<str>,
606    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
607    where
608        E: EntityValue,
609    {
610        self.with_non_paged_slot(field, |target_slot| {
611            self.explain_prepared_aggregate_non_paged_terminal(
612                &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
613            )
614        })
615    }
616
617    /// Execute and return the median id by `field` using deterministic ordering
618    /// `(field asc, primary key asc)`.
619    ///
620    /// Even-length windows select the lower median.
621    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
622    where
623        E: EntityValue,
624    {
625        self.with_non_paged_slot(field, |target_slot| {
626            self.map_prepared_order_sensitive_terminal_output(
627                PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
628                ScalarTerminalBoundaryOutput::into_id,
629            )
630        })
631    }
632
633    /// Execute and return the number of distinct values for `field` over the
634    /// effective result window.
635    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
636    where
637        E: EntityValue,
638    {
639        self.with_non_paged_slot(field, |target_slot| {
640            self.map_prepared_projection_terminal_output(
641                PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
642                crate::db::executor::ScalarProjectionBoundaryOutput::into_count,
643            )
644        })
645    }
646
647    /// Explain `count_distinct_by(field)` routing without executing the terminal.
648    pub fn explain_count_distinct_by(
649        &self,
650        field: impl AsRef<str>,
651    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
652    where
653        E: EntityValue,
654    {
655        self.with_non_paged_slot(field, |target_slot| {
656            self.explain_prepared_projection_non_paged_terminal(
657                &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
658            )
659        })
660    }
661
662    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
663    ///
664    /// Tie handling is deterministic for both extrema: primary key ascending.
665    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
666    where
667        E: EntityValue,
668    {
669        self.with_non_paged_slot(field, |target_slot| {
670            self.map_prepared_order_sensitive_terminal_output(
671                PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
672                ScalarTerminalBoundaryOutput::into_id_pair,
673            )
674        })
675    }
676
677    /// Execute and return projected field values for the effective result window.
678    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
679    where
680        E: EntityValue,
681    {
682        self.with_non_paged_slot(field, |target_slot| {
683            self.map_prepared_projection_terminal_output(
684                PreparedFluentProjectionStrategy::values_by_slot(target_slot),
685                crate::db::executor::ScalarProjectionBoundaryOutput::into_values,
686            )
687            .map(output_values)
688        })
689    }
690
691    /// Execute and return projected values for one shared bounded projection
692    /// over the effective response window.
693    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
694    where
695        E: EntityValue,
696        P: ValueProjectionExpr,
697    {
698        self.with_non_paged_slot(projection.field(), |target_slot| {
699            let values = self
700                .execute_prepared_projection_terminal_output(
701                    PreparedFluentProjectionStrategy::values_by_slot(target_slot),
702                )?
703                .into_values()
704                .map_err(QueryError::execute)?;
705
706            Self::project_terminal_items(projection, values, |projection, value| {
707                projection.apply_value(value)
708            })
709            .map(output_values)
710        })
711    }
712
713    /// Explain `project_values(projection)` routing without executing it.
714    pub fn explain_project_values<P>(
715        &self,
716        projection: &P,
717    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
718    where
719        E: EntityValue,
720        P: ValueProjectionExpr,
721    {
722        self.with_non_paged_slot(projection.field(), |target_slot| {
723            self.explain_prepared_projection_non_paged_terminal(
724                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
725            )
726        })
727    }
728
729    /// Explain `values_by(field)` routing without executing the terminal.
730    pub fn explain_values_by(
731        &self,
732        field: impl AsRef<str>,
733    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
734    where
735        E: EntityValue,
736    {
737        self.with_non_paged_slot(field, |target_slot| {
738            self.explain_prepared_projection_non_paged_terminal(
739                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
740            )
741        })
742    }
743
744    /// Execute and return the first `k` rows from the effective response window.
745    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
746    where
747        E: EntityValue,
748    {
749        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
750    }
751
752    /// Execute and return the top `k` rows by `field` under deterministic
753    /// ordering `(field desc, primary_key asc)` over the effective response
754    /// window.
755    ///
756    /// This terminal applies its own ordering and does not preserve query
757    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
758    /// matches `max_by(field)` selection semantics.
759    pub fn top_k_by(
760        &self,
761        field: impl AsRef<str>,
762        take_count: u32,
763    ) -> Result<EntityResponse<E>, QueryError>
764    where
765        E: EntityValue,
766    {
767        self.with_non_paged_slot(field, |target_slot| {
768            self.session
769                .execute_load_query_with(self.query(), move |load, plan| {
770                    load.top_k_by_slot(plan, target_slot, take_count)
771                })
772        })
773    }
774
775    /// Execute and return the bottom `k` rows by `field` under deterministic
776    /// ordering `(field asc, primary_key asc)` over the effective response
777    /// window.
778    ///
779    /// This terminal applies its own ordering and does not preserve query
780    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
781    /// matches `min_by(field)` selection semantics.
782    pub fn bottom_k_by(
783        &self,
784        field: impl AsRef<str>,
785        take_count: u32,
786    ) -> Result<EntityResponse<E>, QueryError>
787    where
788        E: EntityValue,
789    {
790        self.with_non_paged_slot(field, |target_slot| {
791            self.session
792                .execute_load_query_with(self.query(), move |load, plan| {
793                    load.bottom_k_by_slot(plan, target_slot, take_count)
794                })
795        })
796    }
797
798    /// Execute and return projected values for the top `k` rows by `field`
799    /// under deterministic ordering `(field desc, primary_key asc)` over the
800    /// effective response window.
801    ///
802    /// Ranking is applied before projection and does not preserve query
803    /// `order_term(...)` row order in the returned values. For `k = 1`, this
804    /// matches `max_by(field)` projected to one value.
805    pub fn top_k_by_values(
806        &self,
807        field: impl AsRef<str>,
808        take_count: u32,
809    ) -> Result<Vec<OutputValue>, QueryError>
810    where
811        E: EntityValue,
812    {
813        self.with_non_paged_slot(field, |target_slot| {
814            self.session
815                .execute_load_query_with(self.query(), move |load, plan| {
816                    load.top_k_by_values_slot(plan, target_slot, take_count)
817                })
818                .map(output_values)
819        })
820    }
821
822    /// Execute and return projected values for the bottom `k` rows by `field`
823    /// under deterministic ordering `(field asc, primary_key asc)` over the
824    /// effective response window.
825    ///
826    /// Ranking is applied before projection and does not preserve query
827    /// `order_term(...)` row order in the returned values. For `k = 1`, this
828    /// matches `min_by(field)` projected to one value.
829    pub fn bottom_k_by_values(
830        &self,
831        field: impl AsRef<str>,
832        take_count: u32,
833    ) -> Result<Vec<OutputValue>, QueryError>
834    where
835        E: EntityValue,
836    {
837        self.with_non_paged_slot(field, |target_slot| {
838            self.session
839                .execute_load_query_with(self.query(), move |load, plan| {
840                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
841                })
842                .map(output_values)
843        })
844    }
845
846    /// Execute and return projected id/value pairs for the top `k` rows by
847    /// `field` under deterministic ordering `(field desc, primary_key asc)`
848    /// over the effective response window.
849    ///
850    /// Ranking is applied before projection and does not preserve query
851    /// `order_term(...)` row order in the returned values. For `k = 1`, this
852    /// matches `max_by(field)` projected to one `(id, value)` pair.
853    pub fn top_k_by_with_ids(
854        &self,
855        field: impl AsRef<str>,
856        take_count: u32,
857    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
858    where
859        E: EntityValue,
860    {
861        self.with_non_paged_slot(field, |target_slot| {
862            self.session
863                .execute_load_query_with(self.query(), move |load, plan| {
864                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
865                })
866                .map(output_values_with_ids)
867        })
868    }
869
870    /// Execute and return projected id/value pairs for the bottom `k` rows by
871    /// `field` under deterministic ordering `(field asc, primary_key asc)`
872    /// over the effective response window.
873    ///
874    /// Ranking is applied before projection and does not preserve query
875    /// `order_term(...)` row order in the returned values. For `k = 1`, this
876    /// matches `min_by(field)` projected to one `(id, value)` pair.
877    pub fn bottom_k_by_with_ids(
878        &self,
879        field: impl AsRef<str>,
880        take_count: u32,
881    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
882    where
883        E: EntityValue,
884    {
885        self.with_non_paged_slot(field, |target_slot| {
886            self.session
887                .execute_load_query_with(self.query(), move |load, plan| {
888                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
889                })
890                .map(output_values_with_ids)
891        })
892    }
893
894    /// Execute and return distinct projected field values for the effective
895    /// result window, preserving first-observed value order.
896    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
897    where
898        E: EntityValue,
899    {
900        self.with_non_paged_slot(field, |target_slot| {
901            self.map_prepared_projection_terminal_output(
902                PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
903                crate::db::executor::ScalarProjectionBoundaryOutput::into_values,
904            )
905            .map(output_values)
906        })
907    }
908
909    /// Explain `distinct_values_by(field)` routing without executing the terminal.
910    pub fn explain_distinct_values_by(
911        &self,
912        field: impl AsRef<str>,
913    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
914    where
915        E: EntityValue,
916    {
917        self.with_non_paged_slot(field, |target_slot| {
918            self.explain_prepared_projection_non_paged_terminal(
919                &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
920            )
921        })
922    }
923
924    /// Execute and return projected field values paired with row ids for the
925    /// effective result window.
926    pub fn values_by_with_ids(
927        &self,
928        field: impl AsRef<str>,
929    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
930    where
931        E: EntityValue,
932    {
933        self.with_non_paged_slot(field, |target_slot| {
934            self.map_prepared_projection_terminal_output(
935                PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
936                crate::db::executor::ScalarProjectionBoundaryOutput::into_values_with_ids,
937            )
938            .map(output_values_with_ids)
939        })
940    }
941
942    /// Execute and return projected id/value pairs for one shared bounded
943    /// projection over the effective response window.
944    pub fn project_values_with_ids<P>(
945        &self,
946        projection: &P,
947    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
948    where
949        E: EntityValue,
950        P: ValueProjectionExpr,
951    {
952        self.with_non_paged_slot(projection.field(), |target_slot| {
953            let values = self
954                .execute_prepared_projection_terminal_output(
955                    PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
956                )?
957                .into_values_with_ids::<E>()
958                .map_err(QueryError::execute)?;
959
960            Self::project_terminal_items(projection, values, |projection, (id, value)| {
961                Ok((id, projection.apply_value(value)?))
962            })
963            .map(output_values_with_ids)
964        })
965    }
966
967    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
968    pub fn explain_values_by_with_ids(
969        &self,
970        field: impl AsRef<str>,
971    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
972    where
973        E: EntityValue,
974    {
975        self.with_non_paged_slot(field, |target_slot| {
976            self.explain_prepared_projection_non_paged_terminal(
977                &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
978            )
979        })
980    }
981
982    /// Execute and return the first projected field value in effective response
983    /// order, if any.
984    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
985    where
986        E: EntityValue,
987    {
988        self.with_non_paged_slot(field, |target_slot| {
989            self.map_prepared_projection_terminal_output(
990                PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
991                crate::db::executor::ScalarProjectionBoundaryOutput::into_terminal_value,
992            )
993            .map(|value| value.map(output))
994        })
995    }
996
997    /// Execute and return the first projected value for one shared bounded
998    /// projection in effective response order, if any.
999    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1000    where
1001        E: EntityValue,
1002        P: ValueProjectionExpr,
1003    {
1004        self.with_non_paged_slot(projection.field(), |target_slot| {
1005            let value = self
1006                .execute_prepared_projection_terminal_output(
1007                    PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1008                )?
1009                .into_terminal_value()
1010                .map_err(QueryError::execute)?;
1011
1012            let mut projected =
1013                Self::project_terminal_items(projection, value, |projection, value| {
1014                    projection.apply_value(value)
1015                })?;
1016
1017            Ok(projected.pop().map(output))
1018        })
1019    }
1020
1021    /// Explain `first_value_by(field)` routing without executing the terminal.
1022    pub fn explain_first_value_by(
1023        &self,
1024        field: impl AsRef<str>,
1025    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1026    where
1027        E: EntityValue,
1028    {
1029        self.with_non_paged_slot(field, |target_slot| {
1030            self.explain_prepared_projection_non_paged_terminal(
1031                &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1032            )
1033        })
1034    }
1035
1036    /// Execute and return the last projected field value in effective response
1037    /// order, if any.
1038    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1039    where
1040        E: EntityValue,
1041    {
1042        self.with_non_paged_slot(field, |target_slot| {
1043            self.map_prepared_projection_terminal_output(
1044                PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1045                crate::db::executor::ScalarProjectionBoundaryOutput::into_terminal_value,
1046            )
1047            .map(|value| value.map(output))
1048        })
1049    }
1050
1051    /// Execute and return the last projected value for one shared bounded
1052    /// projection in effective response order, if any.
1053    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1054    where
1055        E: EntityValue,
1056        P: ValueProjectionExpr,
1057    {
1058        self.with_non_paged_slot(projection.field(), |target_slot| {
1059            let value = self
1060                .execute_prepared_projection_terminal_output(
1061                    PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1062                )?
1063                .into_terminal_value()
1064                .map_err(QueryError::execute)?;
1065
1066            let mut projected =
1067                Self::project_terminal_items(projection, value, |projection, value| {
1068                    projection.apply_value(value)
1069                })?;
1070
1071            Ok(projected.pop().map(output))
1072        })
1073    }
1074
1075    /// Explain `last_value_by(field)` routing without executing the terminal.
1076    pub fn explain_last_value_by(
1077        &self,
1078        field: impl AsRef<str>,
1079    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1080    where
1081        E: EntityValue,
1082    {
1083        self.with_non_paged_slot(field, |target_slot| {
1084            self.explain_prepared_projection_non_paged_terminal(
1085                &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1086            )
1087        })
1088    }
1089
1090    /// Execute and return the first matching identifier in response order, if any.
1091    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1092    where
1093        E: EntityValue,
1094    {
1095        self.map_prepared_order_sensitive_terminal_output(
1096            PreparedFluentOrderSensitiveTerminalStrategy::first(),
1097            ScalarTerminalBoundaryOutput::into_id,
1098        )
1099    }
1100
1101    /// Explain scalar `first()` routing without executing the terminal.
1102    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1103    where
1104        E: EntityValue,
1105    {
1106        self.explain_prepared_aggregate_non_paged_terminal(
1107            &PreparedFluentOrderSensitiveTerminalStrategy::first(),
1108        )
1109    }
1110
1111    /// Execute and return the last matching identifier in response order, if any.
1112    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1113    where
1114        E: EntityValue,
1115    {
1116        self.map_prepared_order_sensitive_terminal_output(
1117            PreparedFluentOrderSensitiveTerminalStrategy::last(),
1118            ScalarTerminalBoundaryOutput::into_id,
1119        )
1120    }
1121
1122    /// Explain scalar `last()` routing without executing the terminal.
1123    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1124    where
1125        E: EntityValue,
1126    {
1127        self.explain_prepared_aggregate_non_paged_terminal(
1128            &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1129        )
1130    }
1131
1132    /// Execute and require exactly one matching row.
1133    pub fn require_one(&self) -> Result<(), QueryError>
1134    where
1135        E: EntityValue,
1136    {
1137        self.execute()?.into_rows()?.require_one()?;
1138        Ok(())
1139    }
1140
1141    /// Execute and require at least one matching row.
1142    pub fn require_some(&self) -> Result<(), QueryError>
1143    where
1144        E: EntityValue,
1145    {
1146        self.execute()?.into_rows()?.require_some()?;
1147        Ok(())
1148    }
1149}
1150
1151impl From<PreparedFluentScalarTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1152    fn from(value: PreparedFluentScalarTerminalRuntimeRequest) -> Self {
1153        match value {
1154            PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1155                Self::IdTerminal { kind }
1156            }
1157            PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1158                Self::IdBySlot { kind, target_field }
1159            }
1160        }
1161    }
1162}
1163
1164impl From<PreparedFluentExistingRowsTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1165    fn from(value: PreparedFluentExistingRowsTerminalRuntimeRequest) -> Self {
1166        match value {
1167            PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => Self::Count,
1168            PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => Self::Exists,
1169        }
1170    }
1171}
1172
1173impl From<PreparedFluentNumericFieldRuntimeRequest> for ScalarNumericFieldBoundaryRequest {
1174    fn from(value: PreparedFluentNumericFieldRuntimeRequest) -> Self {
1175        match value {
1176            PreparedFluentNumericFieldRuntimeRequest::Sum => Self::Sum,
1177            PreparedFluentNumericFieldRuntimeRequest::SumDistinct => Self::SumDistinct,
1178            PreparedFluentNumericFieldRuntimeRequest::Avg => Self::Avg,
1179            PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => Self::AvgDistinct,
1180        }
1181    }
1182}
1183
1184impl From<PreparedFluentOrderSensitiveTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1185    fn from(value: PreparedFluentOrderSensitiveTerminalRuntimeRequest) -> Self {
1186        match value {
1187            PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1188                Self::IdTerminal { kind }
1189            }
1190            PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1191                Self::NthBySlot { target_field, nth }
1192            }
1193            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1194                Self::MedianBySlot { target_field }
1195            }
1196            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1197                Self::MinMaxBySlot { target_field }
1198            }
1199        }
1200    }
1201}
1202
1203impl From<PreparedFluentProjectionRuntimeRequest> for ScalarProjectionBoundaryRequest {
1204    fn from(value: PreparedFluentProjectionRuntimeRequest) -> Self {
1205        match value {
1206            PreparedFluentProjectionRuntimeRequest::Values => Self::Values,
1207            PreparedFluentProjectionRuntimeRequest::DistinctValues => Self::DistinctValues,
1208            PreparedFluentProjectionRuntimeRequest::CountDistinct => Self::CountDistinct,
1209            PreparedFluentProjectionRuntimeRequest::ValuesWithIds => Self::ValuesWithIds,
1210            PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1211                Self::TerminalValue { terminal_kind }
1212            }
1213        }
1214    }
1215}