Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        PersistedRow,
9        executor::{
10            LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11            ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12            ScalarTerminalBoundaryRequest,
13        },
14        query::{
15            api::ResponseCardinalityExt,
16            builder::{
17                PreparedFluentAggregateExplainStrategy,
18                PreparedFluentExistingRowsTerminalRuntimeRequest,
19                PreparedFluentExistingRowsTerminalStrategy,
20                PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21                PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22                PreparedFluentOrderSensitiveTerminalStrategy,
23                PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24                PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25                ValueProjectionExpr,
26            },
27            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
28            fluent::load::{FluentLoadQuery, LoadQueryResult},
29            intent::QueryError,
30            plan::AggregateKind,
31        },
32        response::EntityResponse,
33    },
34    error::InternalError,
35    traits::EntityValue,
36    types::{Decimal, Id},
37    value::Value,
38};
39
// Return payload of `min_max_by`: the `(min, max)` id pair for one field.
// NOTE(review): presumably `None` when the effective window has no rows —
// confirm against the executor's `into_id_pair` contract.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42impl<E> FluentLoadQuery<'_, E>
43where
44    E: PersistedRow,
45{
46    // ------------------------------------------------------------------
47    // Execution (single semantic boundary)
48    // ------------------------------------------------------------------
49
50    /// Execute this query using the session's policy settings.
51    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
52    where
53        E: EntityValue,
54    {
55        self.ensure_non_paged_mode_ready()?;
56
57        if self.query().has_grouping() {
58            return self
59                .session
60                .execute_grouped(self.query(), self.cursor_token.as_deref())
61                .map(LoadQueryResult::Grouped);
62        }
63
64        self.session
65            .execute_query(self.query())
66            .map(LoadQueryResult::Rows)
67    }
68
69    // Run one scalar terminal through the canonical non-paged fluent policy
70    // gate before handing execution to the session load-query adapter.
71    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
72    where
73        E: EntityValue,
74        F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
75    {
76        self.ensure_non_paged_mode_ready()?;
77
78        self.session.execute_load_query_with(self.query(), execute)
79    }
80
81    // Run one explain-visible aggregate terminal through the canonical
82    // non-paged fluent policy gate using the prepared aggregate strategy as
83    // the single explain projection source.
84    fn explain_prepared_aggregate_non_paged_terminal<S>(
85        &self,
86        strategy: &S,
87    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
88    where
89        E: EntityValue,
90        S: PreparedFluentAggregateExplainStrategy,
91    {
92        self.ensure_non_paged_mode_ready()?;
93
94        self.session
95            .explain_query_prepared_aggregate_terminal_with_visible_indexes(self.query(), strategy)
96    }
97
98    // Run one prepared projection/distinct explain terminal through the
99    // canonical non-paged fluent policy gate using the prepared projection
100    // strategy as the single explain projection source.
101    fn explain_prepared_projection_non_paged_terminal(
102        &self,
103        strategy: &PreparedFluentProjectionStrategy,
104    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
105    where
106        E: EntityValue,
107    {
108        self.ensure_non_paged_mode_ready()?;
109
110        self.session
111            .explain_query_prepared_projection_terminal_with_visible_indexes(self.query(), strategy)
112    }
113
114    // Resolve the structural execution descriptor for this fluent load query
115    // through the session-owned visible-index explain path once.
116    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
117    where
118        E: EntityValue,
119    {
120        self.session
121            .explain_query_execution_with_visible_indexes(self.query())
122    }
123
124    // Render one descriptor-derived execution surface so text/json explain
125    // terminals do not each forward the same session explain call ad hoc.
126    fn render_execution_descriptor(
127        &self,
128        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
129    ) -> Result<String, QueryError>
130    where
131        E: EntityValue,
132    {
133        let descriptor = self.explain_execution_descriptor()?;
134
135        Ok(render(descriptor))
136    }
137
138    // Render one verbose execution explain payload through the same
139    // session-owned visible-index lane used by the other fluent explain forms.
140    fn explain_execution_verbose_text(&self) -> Result<String, QueryError>
141    where
142        E: EntityValue,
143    {
144        self.session
145            .explain_query_execution_verbose_with_visible_indexes(self.query())
146    }
147
148    // Execute one prepared fluent scalar terminal through the canonical
149    // non-paged fluent policy gate using the prepared runtime request as the
150    // single execution source.
151    fn execute_prepared_scalar_terminal_output(
152        &self,
153        strategy: PreparedFluentScalarTerminalStrategy,
154    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
155    where
156        E: EntityValue,
157    {
158        self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())
159    }
160
161    // Execute one prepared fluent existing-rows terminal through the
162    // canonical non-paged fluent policy gate using the prepared existing-rows
163    // strategy as the single runtime source.
164    fn execute_prepared_existing_rows_terminal_output(
165        &self,
166        strategy: PreparedFluentExistingRowsTerminalStrategy,
167    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
168    where
169        E: EntityValue,
170    {
171        self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())
172    }
173
174    // Execute one prepared fluent numeric-field terminal through the canonical
175    // non-paged fluent policy gate using the prepared numeric strategy as the
176    // single runtime source.
177    fn execute_prepared_numeric_field_terminal(
178        &self,
179        strategy: PreparedFluentNumericFieldStrategy,
180    ) -> Result<Option<Decimal>, QueryError>
181    where
182        E: EntityValue,
183    {
184        let (target_field, runtime_request) = strategy.into_runtime_parts();
185
186        self.execute_scalar_non_paged_terminal(move |load, plan| {
187            load.execute_numeric_field_boundary(plan, target_field, runtime_request.into())
188        })
189    }
190
191    // Execute one prepared fluent order-sensitive terminal through the
192    // canonical non-paged fluent policy gate using the prepared order-sensitive
193    // strategy as the single runtime source.
194    fn execute_prepared_order_sensitive_terminal_output(
195        &self,
196        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
197    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
198    where
199        E: EntityValue,
200    {
201        self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())
202    }
203
204    // Execute one prepared fluent projection/distinct terminal through the
205    // canonical non-paged fluent policy gate using the prepared projection
206    // strategy as the single runtime source.
207    fn execute_prepared_projection_terminal_output(
208        &self,
209        strategy: PreparedFluentProjectionStrategy,
210    ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
211    where
212        E: EntityValue,
213    {
214        let (target_field, runtime_request) = strategy.into_runtime_parts();
215
216        self.execute_scalar_non_paged_terminal(move |load, plan| {
217            load.execute_scalar_projection_boundary(plan, target_field, runtime_request.into())
218        })
219    }
220
221    // Execute one already-lowered scalar terminal boundary request through
222    // the canonical non-paged fluent policy gate so terminal families that
223    // share the same scalar executor contract do not each rebuild it.
224    fn execute_scalar_terminal_boundary_output<R>(
225        &self,
226        runtime_request: R,
227    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
228    where
229        E: EntityValue,
230        R: Into<ScalarTerminalBoundaryRequest>,
231    {
232        self.execute_scalar_non_paged_terminal(move |load, plan| {
233            load.execute_scalar_terminal_request(plan, runtime_request.into())
234        })
235    }
236
237    // Apply one shared bounded value projection to iterator-like terminal
238    // output while preserving source order and cardinality.
239    fn project_terminal_items<P, T, U>(
240        projection: &P,
241        values: impl IntoIterator<Item = T>,
242        mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
243    ) -> Result<Vec<U>, QueryError>
244    where
245        P: ValueProjectionExpr,
246    {
247        values
248            .into_iter()
249            .map(|value| map(projection, value))
250            .collect()
251    }
252
253    // ------------------------------------------------------------------
254    // Execution terminals — semantic only
255    // ------------------------------------------------------------------
256
257    /// Execute and return whether the result set is empty.
258    pub fn is_empty(&self) -> Result<bool, QueryError>
259    where
260        E: EntityValue,
261    {
262        self.not_exists()
263    }
264
265    /// Execute and return whether no matching row exists.
266    pub fn not_exists(&self) -> Result<bool, QueryError>
267    where
268        E: EntityValue,
269    {
270        Ok(!self.exists()?)
271    }
272
273    /// Execute and return whether at least one matching row exists.
274    pub fn exists(&self) -> Result<bool, QueryError>
275    where
276        E: EntityValue,
277    {
278        self.execute_prepared_existing_rows_terminal_output(
279            PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
280        )?
281        .into_exists()
282        .map_err(QueryError::execute)
283    }
284
285    /// Explain scalar `exists()` routing without executing the terminal.
286    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
287    where
288        E: EntityValue,
289    {
290        self.explain_prepared_aggregate_non_paged_terminal(
291            &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
292        )
293    }
294
295    /// Explain scalar `not_exists()` routing without executing the terminal.
296    ///
297    /// This remains an `exists()` execution plan with negated boolean semantics.
298    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
299    where
300        E: EntityValue,
301    {
302        self.explain_exists()
303    }
304
305    /// Explain scalar load execution shape without executing the query.
306    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
307    where
308        E: EntityValue,
309    {
310        self.explain_execution_descriptor()
311    }
312
313    /// Explain scalar load execution shape as deterministic text.
314    pub fn explain_execution_text(&self) -> Result<String, QueryError>
315    where
316        E: EntityValue,
317    {
318        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
319    }
320
321    /// Explain scalar load execution shape as canonical JSON.
322    pub fn explain_execution_json(&self) -> Result<String, QueryError>
323    where
324        E: EntityValue,
325    {
326        self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
327    }
328
329    /// Explain scalar load execution shape as verbose text with diagnostics.
330    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
331    where
332        E: EntityValue,
333    {
334        self.explain_execution_verbose_text()
335    }
336
337    /// Execute and return the number of matching rows.
338    pub fn count(&self) -> Result<u32, QueryError>
339    where
340        E: EntityValue,
341    {
342        self.execute_prepared_existing_rows_terminal_output(
343            PreparedFluentExistingRowsTerminalStrategy::count_rows(),
344        )?
345        .into_count()
346        .map_err(QueryError::execute)
347    }
348
349    /// Execute and return the total persisted payload bytes for the effective
350    /// result window.
351    pub fn bytes(&self) -> Result<u64, QueryError>
352    where
353        E: EntityValue,
354    {
355        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
356    }
357
358    /// Execute and return the total serialized bytes for `field` over the
359    /// effective result window.
360    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
361    where
362        E: EntityValue,
363    {
364        self.ensure_non_paged_mode_ready()?;
365
366        Self::with_slot(field, |target_slot| {
367            self.session
368                .execute_load_query_with(self.query(), move |load, plan| {
369                    load.bytes_by_slot(plan, target_slot)
370                })
371        })
372    }
373
374    /// Explain `bytes_by(field)` routing without executing the terminal.
375    pub fn explain_bytes_by(
376        &self,
377        field: impl AsRef<str>,
378    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
379    where
380        E: EntityValue,
381    {
382        self.ensure_non_paged_mode_ready()?;
383
384        Self::with_slot(field, |target_slot| {
385            self.session
386                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
387        })
388    }
389
390    /// Execute and return the smallest matching identifier, if any.
391    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
392    where
393        E: EntityValue,
394    {
395        self.execute_prepared_scalar_terminal_output(
396            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
397        )?
398        .into_id()
399        .map_err(QueryError::execute)
400    }
401
402    /// Explain scalar `min()` routing without executing the terminal.
403    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
404    where
405        E: EntityValue,
406    {
407        self.explain_prepared_aggregate_non_paged_terminal(
408            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
409        )
410    }
411
412    /// Execute and return the id of the row with the smallest value for `field`.
413    ///
414    /// Ties are deterministic: equal field values resolve by primary key ascending.
415    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
416    where
417        E: EntityValue,
418    {
419        self.ensure_non_paged_mode_ready()?;
420
421        Self::with_slot(field, |target_slot| {
422            self.execute_prepared_scalar_terminal_output(
423                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
424            )?
425            .into_id()
426            .map_err(QueryError::execute)
427        })
428    }
429
430    /// Execute and return the largest matching identifier, if any.
431    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
432    where
433        E: EntityValue,
434    {
435        self.execute_prepared_scalar_terminal_output(
436            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
437        )?
438        .into_id()
439        .map_err(QueryError::execute)
440    }
441
442    /// Explain scalar `max()` routing without executing the terminal.
443    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
444    where
445        E: EntityValue,
446    {
447        self.explain_prepared_aggregate_non_paged_terminal(
448            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
449        )
450    }
451
452    /// Execute and return the id of the row with the largest value for `field`.
453    ///
454    /// Ties are deterministic: equal field values resolve by primary key ascending.
455    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
456    where
457        E: EntityValue,
458    {
459        self.ensure_non_paged_mode_ready()?;
460
461        Self::with_slot(field, |target_slot| {
462            self.execute_prepared_scalar_terminal_output(
463                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
464            )?
465            .into_id()
466            .map_err(QueryError::execute)
467        })
468    }
469
470    /// Execute and return the id at zero-based ordinal `nth` when rows are
471    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
472    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
473    where
474        E: EntityValue,
475    {
476        self.ensure_non_paged_mode_ready()?;
477
478        Self::with_slot(field, |target_slot| {
479            self.execute_prepared_order_sensitive_terminal_output(
480                PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
481            )?
482            .into_id()
483            .map_err(QueryError::execute)
484        })
485    }
486
487    /// Execute and return the sum of `field` over matching rows.
488    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
489    where
490        E: EntityValue,
491    {
492        self.ensure_non_paged_mode_ready()?;
493
494        Self::with_slot(field, |target_slot| {
495            self.execute_prepared_numeric_field_terminal(
496                PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
497            )
498        })
499    }
500
501    /// Explain scalar `sum_by(field)` routing without executing the terminal.
502    pub fn explain_sum_by(
503        &self,
504        field: impl AsRef<str>,
505    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
506    where
507        E: EntityValue,
508    {
509        self.ensure_non_paged_mode_ready()?;
510
511        Self::with_slot(field, |target_slot| {
512            self.explain_prepared_aggregate_non_paged_terminal(
513                &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
514            )
515        })
516    }
517
518    /// Execute and return the sum of distinct `field` values.
519    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
520    where
521        E: EntityValue,
522    {
523        self.ensure_non_paged_mode_ready()?;
524
525        Self::with_slot(field, |target_slot| {
526            self.execute_prepared_numeric_field_terminal(
527                PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
528            )
529        })
530    }
531
532    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
533    pub fn explain_sum_distinct_by(
534        &self,
535        field: impl AsRef<str>,
536    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
537    where
538        E: EntityValue,
539    {
540        self.ensure_non_paged_mode_ready()?;
541
542        Self::with_slot(field, |target_slot| {
543            self.explain_prepared_aggregate_non_paged_terminal(
544                &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
545            )
546        })
547    }
548
549    /// Execute and return the average of `field` over matching rows.
550    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
551    where
552        E: EntityValue,
553    {
554        self.ensure_non_paged_mode_ready()?;
555
556        Self::with_slot(field, |target_slot| {
557            self.execute_prepared_numeric_field_terminal(
558                PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
559            )
560        })
561    }
562
563    /// Explain scalar `avg_by(field)` routing without executing the terminal.
564    pub fn explain_avg_by(
565        &self,
566        field: impl AsRef<str>,
567    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
568    where
569        E: EntityValue,
570    {
571        self.ensure_non_paged_mode_ready()?;
572
573        Self::with_slot(field, |target_slot| {
574            self.explain_prepared_aggregate_non_paged_terminal(
575                &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
576            )
577        })
578    }
579
580    /// Execute and return the average of distinct `field` values.
581    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
582    where
583        E: EntityValue,
584    {
585        self.ensure_non_paged_mode_ready()?;
586
587        Self::with_slot(field, |target_slot| {
588            self.execute_prepared_numeric_field_terminal(
589                PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
590            )
591        })
592    }
593
594    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
595    pub fn explain_avg_distinct_by(
596        &self,
597        field: impl AsRef<str>,
598    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
599    where
600        E: EntityValue,
601    {
602        self.ensure_non_paged_mode_ready()?;
603
604        Self::with_slot(field, |target_slot| {
605            self.explain_prepared_aggregate_non_paged_terminal(
606                &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
607            )
608        })
609    }
610
611    /// Execute and return the median id by `field` using deterministic ordering
612    /// `(field asc, primary key asc)`.
613    ///
614    /// Even-length windows select the lower median.
615    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
616    where
617        E: EntityValue,
618    {
619        self.ensure_non_paged_mode_ready()?;
620
621        Self::with_slot(field, |target_slot| {
622            self.execute_prepared_order_sensitive_terminal_output(
623                PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
624            )?
625            .into_id()
626            .map_err(QueryError::execute)
627        })
628    }
629
630    /// Execute and return the number of distinct values for `field` over the
631    /// effective result window.
632    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
633    where
634        E: EntityValue,
635    {
636        self.ensure_non_paged_mode_ready()?;
637
638        Self::with_slot(field, |target_slot| {
639            self.execute_prepared_projection_terminal_output(
640                PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
641            )?
642            .into_count()
643            .map_err(QueryError::execute)
644        })
645    }
646
647    /// Explain `count_distinct_by(field)` routing without executing the terminal.
648    pub fn explain_count_distinct_by(
649        &self,
650        field: impl AsRef<str>,
651    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
652    where
653        E: EntityValue,
654    {
655        self.ensure_non_paged_mode_ready()?;
656
657        Self::with_slot(field, |target_slot| {
658            self.explain_prepared_projection_non_paged_terminal(
659                &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
660            )
661        })
662    }
663
664    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
665    ///
666    /// Tie handling is deterministic for both extrema: primary key ascending.
667    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
668    where
669        E: EntityValue,
670    {
671        self.ensure_non_paged_mode_ready()?;
672
673        Self::with_slot(field, |target_slot| {
674            self.execute_prepared_order_sensitive_terminal_output(
675                PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
676            )?
677            .into_id_pair()
678            .map_err(QueryError::execute)
679        })
680    }
681
682    /// Execute and return projected field values for the effective result window.
683    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
684    where
685        E: EntityValue,
686    {
687        self.ensure_non_paged_mode_ready()?;
688
689        Self::with_slot(field, |target_slot| {
690            self.execute_prepared_projection_terminal_output(
691                PreparedFluentProjectionStrategy::values_by_slot(target_slot),
692            )?
693            .into_values()
694            .map_err(QueryError::execute)
695        })
696    }
697
698    /// Execute and return projected values for one shared bounded projection
699    /// over the effective response window.
700    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<Value>, QueryError>
701    where
702        E: EntityValue,
703        P: ValueProjectionExpr,
704    {
705        self.ensure_non_paged_mode_ready()?;
706
707        Self::with_slot(projection.field(), |target_slot| {
708            let values = self
709                .execute_prepared_projection_terminal_output(
710                    PreparedFluentProjectionStrategy::values_by_slot(target_slot),
711                )?
712                .into_values()
713                .map_err(QueryError::execute)?;
714
715            Self::project_terminal_items(projection, values, |projection, value| {
716                projection.apply_value(value)
717            })
718        })
719    }
720
721    /// Explain `project_values(projection)` routing without executing it.
722    pub fn explain_project_values<P>(
723        &self,
724        projection: &P,
725    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
726    where
727        E: EntityValue,
728        P: ValueProjectionExpr,
729    {
730        self.ensure_non_paged_mode_ready()?;
731
732        Self::with_slot(projection.field(), |target_slot| {
733            self.explain_prepared_projection_non_paged_terminal(
734                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
735            )
736        })
737    }
738
739    /// Explain `values_by(field)` routing without executing the terminal.
740    pub fn explain_values_by(
741        &self,
742        field: impl AsRef<str>,
743    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
744    where
745        E: EntityValue,
746    {
747        self.ensure_non_paged_mode_ready()?;
748
749        Self::with_slot(field, |target_slot| {
750            self.explain_prepared_projection_non_paged_terminal(
751                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
752            )
753        })
754    }
755
756    /// Execute and return the first `k` rows from the effective response window.
757    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
758    where
759        E: EntityValue,
760    {
761        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
762    }
763
764    /// Execute and return the top `k` rows by `field` under deterministic
765    /// ordering `(field desc, primary_key asc)` over the effective response
766    /// window.
767    ///
768    /// This terminal applies its own ordering and does not preserve query
769    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
770    /// matches `max_by(field)` selection semantics.
771    pub fn top_k_by(
772        &self,
773        field: impl AsRef<str>,
774        take_count: u32,
775    ) -> Result<EntityResponse<E>, QueryError>
776    where
777        E: EntityValue,
778    {
779        self.ensure_non_paged_mode_ready()?;
780
781        Self::with_slot(field, |target_slot| {
782            self.session
783                .execute_load_query_with(self.query(), move |load, plan| {
784                    load.top_k_by_slot(plan, target_slot, take_count)
785                })
786        })
787    }
788
789    /// Execute and return the bottom `k` rows by `field` under deterministic
790    /// ordering `(field asc, primary_key asc)` over the effective response
791    /// window.
792    ///
793    /// This terminal applies its own ordering and does not preserve query
794    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
795    /// matches `min_by(field)` selection semantics.
796    pub fn bottom_k_by(
797        &self,
798        field: impl AsRef<str>,
799        take_count: u32,
800    ) -> Result<EntityResponse<E>, QueryError>
801    where
802        E: EntityValue,
803    {
804        self.ensure_non_paged_mode_ready()?;
805
806        Self::with_slot(field, |target_slot| {
807            self.session
808                .execute_load_query_with(self.query(), move |load, plan| {
809                    load.bottom_k_by_slot(plan, target_slot, take_count)
810                })
811        })
812    }
813
814    /// Execute and return projected values for the top `k` rows by `field`
815    /// under deterministic ordering `(field desc, primary_key asc)` over the
816    /// effective response window.
817    ///
818    /// Ranking is applied before projection and does not preserve query
819    /// `order_by(...)` row order in the returned values. For `k = 1`, this
820    /// matches `max_by(field)` projected to one value.
821    pub fn top_k_by_values(
822        &self,
823        field: impl AsRef<str>,
824        take_count: u32,
825    ) -> Result<Vec<Value>, QueryError>
826    where
827        E: EntityValue,
828    {
829        self.ensure_non_paged_mode_ready()?;
830
831        Self::with_slot(field, |target_slot| {
832            self.session
833                .execute_load_query_with(self.query(), move |load, plan| {
834                    load.top_k_by_values_slot(plan, target_slot, take_count)
835                })
836        })
837    }
838
839    /// Execute and return projected values for the bottom `k` rows by `field`
840    /// under deterministic ordering `(field asc, primary_key asc)` over the
841    /// effective response window.
842    ///
843    /// Ranking is applied before projection and does not preserve query
844    /// `order_by(...)` row order in the returned values. For `k = 1`, this
845    /// matches `min_by(field)` projected to one value.
846    pub fn bottom_k_by_values(
847        &self,
848        field: impl AsRef<str>,
849        take_count: u32,
850    ) -> Result<Vec<Value>, QueryError>
851    where
852        E: EntityValue,
853    {
854        self.ensure_non_paged_mode_ready()?;
855
856        Self::with_slot(field, |target_slot| {
857            self.session
858                .execute_load_query_with(self.query(), move |load, plan| {
859                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
860                })
861        })
862    }
863
864    /// Execute and return projected id/value pairs for the top `k` rows by
865    /// `field` under deterministic ordering `(field desc, primary_key asc)`
866    /// over the effective response window.
867    ///
868    /// Ranking is applied before projection and does not preserve query
869    /// `order_by(...)` row order in the returned values. For `k = 1`, this
870    /// matches `max_by(field)` projected to one `(id, value)` pair.
871    pub fn top_k_by_with_ids(
872        &self,
873        field: impl AsRef<str>,
874        take_count: u32,
875    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
876    where
877        E: EntityValue,
878    {
879        self.ensure_non_paged_mode_ready()?;
880
881        Self::with_slot(field, |target_slot| {
882            self.session
883                .execute_load_query_with(self.query(), move |load, plan| {
884                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
885                })
886        })
887    }
888
889    /// Execute and return projected id/value pairs for the bottom `k` rows by
890    /// `field` under deterministic ordering `(field asc, primary_key asc)`
891    /// over the effective response window.
892    ///
893    /// Ranking is applied before projection and does not preserve query
894    /// `order_by(...)` row order in the returned values. For `k = 1`, this
895    /// matches `min_by(field)` projected to one `(id, value)` pair.
896    pub fn bottom_k_by_with_ids(
897        &self,
898        field: impl AsRef<str>,
899        take_count: u32,
900    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
901    where
902        E: EntityValue,
903    {
904        self.ensure_non_paged_mode_ready()?;
905
906        Self::with_slot(field, |target_slot| {
907            self.session
908                .execute_load_query_with(self.query(), move |load, plan| {
909                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
910                })
911        })
912    }
913
914    /// Execute and return distinct projected field values for the effective
915    /// result window, preserving first-observed value order.
916    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
917    where
918        E: EntityValue,
919    {
920        self.ensure_non_paged_mode_ready()?;
921
922        Self::with_slot(field, |target_slot| {
923            self.execute_prepared_projection_terminal_output(
924                PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
925            )?
926            .into_values()
927            .map_err(QueryError::execute)
928        })
929    }
930
931    /// Explain `distinct_values_by(field)` routing without executing the terminal.
932    pub fn explain_distinct_values_by(
933        &self,
934        field: impl AsRef<str>,
935    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
936    where
937        E: EntityValue,
938    {
939        self.ensure_non_paged_mode_ready()?;
940
941        Self::with_slot(field, |target_slot| {
942            self.explain_prepared_projection_non_paged_terminal(
943                &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
944            )
945        })
946    }
947
948    /// Execute and return projected field values paired with row ids for the
949    /// effective result window.
950    pub fn values_by_with_ids(
951        &self,
952        field: impl AsRef<str>,
953    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
954    where
955        E: EntityValue,
956    {
957        self.ensure_non_paged_mode_ready()?;
958
959        Self::with_slot(field, |target_slot| {
960            self.execute_prepared_projection_terminal_output(
961                PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
962            )?
963            .into_values_with_ids()
964            .map_err(QueryError::execute)
965        })
966    }
967
968    /// Execute and return projected id/value pairs for one shared bounded
969    /// projection over the effective response window.
970    pub fn project_values_with_ids<P>(
971        &self,
972        projection: &P,
973    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
974    where
975        E: EntityValue,
976        P: ValueProjectionExpr,
977    {
978        self.ensure_non_paged_mode_ready()?;
979
980        Self::with_slot(projection.field(), |target_slot| {
981            let values = self
982                .execute_prepared_projection_terminal_output(
983                    PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
984                )?
985                .into_values_with_ids::<E>()
986                .map_err(QueryError::execute)?;
987
988            Self::project_terminal_items(projection, values, |projection, (id, value)| {
989                Ok((id, projection.apply_value(value)?))
990            })
991        })
992    }
993
994    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
995    pub fn explain_values_by_with_ids(
996        &self,
997        field: impl AsRef<str>,
998    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
999    where
1000        E: EntityValue,
1001    {
1002        self.ensure_non_paged_mode_ready()?;
1003
1004        Self::with_slot(field, |target_slot| {
1005            self.explain_prepared_projection_non_paged_terminal(
1006                &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
1007            )
1008        })
1009    }
1010
1011    /// Execute and return the first projected field value in effective response
1012    /// order, if any.
1013    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
1014    where
1015        E: EntityValue,
1016    {
1017        self.ensure_non_paged_mode_ready()?;
1018
1019        Self::with_slot(field, |target_slot| {
1020            self.execute_prepared_projection_terminal_output(
1021                PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1022            )?
1023            .into_terminal_value()
1024            .map_err(QueryError::execute)
1025        })
1026    }
1027
1028    /// Execute and return the first projected value for one shared bounded
1029    /// projection in effective response order, if any.
1030    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<Value>, QueryError>
1031    where
1032        E: EntityValue,
1033        P: ValueProjectionExpr,
1034    {
1035        self.ensure_non_paged_mode_ready()?;
1036
1037        Self::with_slot(projection.field(), |target_slot| {
1038            let value = self
1039                .execute_prepared_projection_terminal_output(
1040                    PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1041                )?
1042                .into_terminal_value()
1043                .map_err(QueryError::execute)?;
1044
1045            let mut projected =
1046                Self::project_terminal_items(projection, value, |projection, value| {
1047                    projection.apply_value(value)
1048                })?;
1049
1050            Ok(projected.pop())
1051        })
1052    }
1053
1054    /// Explain `first_value_by(field)` routing without executing the terminal.
1055    pub fn explain_first_value_by(
1056        &self,
1057        field: impl AsRef<str>,
1058    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1059    where
1060        E: EntityValue,
1061    {
1062        self.ensure_non_paged_mode_ready()?;
1063
1064        Self::with_slot(field, |target_slot| {
1065            self.explain_prepared_projection_non_paged_terminal(
1066                &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1067            )
1068        })
1069    }
1070
1071    /// Execute and return the last projected field value in effective response
1072    /// order, if any.
1073    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
1074    where
1075        E: EntityValue,
1076    {
1077        self.ensure_non_paged_mode_ready()?;
1078
1079        Self::with_slot(field, |target_slot| {
1080            self.execute_prepared_projection_terminal_output(
1081                PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1082            )?
1083            .into_terminal_value()
1084            .map_err(QueryError::execute)
1085        })
1086    }
1087
1088    /// Execute and return the last projected value for one shared bounded
1089    /// projection in effective response order, if any.
1090    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<Value>, QueryError>
1091    where
1092        E: EntityValue,
1093        P: ValueProjectionExpr,
1094    {
1095        self.ensure_non_paged_mode_ready()?;
1096
1097        Self::with_slot(projection.field(), |target_slot| {
1098            let value = self
1099                .execute_prepared_projection_terminal_output(
1100                    PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1101                )?
1102                .into_terminal_value()
1103                .map_err(QueryError::execute)?;
1104
1105            let mut projected =
1106                Self::project_terminal_items(projection, value, |projection, value| {
1107                    projection.apply_value(value)
1108                })?;
1109
1110            Ok(projected.pop())
1111        })
1112    }
1113
1114    /// Explain `last_value_by(field)` routing without executing the terminal.
1115    pub fn explain_last_value_by(
1116        &self,
1117        field: impl AsRef<str>,
1118    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1119    where
1120        E: EntityValue,
1121    {
1122        self.ensure_non_paged_mode_ready()?;
1123
1124        Self::with_slot(field, |target_slot| {
1125            self.explain_prepared_projection_non_paged_terminal(
1126                &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1127            )
1128        })
1129    }
1130
1131    /// Execute and return the first matching identifier in response order, if any.
1132    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1133    where
1134        E: EntityValue,
1135    {
1136        self.execute_prepared_order_sensitive_terminal_output(
1137            PreparedFluentOrderSensitiveTerminalStrategy::first(),
1138        )?
1139        .into_id()
1140        .map_err(QueryError::execute)
1141    }
1142
1143    /// Explain scalar `first()` routing without executing the terminal.
1144    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1145    where
1146        E: EntityValue,
1147    {
1148        self.explain_prepared_aggregate_non_paged_terminal(
1149            &PreparedFluentOrderSensitiveTerminalStrategy::first(),
1150        )
1151    }
1152
1153    /// Execute and return the last matching identifier in response order, if any.
1154    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1155    where
1156        E: EntityValue,
1157    {
1158        self.execute_prepared_order_sensitive_terminal_output(
1159            PreparedFluentOrderSensitiveTerminalStrategy::last(),
1160        )?
1161        .into_id()
1162        .map_err(QueryError::execute)
1163    }
1164
1165    /// Explain scalar `last()` routing without executing the terminal.
1166    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1167    where
1168        E: EntityValue,
1169    {
1170        self.explain_prepared_aggregate_non_paged_terminal(
1171            &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1172        )
1173    }
1174
1175    /// Execute and require exactly one matching row.
1176    pub fn require_one(&self) -> Result<(), QueryError>
1177    where
1178        E: EntityValue,
1179    {
1180        self.execute()?.into_rows()?.require_one()?;
1181        Ok(())
1182    }
1183
1184    /// Execute and require at least one matching row.
1185    pub fn require_some(&self) -> Result<(), QueryError>
1186    where
1187        E: EntityValue,
1188    {
1189        self.execute()?.into_rows()?.require_some()?;
1190        Ok(())
1191    }
1192}
1193
1194impl From<PreparedFluentScalarTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1195    fn from(value: PreparedFluentScalarTerminalRuntimeRequest) -> Self {
1196        match value {
1197            PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1198                Self::IdTerminal { kind }
1199            }
1200            PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1201                Self::IdBySlot { kind, target_field }
1202            }
1203        }
1204    }
1205}
1206
1207impl From<PreparedFluentExistingRowsTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1208    fn from(value: PreparedFluentExistingRowsTerminalRuntimeRequest) -> Self {
1209        match value {
1210            PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => Self::Count,
1211            PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => Self::Exists,
1212        }
1213    }
1214}
1215
1216impl From<PreparedFluentNumericFieldRuntimeRequest> for ScalarNumericFieldBoundaryRequest {
1217    fn from(value: PreparedFluentNumericFieldRuntimeRequest) -> Self {
1218        match value {
1219            PreparedFluentNumericFieldRuntimeRequest::Sum => Self::Sum,
1220            PreparedFluentNumericFieldRuntimeRequest::SumDistinct => Self::SumDistinct,
1221            PreparedFluentNumericFieldRuntimeRequest::Avg => Self::Avg,
1222            PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => Self::AvgDistinct,
1223        }
1224    }
1225}
1226
1227impl From<PreparedFluentOrderSensitiveTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1228    fn from(value: PreparedFluentOrderSensitiveTerminalRuntimeRequest) -> Self {
1229        match value {
1230            PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1231                Self::IdTerminal { kind }
1232            }
1233            PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1234                Self::NthBySlot { target_field, nth }
1235            }
1236            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1237                Self::MedianBySlot { target_field }
1238            }
1239            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1240                Self::MinMaxBySlot { target_field }
1241            }
1242        }
1243    }
1244}
1245
1246impl From<PreparedFluentProjectionRuntimeRequest> for ScalarProjectionBoundaryRequest {
1247    fn from(value: PreparedFluentProjectionRuntimeRequest) -> Self {
1248        match value {
1249            PreparedFluentProjectionRuntimeRequest::Values => Self::Values,
1250            PreparedFluentProjectionRuntimeRequest::DistinctValues => Self::DistinctValues,
1251            PreparedFluentProjectionRuntimeRequest::CountDistinct => Self::CountDistinct,
1252            PreparedFluentProjectionRuntimeRequest::ValuesWithIds => Self::ValuesWithIds,
1253            PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1254                Self::TerminalValue { terminal_kind }
1255            }
1256        }
1257    }
1258}