Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        DbSession, PersistedRow, Query,
9        executor::{
10            LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11            ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12            ScalarTerminalBoundaryRequest,
13        },
14        query::{
15            api::ResponseCardinalityExt,
16            builder::{
17                PreparedFluentAggregateExplainStrategy,
18                PreparedFluentExistingRowsTerminalRuntimeRequest,
19                PreparedFluentExistingRowsTerminalStrategy,
20                PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21                PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22                PreparedFluentOrderSensitiveTerminalStrategy,
23                PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24                PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25                ValueProjectionExpr,
26            },
27            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
28            fluent::load::{FluentLoadQuery, LoadQueryResult},
29            intent::QueryError,
30            plan::AggregateKind,
31        },
32        response::EntityResponse,
33    },
34    error::InternalError,
35    traits::EntityValue,
36    types::{Decimal, Id},
37    value::Value,
38};
39
40type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42impl<E> FluentLoadQuery<'_, E>
43where
44    E: PersistedRow,
45{
46    // ------------------------------------------------------------------
47    // Execution (single semantic boundary)
48    // ------------------------------------------------------------------
49
50    /// Execute this query using the session's policy settings.
51    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
52    where
53        E: EntityValue,
54    {
55        self.ensure_non_paged_mode_ready()?;
56        self.session.execute_query_result(self.query())
57    }
58
59    // Run one scalar terminal through the canonical non-paged fluent policy
60    // gate before handing execution to the session load-query adapter.
61    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
62    where
63        E: EntityValue,
64        F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
65    {
66        self.ensure_non_paged_mode_ready()?;
67
68        self.session.execute_load_query_with(self.query(), execute)
69    }
70
71    // Run one read-only query/session projection through the canonical
72    // non-paged fluent policy gate so explain-style helpers do not each
73    // repeat the same readiness check and session handoff shell.
74    fn map_non_paged_query_output<T>(
75        &self,
76        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
77    ) -> Result<T, QueryError>
78    where
79        E: EntityValue,
80    {
81        self.ensure_non_paged_mode_ready()?;
82        map(self.session, self.query())
83    }
84
85    // Run one explain-visible aggregate terminal through the canonical
86    // non-paged fluent policy gate using the prepared aggregate strategy as
87    // the single explain projection source.
88    fn explain_prepared_aggregate_non_paged_terminal<S>(
89        &self,
90        strategy: &S,
91    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
92    where
93        E: EntityValue,
94        S: PreparedFluentAggregateExplainStrategy,
95    {
96        self.map_non_paged_query_output(|session, query| {
97            session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, strategy)
98        })
99    }
100
101    // Run one prepared projection/distinct explain terminal through the
102    // canonical non-paged fluent policy gate using the prepared projection
103    // strategy as the single explain projection source.
104    fn explain_prepared_projection_non_paged_terminal(
105        &self,
106        strategy: &PreparedFluentProjectionStrategy,
107    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
108    where
109        E: EntityValue,
110    {
111        self.map_non_paged_query_output(|session, query| {
112            session.explain_query_prepared_projection_terminal_with_visible_indexes(query, strategy)
113        })
114    }
115
116    // Resolve the structural execution descriptor for this fluent load query
117    // through the session-owned visible-index explain path once.
118    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
119    where
120        E: EntityValue,
121    {
122        self.map_non_paged_query_output(DbSession::explain_query_execution_with_visible_indexes)
123    }
124
125    // Render one descriptor-derived execution surface so text/json explain
126    // terminals do not each forward the same session explain call ad hoc.
127    fn render_execution_descriptor(
128        &self,
129        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
130    ) -> Result<String, QueryError>
131    where
132        E: EntityValue,
133    {
134        let descriptor = self.explain_execution_descriptor()?;
135
136        Ok(render(descriptor))
137    }
138
139    // Execute one prepared existing-rows terminal and decode the typed output
140    // through one shared mismatch/error-mapping lane.
141    fn map_prepared_existing_rows_terminal_output<T>(
142        &self,
143        strategy: PreparedFluentExistingRowsTerminalStrategy,
144        map: impl FnOnce(ScalarTerminalBoundaryOutput) -> Result<T, InternalError>,
145    ) -> Result<T, QueryError>
146    where
147        E: EntityValue,
148    {
149        let output =
150            self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())?;
151
152        map(output).map_err(QueryError::execute)
153    }
154
155    // Execute one prepared fluent numeric-field terminal through the canonical
156    // non-paged fluent policy gate using the prepared numeric strategy as the
157    // single runtime source.
158    fn execute_prepared_numeric_field_terminal(
159        &self,
160        strategy: PreparedFluentNumericFieldStrategy,
161    ) -> Result<Option<Decimal>, QueryError>
162    where
163        E: EntityValue,
164    {
165        let (target_field, runtime_request) = strategy.into_runtime_parts();
166
167        self.execute_scalar_non_paged_terminal(move |load, plan| {
168            load.execute_numeric_field_boundary(plan, target_field, runtime_request.into())
169        })
170    }
171
172    // Execute one prepared order-sensitive terminal and decode the typed
173    // output through one shared mismatch/error-mapping lane.
174    fn map_prepared_order_sensitive_terminal_output<T>(
175        &self,
176        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
177        map: impl FnOnce(ScalarTerminalBoundaryOutput) -> Result<T, InternalError>,
178    ) -> Result<T, QueryError>
179    where
180        E: EntityValue,
181    {
182        let output =
183            self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())?;
184
185        map(output).map_err(QueryError::execute)
186    }
187
188    // Execute one prepared fluent projection/distinct terminal through the
189    // canonical non-paged fluent policy gate using the prepared projection
190    // strategy as the single runtime source.
191    fn execute_prepared_projection_terminal_output(
192        &self,
193        strategy: PreparedFluentProjectionStrategy,
194    ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
195    where
196        E: EntityValue,
197    {
198        let (target_field, runtime_request) = strategy.into_runtime_parts();
199
200        self.execute_scalar_non_paged_terminal(move |load, plan| {
201            load.execute_scalar_projection_boundary(plan, target_field, runtime_request.into())
202        })
203    }
204
205    // Execute one prepared projection terminal and decode the typed output
206    // through one shared mismatch/error-mapping lane.
207    fn map_prepared_projection_terminal_output<T>(
208        &self,
209        strategy: PreparedFluentProjectionStrategy,
210        map: impl FnOnce(
211            crate::db::executor::ScalarProjectionBoundaryOutput,
212        ) -> Result<T, InternalError>,
213    ) -> Result<T, QueryError>
214    where
215        E: EntityValue,
216    {
217        let output = self.execute_prepared_projection_terminal_output(strategy)?;
218
219        map(output).map_err(QueryError::execute)
220    }
221
222    // Execute one already-lowered scalar terminal boundary request through
223    // the canonical non-paged fluent policy gate so terminal families that
224    // share the same scalar executor contract do not each rebuild it.
225    fn execute_scalar_terminal_boundary_output<R>(
226        &self,
227        runtime_request: R,
228    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
229    where
230        E: EntityValue,
231        R: Into<ScalarTerminalBoundaryRequest>,
232    {
233        self.execute_scalar_non_paged_terminal(move |load, plan| {
234            load.execute_scalar_terminal_request(plan, runtime_request.into())
235        })
236    }
237
238    // Execute one prepared scalar terminal and decode the typed output through
239    // one shared mismatch/error-mapping lane.
240    fn map_prepared_scalar_terminal_output<T>(
241        &self,
242        strategy: PreparedFluentScalarTerminalStrategy,
243        map: impl FnOnce(ScalarTerminalBoundaryOutput) -> Result<T, InternalError>,
244    ) -> Result<T, QueryError>
245    where
246        E: EntityValue,
247    {
248        let output =
249            self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())?;
250
251        map(output).map_err(QueryError::execute)
252    }
253
254    // Apply one shared bounded value projection to iterator-like terminal
255    // output while preserving source order and cardinality.
256    fn project_terminal_items<P, T, U>(
257        projection: &P,
258        values: impl IntoIterator<Item = T>,
259        mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
260    ) -> Result<Vec<U>, QueryError>
261    where
262        P: ValueProjectionExpr,
263    {
264        values
265            .into_iter()
266            .map(|value| map(projection, value))
267            .collect()
268    }
269
270    // ------------------------------------------------------------------
271    // Execution terminals — semantic only
272    // ------------------------------------------------------------------
273
274    /// Execute and return whether the result set is empty.
275    pub fn is_empty(&self) -> Result<bool, QueryError>
276    where
277        E: EntityValue,
278    {
279        self.not_exists()
280    }
281
282    /// Execute and return whether no matching row exists.
283    pub fn not_exists(&self) -> Result<bool, QueryError>
284    where
285        E: EntityValue,
286    {
287        Ok(!self.exists()?)
288    }
289
290    /// Execute and return whether at least one matching row exists.
291    pub fn exists(&self) -> Result<bool, QueryError>
292    where
293        E: EntityValue,
294    {
295        self.map_prepared_existing_rows_terminal_output(
296            PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
297            ScalarTerminalBoundaryOutput::into_exists,
298        )
299    }
300
301    /// Explain scalar `exists()` routing without executing the terminal.
302    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
303    where
304        E: EntityValue,
305    {
306        self.explain_prepared_aggregate_non_paged_terminal(
307            &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
308        )
309    }
310
311    /// Explain scalar `not_exists()` routing without executing the terminal.
312    ///
313    /// This remains an `exists()` execution plan with negated boolean semantics.
314    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
315    where
316        E: EntityValue,
317    {
318        self.explain_exists()
319    }
320
321    /// Explain scalar load execution shape without executing the query.
322    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
323    where
324        E: EntityValue,
325    {
326        self.explain_execution_descriptor()
327    }
328
329    /// Explain scalar load execution shape as deterministic text.
330    pub fn explain_execution_text(&self) -> Result<String, QueryError>
331    where
332        E: EntityValue,
333    {
334        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
335    }
336
337    /// Explain scalar load execution shape as canonical JSON.
338    pub fn explain_execution_json(&self) -> Result<String, QueryError>
339    where
340        E: EntityValue,
341    {
342        self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
343    }
344
345    /// Explain scalar load execution shape as verbose text with diagnostics.
346    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
347    where
348        E: EntityValue,
349    {
350        self.map_non_paged_query_output(
351            DbSession::explain_query_execution_verbose_with_visible_indexes,
352        )
353    }
354
355    /// Execute and return the number of matching rows.
356    pub fn count(&self) -> Result<u32, QueryError>
357    where
358        E: EntityValue,
359    {
360        self.map_prepared_existing_rows_terminal_output(
361            PreparedFluentExistingRowsTerminalStrategy::count_rows(),
362            ScalarTerminalBoundaryOutput::into_count,
363        )
364    }
365
366    /// Execute and return the total persisted payload bytes for the effective
367    /// result window.
368    pub fn bytes(&self) -> Result<u64, QueryError>
369    where
370        E: EntityValue,
371    {
372        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
373    }
374
375    /// Execute and return the total serialized bytes for `field` over the
376    /// effective result window.
377    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
378    where
379        E: EntityValue,
380    {
381        self.with_non_paged_slot(field, |target_slot| {
382            self.session
383                .execute_load_query_with(self.query(), move |load, plan| {
384                    load.bytes_by_slot(plan, target_slot)
385                })
386        })
387    }
388
389    /// Explain `bytes_by(field)` routing without executing the terminal.
390    pub fn explain_bytes_by(
391        &self,
392        field: impl AsRef<str>,
393    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
394    where
395        E: EntityValue,
396    {
397        self.with_non_paged_slot(field, |target_slot| {
398            self.session
399                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
400        })
401    }
402
403    /// Execute and return the smallest matching identifier, if any.
404    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
405    where
406        E: EntityValue,
407    {
408        self.map_prepared_scalar_terminal_output(
409            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
410            ScalarTerminalBoundaryOutput::into_id,
411        )
412    }
413
414    /// Explain scalar `min()` routing without executing the terminal.
415    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
416    where
417        E: EntityValue,
418    {
419        self.explain_prepared_aggregate_non_paged_terminal(
420            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
421        )
422    }
423
424    /// Execute and return the id of the row with the smallest value for `field`.
425    ///
426    /// Ties are deterministic: equal field values resolve by primary key ascending.
427    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
428    where
429        E: EntityValue,
430    {
431        self.with_non_paged_slot(field, |target_slot| {
432            self.map_prepared_scalar_terminal_output(
433                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
434                ScalarTerminalBoundaryOutput::into_id,
435            )
436        })
437    }
438
439    /// Execute and return the largest matching identifier, if any.
440    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
441    where
442        E: EntityValue,
443    {
444        self.map_prepared_scalar_terminal_output(
445            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
446            ScalarTerminalBoundaryOutput::into_id,
447        )
448    }
449
450    /// Explain scalar `max()` routing without executing the terminal.
451    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
452    where
453        E: EntityValue,
454    {
455        self.explain_prepared_aggregate_non_paged_terminal(
456            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
457        )
458    }
459
460    /// Execute and return the id of the row with the largest value for `field`.
461    ///
462    /// Ties are deterministic: equal field values resolve by primary key ascending.
463    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
464    where
465        E: EntityValue,
466    {
467        self.with_non_paged_slot(field, |target_slot| {
468            self.map_prepared_scalar_terminal_output(
469                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
470                ScalarTerminalBoundaryOutput::into_id,
471            )
472        })
473    }
474
475    /// Execute and return the id at zero-based ordinal `nth` when rows are
476    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
477    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
478    where
479        E: EntityValue,
480    {
481        self.with_non_paged_slot(field, |target_slot| {
482            self.map_prepared_order_sensitive_terminal_output(
483                PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
484                ScalarTerminalBoundaryOutput::into_id,
485            )
486        })
487    }
488
489    /// Execute and return the sum of `field` over matching rows.
490    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
491    where
492        E: EntityValue,
493    {
494        self.with_non_paged_slot(field, |target_slot| {
495            self.execute_prepared_numeric_field_terminal(
496                PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
497            )
498        })
499    }
500
501    /// Explain scalar `sum_by(field)` routing without executing the terminal.
502    pub fn explain_sum_by(
503        &self,
504        field: impl AsRef<str>,
505    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
506    where
507        E: EntityValue,
508    {
509        self.with_non_paged_slot(field, |target_slot| {
510            self.explain_prepared_aggregate_non_paged_terminal(
511                &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
512            )
513        })
514    }
515
516    /// Execute and return the sum of distinct `field` values.
517    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
518    where
519        E: EntityValue,
520    {
521        self.with_non_paged_slot(field, |target_slot| {
522            self.execute_prepared_numeric_field_terminal(
523                PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
524            )
525        })
526    }
527
528    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
529    pub fn explain_sum_distinct_by(
530        &self,
531        field: impl AsRef<str>,
532    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
533    where
534        E: EntityValue,
535    {
536        self.with_non_paged_slot(field, |target_slot| {
537            self.explain_prepared_aggregate_non_paged_terminal(
538                &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
539            )
540        })
541    }
542
543    /// Execute and return the average of `field` over matching rows.
544    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
545    where
546        E: EntityValue,
547    {
548        self.with_non_paged_slot(field, |target_slot| {
549            self.execute_prepared_numeric_field_terminal(
550                PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
551            )
552        })
553    }
554
555    /// Explain scalar `avg_by(field)` routing without executing the terminal.
556    pub fn explain_avg_by(
557        &self,
558        field: impl AsRef<str>,
559    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
560    where
561        E: EntityValue,
562    {
563        self.with_non_paged_slot(field, |target_slot| {
564            self.explain_prepared_aggregate_non_paged_terminal(
565                &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
566            )
567        })
568    }
569
570    /// Execute and return the average of distinct `field` values.
571    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
572    where
573        E: EntityValue,
574    {
575        self.with_non_paged_slot(field, |target_slot| {
576            self.execute_prepared_numeric_field_terminal(
577                PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
578            )
579        })
580    }
581
582    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
583    pub fn explain_avg_distinct_by(
584        &self,
585        field: impl AsRef<str>,
586    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
587    where
588        E: EntityValue,
589    {
590        self.with_non_paged_slot(field, |target_slot| {
591            self.explain_prepared_aggregate_non_paged_terminal(
592                &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
593            )
594        })
595    }
596
597    /// Execute and return the median id by `field` using deterministic ordering
598    /// `(field asc, primary key asc)`.
599    ///
600    /// Even-length windows select the lower median.
601    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
602    where
603        E: EntityValue,
604    {
605        self.with_non_paged_slot(field, |target_slot| {
606            self.map_prepared_order_sensitive_terminal_output(
607                PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
608                ScalarTerminalBoundaryOutput::into_id,
609            )
610        })
611    }
612
613    /// Execute and return the number of distinct values for `field` over the
614    /// effective result window.
615    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
616    where
617        E: EntityValue,
618    {
619        self.with_non_paged_slot(field, |target_slot| {
620            self.map_prepared_projection_terminal_output(
621                PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
622                crate::db::executor::ScalarProjectionBoundaryOutput::into_count,
623            )
624        })
625    }
626
627    /// Explain `count_distinct_by(field)` routing without executing the terminal.
628    pub fn explain_count_distinct_by(
629        &self,
630        field: impl AsRef<str>,
631    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
632    where
633        E: EntityValue,
634    {
635        self.with_non_paged_slot(field, |target_slot| {
636            self.explain_prepared_projection_non_paged_terminal(
637                &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
638            )
639        })
640    }
641
642    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
643    ///
644    /// Tie handling is deterministic for both extrema: primary key ascending.
645    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
646    where
647        E: EntityValue,
648    {
649        self.with_non_paged_slot(field, |target_slot| {
650            self.map_prepared_order_sensitive_terminal_output(
651                PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
652                ScalarTerminalBoundaryOutput::into_id_pair,
653            )
654        })
655    }
656
657    /// Execute and return projected field values for the effective result window.
658    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
659    where
660        E: EntityValue,
661    {
662        self.with_non_paged_slot(field, |target_slot| {
663            self.map_prepared_projection_terminal_output(
664                PreparedFluentProjectionStrategy::values_by_slot(target_slot),
665                crate::db::executor::ScalarProjectionBoundaryOutput::into_values,
666            )
667        })
668    }
669
670    /// Execute and return projected values for one shared bounded projection
671    /// over the effective response window.
672    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<Value>, QueryError>
673    where
674        E: EntityValue,
675        P: ValueProjectionExpr,
676    {
677        self.with_non_paged_slot(projection.field(), |target_slot| {
678            let values = self
679                .execute_prepared_projection_terminal_output(
680                    PreparedFluentProjectionStrategy::values_by_slot(target_slot),
681                )?
682                .into_values()
683                .map_err(QueryError::execute)?;
684
685            Self::project_terminal_items(projection, values, |projection, value| {
686                projection.apply_value(value)
687            })
688        })
689    }
690
691    /// Explain `project_values(projection)` routing without executing it.
692    pub fn explain_project_values<P>(
693        &self,
694        projection: &P,
695    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
696    where
697        E: EntityValue,
698        P: ValueProjectionExpr,
699    {
700        self.with_non_paged_slot(projection.field(), |target_slot| {
701            self.explain_prepared_projection_non_paged_terminal(
702                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
703            )
704        })
705    }
706
707    /// Explain `values_by(field)` routing without executing the terminal.
708    pub fn explain_values_by(
709        &self,
710        field: impl AsRef<str>,
711    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
712    where
713        E: EntityValue,
714    {
715        self.with_non_paged_slot(field, |target_slot| {
716            self.explain_prepared_projection_non_paged_terminal(
717                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
718            )
719        })
720    }
721
722    /// Execute and return the first `k` rows from the effective response window.
723    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
724    where
725        E: EntityValue,
726    {
727        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
728    }
729
730    /// Execute and return the top `k` rows by `field` under deterministic
731    /// ordering `(field desc, primary_key asc)` over the effective response
732    /// window.
733    ///
734    /// This terminal applies its own ordering and does not preserve query
735    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
736    /// matches `max_by(field)` selection semantics.
737    pub fn top_k_by(
738        &self,
739        field: impl AsRef<str>,
740        take_count: u32,
741    ) -> Result<EntityResponse<E>, QueryError>
742    where
743        E: EntityValue,
744    {
745        self.with_non_paged_slot(field, |target_slot| {
746            self.session
747                .execute_load_query_with(self.query(), move |load, plan| {
748                    load.top_k_by_slot(plan, target_slot, take_count)
749                })
750        })
751    }
752
753    /// Execute and return the bottom `k` rows by `field` under deterministic
754    /// ordering `(field asc, primary_key asc)` over the effective response
755    /// window.
756    ///
757    /// This terminal applies its own ordering and does not preserve query
758    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
759    /// matches `min_by(field)` selection semantics.
760    pub fn bottom_k_by(
761        &self,
762        field: impl AsRef<str>,
763        take_count: u32,
764    ) -> Result<EntityResponse<E>, QueryError>
765    where
766        E: EntityValue,
767    {
768        self.with_non_paged_slot(field, |target_slot| {
769            self.session
770                .execute_load_query_with(self.query(), move |load, plan| {
771                    load.bottom_k_by_slot(plan, target_slot, take_count)
772                })
773        })
774    }
775
776    /// Execute and return projected values for the top `k` rows by `field`
777    /// under deterministic ordering `(field desc, primary_key asc)` over the
778    /// effective response window.
779    ///
780    /// Ranking is applied before projection and does not preserve query
781    /// `order_term(...)` row order in the returned values. For `k = 1`, this
782    /// matches `max_by(field)` projected to one value.
783    pub fn top_k_by_values(
784        &self,
785        field: impl AsRef<str>,
786        take_count: u32,
787    ) -> Result<Vec<Value>, QueryError>
788    where
789        E: EntityValue,
790    {
791        self.with_non_paged_slot(field, |target_slot| {
792            self.session
793                .execute_load_query_with(self.query(), move |load, plan| {
794                    load.top_k_by_values_slot(plan, target_slot, take_count)
795                })
796        })
797    }
798
799    /// Execute and return projected values for the bottom `k` rows by `field`
800    /// under deterministic ordering `(field asc, primary_key asc)` over the
801    /// effective response window.
802    ///
803    /// Ranking is applied before projection and does not preserve query
804    /// `order_term(...)` row order in the returned values. For `k = 1`, this
805    /// matches `min_by(field)` projected to one value.
806    pub fn bottom_k_by_values(
807        &self,
808        field: impl AsRef<str>,
809        take_count: u32,
810    ) -> Result<Vec<Value>, QueryError>
811    where
812        E: EntityValue,
813    {
814        self.with_non_paged_slot(field, |target_slot| {
815            self.session
816                .execute_load_query_with(self.query(), move |load, plan| {
817                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
818                })
819        })
820    }
821
822    /// Execute and return projected id/value pairs for the top `k` rows by
823    /// `field` under deterministic ordering `(field desc, primary_key asc)`
824    /// over the effective response window.
825    ///
826    /// Ranking is applied before projection and does not preserve query
827    /// `order_term(...)` row order in the returned values. For `k = 1`, this
828    /// matches `max_by(field)` projected to one `(id, value)` pair.
829    pub fn top_k_by_with_ids(
830        &self,
831        field: impl AsRef<str>,
832        take_count: u32,
833    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
834    where
835        E: EntityValue,
836    {
837        self.with_non_paged_slot(field, |target_slot| {
838            self.session
839                .execute_load_query_with(self.query(), move |load, plan| {
840                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
841                })
842        })
843    }
844
845    /// Execute and return projected id/value pairs for the bottom `k` rows by
846    /// `field` under deterministic ordering `(field asc, primary_key asc)`
847    /// over the effective response window.
848    ///
849    /// Ranking is applied before projection and does not preserve query
850    /// `order_term(...)` row order in the returned values. For `k = 1`, this
851    /// matches `min_by(field)` projected to one `(id, value)` pair.
852    pub fn bottom_k_by_with_ids(
853        &self,
854        field: impl AsRef<str>,
855        take_count: u32,
856    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
857    where
858        E: EntityValue,
859    {
860        self.with_non_paged_slot(field, |target_slot| {
861            self.session
862                .execute_load_query_with(self.query(), move |load, plan| {
863                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
864                })
865        })
866    }
867
868    /// Execute and return distinct projected field values for the effective
869    /// result window, preserving first-observed value order.
870    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
871    where
872        E: EntityValue,
873    {
874        self.with_non_paged_slot(field, |target_slot| {
875            self.map_prepared_projection_terminal_output(
876                PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
877                crate::db::executor::ScalarProjectionBoundaryOutput::into_values,
878            )
879        })
880    }
881
882    /// Explain `distinct_values_by(field)` routing without executing the terminal.
883    pub fn explain_distinct_values_by(
884        &self,
885        field: impl AsRef<str>,
886    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
887    where
888        E: EntityValue,
889    {
890        self.with_non_paged_slot(field, |target_slot| {
891            self.explain_prepared_projection_non_paged_terminal(
892                &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
893            )
894        })
895    }
896
897    /// Execute and return projected field values paired with row ids for the
898    /// effective result window.
899    pub fn values_by_with_ids(
900        &self,
901        field: impl AsRef<str>,
902    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
903    where
904        E: EntityValue,
905    {
906        self.with_non_paged_slot(field, |target_slot| {
907            self.map_prepared_projection_terminal_output(
908                PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
909                crate::db::executor::ScalarProjectionBoundaryOutput::into_values_with_ids,
910            )
911        })
912    }
913
914    /// Execute and return projected id/value pairs for one shared bounded
915    /// projection over the effective response window.
916    pub fn project_values_with_ids<P>(
917        &self,
918        projection: &P,
919    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
920    where
921        E: EntityValue,
922        P: ValueProjectionExpr,
923    {
924        self.with_non_paged_slot(projection.field(), |target_slot| {
925            let values = self
926                .execute_prepared_projection_terminal_output(
927                    PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
928                )?
929                .into_values_with_ids::<E>()
930                .map_err(QueryError::execute)?;
931
932            Self::project_terminal_items(projection, values, |projection, (id, value)| {
933                Ok((id, projection.apply_value(value)?))
934            })
935        })
936    }
937
938    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
939    pub fn explain_values_by_with_ids(
940        &self,
941        field: impl AsRef<str>,
942    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
943    where
944        E: EntityValue,
945    {
946        self.with_non_paged_slot(field, |target_slot| {
947            self.explain_prepared_projection_non_paged_terminal(
948                &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
949            )
950        })
951    }
952
953    /// Execute and return the first projected field value in effective response
954    /// order, if any.
955    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
956    where
957        E: EntityValue,
958    {
959        self.with_non_paged_slot(field, |target_slot| {
960            self.map_prepared_projection_terminal_output(
961                PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
962                crate::db::executor::ScalarProjectionBoundaryOutput::into_terminal_value,
963            )
964        })
965    }
966
967    /// Execute and return the first projected value for one shared bounded
968    /// projection in effective response order, if any.
969    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<Value>, QueryError>
970    where
971        E: EntityValue,
972        P: ValueProjectionExpr,
973    {
974        self.with_non_paged_slot(projection.field(), |target_slot| {
975            let value = self
976                .execute_prepared_projection_terminal_output(
977                    PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
978                )?
979                .into_terminal_value()
980                .map_err(QueryError::execute)?;
981
982            let mut projected =
983                Self::project_terminal_items(projection, value, |projection, value| {
984                    projection.apply_value(value)
985                })?;
986
987            Ok(projected.pop())
988        })
989    }
990
991    /// Explain `first_value_by(field)` routing without executing the terminal.
992    pub fn explain_first_value_by(
993        &self,
994        field: impl AsRef<str>,
995    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
996    where
997        E: EntityValue,
998    {
999        self.with_non_paged_slot(field, |target_slot| {
1000            self.explain_prepared_projection_non_paged_terminal(
1001                &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1002            )
1003        })
1004    }
1005
1006    /// Execute and return the last projected field value in effective response
1007    /// order, if any.
1008    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
1009    where
1010        E: EntityValue,
1011    {
1012        self.with_non_paged_slot(field, |target_slot| {
1013            self.map_prepared_projection_terminal_output(
1014                PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1015                crate::db::executor::ScalarProjectionBoundaryOutput::into_terminal_value,
1016            )
1017        })
1018    }
1019
1020    /// Execute and return the last projected value for one shared bounded
1021    /// projection in effective response order, if any.
1022    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<Value>, QueryError>
1023    where
1024        E: EntityValue,
1025        P: ValueProjectionExpr,
1026    {
1027        self.with_non_paged_slot(projection.field(), |target_slot| {
1028            let value = self
1029                .execute_prepared_projection_terminal_output(
1030                    PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1031                )?
1032                .into_terminal_value()
1033                .map_err(QueryError::execute)?;
1034
1035            let mut projected =
1036                Self::project_terminal_items(projection, value, |projection, value| {
1037                    projection.apply_value(value)
1038                })?;
1039
1040            Ok(projected.pop())
1041        })
1042    }
1043
1044    /// Explain `last_value_by(field)` routing without executing the terminal.
1045    pub fn explain_last_value_by(
1046        &self,
1047        field: impl AsRef<str>,
1048    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1049    where
1050        E: EntityValue,
1051    {
1052        self.with_non_paged_slot(field, |target_slot| {
1053            self.explain_prepared_projection_non_paged_terminal(
1054                &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1055            )
1056        })
1057    }
1058
1059    /// Execute and return the first matching identifier in response order, if any.
1060    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1061    where
1062        E: EntityValue,
1063    {
1064        self.map_prepared_order_sensitive_terminal_output(
1065            PreparedFluentOrderSensitiveTerminalStrategy::first(),
1066            ScalarTerminalBoundaryOutput::into_id,
1067        )
1068    }
1069
1070    /// Explain scalar `first()` routing without executing the terminal.
1071    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1072    where
1073        E: EntityValue,
1074    {
1075        self.explain_prepared_aggregate_non_paged_terminal(
1076            &PreparedFluentOrderSensitiveTerminalStrategy::first(),
1077        )
1078    }
1079
1080    /// Execute and return the last matching identifier in response order, if any.
1081    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1082    where
1083        E: EntityValue,
1084    {
1085        self.map_prepared_order_sensitive_terminal_output(
1086            PreparedFluentOrderSensitiveTerminalStrategy::last(),
1087            ScalarTerminalBoundaryOutput::into_id,
1088        )
1089    }
1090
1091    /// Explain scalar `last()` routing without executing the terminal.
1092    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1093    where
1094        E: EntityValue,
1095    {
1096        self.explain_prepared_aggregate_non_paged_terminal(
1097            &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1098        )
1099    }
1100
1101    /// Execute and require exactly one matching row.
1102    pub fn require_one(&self) -> Result<(), QueryError>
1103    where
1104        E: EntityValue,
1105    {
1106        self.execute()?.into_rows()?.require_one()?;
1107        Ok(())
1108    }
1109
1110    /// Execute and require at least one matching row.
1111    pub fn require_some(&self) -> Result<(), QueryError>
1112    where
1113        E: EntityValue,
1114    {
1115        self.execute()?.into_rows()?.require_some()?;
1116        Ok(())
1117    }
1118}
1119
1120impl From<PreparedFluentScalarTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1121    fn from(value: PreparedFluentScalarTerminalRuntimeRequest) -> Self {
1122        match value {
1123            PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1124                Self::IdTerminal { kind }
1125            }
1126            PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1127                Self::IdBySlot { kind, target_field }
1128            }
1129        }
1130    }
1131}
1132
1133impl From<PreparedFluentExistingRowsTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1134    fn from(value: PreparedFluentExistingRowsTerminalRuntimeRequest) -> Self {
1135        match value {
1136            PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => Self::Count,
1137            PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => Self::Exists,
1138        }
1139    }
1140}
1141
1142impl From<PreparedFluentNumericFieldRuntimeRequest> for ScalarNumericFieldBoundaryRequest {
1143    fn from(value: PreparedFluentNumericFieldRuntimeRequest) -> Self {
1144        match value {
1145            PreparedFluentNumericFieldRuntimeRequest::Sum => Self::Sum,
1146            PreparedFluentNumericFieldRuntimeRequest::SumDistinct => Self::SumDistinct,
1147            PreparedFluentNumericFieldRuntimeRequest::Avg => Self::Avg,
1148            PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => Self::AvgDistinct,
1149        }
1150    }
1151}
1152
1153impl From<PreparedFluentOrderSensitiveTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1154    fn from(value: PreparedFluentOrderSensitiveTerminalRuntimeRequest) -> Self {
1155        match value {
1156            PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1157                Self::IdTerminal { kind }
1158            }
1159            PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1160                Self::NthBySlot { target_field, nth }
1161            }
1162            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1163                Self::MedianBySlot { target_field }
1164            }
1165            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1166                Self::MinMaxBySlot { target_field }
1167            }
1168        }
1169    }
1170}
1171
1172impl From<PreparedFluentProjectionRuntimeRequest> for ScalarProjectionBoundaryRequest {
1173    fn from(value: PreparedFluentProjectionRuntimeRequest) -> Self {
1174        match value {
1175            PreparedFluentProjectionRuntimeRequest::Values => Self::Values,
1176            PreparedFluentProjectionRuntimeRequest::DistinctValues => Self::DistinctValues,
1177            PreparedFluentProjectionRuntimeRequest::CountDistinct => Self::CountDistinct,
1178            PreparedFluentProjectionRuntimeRequest::ValuesWithIds => Self::ValuesWithIds,
1179            PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1180                Self::TerminalValue { terminal_kind }
1181            }
1182        }
1183    }
1184}