Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        DbSession, PersistedRow, Query,
9        executor::{
10            LoadExecutor, PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
11            ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
12            ScalarTerminalBoundaryRequest,
13        },
14        query::{
15            api::ResponseCardinalityExt,
16            builder::{
17                PreparedFluentAggregateExplainStrategy,
18                PreparedFluentExistingRowsTerminalRuntimeRequest,
19                PreparedFluentExistingRowsTerminalStrategy,
20                PreparedFluentNumericFieldRuntimeRequest, PreparedFluentNumericFieldStrategy,
21                PreparedFluentOrderSensitiveTerminalRuntimeRequest,
22                PreparedFluentOrderSensitiveTerminalStrategy,
23                PreparedFluentProjectionRuntimeRequest, PreparedFluentProjectionStrategy,
24                PreparedFluentScalarTerminalRuntimeRequest, PreparedFluentScalarTerminalStrategy,
25                ValueProjectionExpr,
26            },
27            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
28            fluent::load::{FluentLoadQuery, LoadQueryResult},
29            intent::QueryError,
30            plan::AggregateKind,
31        },
32        response::EntityResponse,
33    },
34    error::InternalError,
35    traits::EntityValue,
36    types::{Decimal, Id},
37    value::Value,
38};
39
// Return shape of `min_max_by`: an optional pair of entity ids, ordered as
// `(min_by(field), max_by(field))` per that method's documentation.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
41
42impl<E> FluentLoadQuery<'_, E>
43where
44    E: PersistedRow,
45{
46    // ------------------------------------------------------------------
47    // Execution (single semantic boundary)
48    // ------------------------------------------------------------------
49
50    /// Execute this query using the session's policy settings.
51    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
52    where
53        E: EntityValue,
54    {
55        self.ensure_non_paged_mode_ready()?;
56        self.session.execute_query_result(self.query())
57    }
58
59    // Run one scalar terminal through the canonical non-paged fluent policy
60    // gate before handing execution to the session load-query adapter.
61    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
62    where
63        E: EntityValue,
64        F: FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
65    {
66        self.ensure_non_paged_mode_ready()?;
67
68        self.session.execute_load_query_with(self.query(), execute)
69    }
70
71    // Run one read-only query/session projection through the canonical
72    // non-paged fluent policy gate so explain-style helpers do not each
73    // repeat the same readiness check and session handoff shell.
74    fn map_non_paged_query_output<T>(
75        &self,
76        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
77    ) -> Result<T, QueryError>
78    where
79        E: EntityValue,
80    {
81        self.ensure_non_paged_mode_ready()?;
82        map(self.session, self.query())
83    }
84
85    // Run one explain-visible aggregate terminal through the canonical
86    // non-paged fluent policy gate using the prepared aggregate strategy as
87    // the single explain projection source.
88    fn explain_prepared_aggregate_non_paged_terminal<S>(
89        &self,
90        strategy: &S,
91    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
92    where
93        E: EntityValue,
94        S: PreparedFluentAggregateExplainStrategy,
95    {
96        self.map_non_paged_query_output(|session, query| {
97            session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, strategy)
98        })
99    }
100
101    // Run one prepared projection/distinct explain terminal through the
102    // canonical non-paged fluent policy gate using the prepared projection
103    // strategy as the single explain projection source.
104    fn explain_prepared_projection_non_paged_terminal(
105        &self,
106        strategy: &PreparedFluentProjectionStrategy,
107    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
108    where
109        E: EntityValue,
110    {
111        self.map_non_paged_query_output(|session, query| {
112            session.explain_query_prepared_projection_terminal_with_visible_indexes(query, strategy)
113        })
114    }
115
116    // Resolve the structural execution descriptor for this fluent load query
117    // through the session-owned visible-index explain path once.
118    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
119    where
120        E: EntityValue,
121    {
122        self.map_non_paged_query_output(DbSession::explain_query_execution_with_visible_indexes)
123    }
124
125    // Render one descriptor-derived execution surface so text/json explain
126    // terminals do not each forward the same session explain call ad hoc.
127    fn render_execution_descriptor(
128        &self,
129        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
130    ) -> Result<String, QueryError>
131    where
132        E: EntityValue,
133    {
134        let descriptor = self.explain_execution_descriptor()?;
135
136        Ok(render(descriptor))
137    }
138
139    // Render one verbose execution explain payload through the same
140    // session-owned visible-index lane used by the other fluent explain forms.
141    fn explain_execution_verbose_text(&self) -> Result<String, QueryError>
142    where
143        E: EntityValue,
144    {
145        self.map_non_paged_query_output(
146            DbSession::explain_query_execution_verbose_with_visible_indexes,
147        )
148    }
149
150    // Execute one prepared fluent scalar terminal through the canonical
151    // non-paged fluent policy gate using the prepared runtime request as the
152    // single execution source.
153    fn execute_prepared_scalar_terminal_output(
154        &self,
155        strategy: PreparedFluentScalarTerminalStrategy,
156    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
157    where
158        E: EntityValue,
159    {
160        self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())
161    }
162
163    // Execute one prepared fluent existing-rows terminal through the
164    // canonical non-paged fluent policy gate using the prepared existing-rows
165    // strategy as the single runtime source.
166    fn execute_prepared_existing_rows_terminal_output(
167        &self,
168        strategy: PreparedFluentExistingRowsTerminalStrategy,
169    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
170    where
171        E: EntityValue,
172    {
173        self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())
174    }
175
176    // Execute one prepared fluent numeric-field terminal through the canonical
177    // non-paged fluent policy gate using the prepared numeric strategy as the
178    // single runtime source.
179    fn execute_prepared_numeric_field_terminal(
180        &self,
181        strategy: PreparedFluentNumericFieldStrategy,
182    ) -> Result<Option<Decimal>, QueryError>
183    where
184        E: EntityValue,
185    {
186        let (target_field, runtime_request) = strategy.into_runtime_parts();
187
188        self.execute_scalar_non_paged_terminal(move |load, plan| {
189            load.execute_numeric_field_boundary(plan, target_field, runtime_request.into())
190        })
191    }
192
193    // Execute one prepared fluent order-sensitive terminal through the
194    // canonical non-paged fluent policy gate using the prepared order-sensitive
195    // strategy as the single runtime source.
196    fn execute_prepared_order_sensitive_terminal_output(
197        &self,
198        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
199    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
200    where
201        E: EntityValue,
202    {
203        self.execute_scalar_terminal_boundary_output(strategy.into_runtime_request())
204    }
205
206    // Execute one prepared fluent projection/distinct terminal through the
207    // canonical non-paged fluent policy gate using the prepared projection
208    // strategy as the single runtime source.
209    fn execute_prepared_projection_terminal_output(
210        &self,
211        strategy: PreparedFluentProjectionStrategy,
212    ) -> Result<crate::db::executor::ScalarProjectionBoundaryOutput, QueryError>
213    where
214        E: EntityValue,
215    {
216        let (target_field, runtime_request) = strategy.into_runtime_parts();
217
218        self.execute_scalar_non_paged_terminal(move |load, plan| {
219            load.execute_scalar_projection_boundary(plan, target_field, runtime_request.into())
220        })
221    }
222
223    // Execute one already-lowered scalar terminal boundary request through
224    // the canonical non-paged fluent policy gate so terminal families that
225    // share the same scalar executor contract do not each rebuild it.
226    fn execute_scalar_terminal_boundary_output<R>(
227        &self,
228        runtime_request: R,
229    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
230    where
231        E: EntityValue,
232        R: Into<ScalarTerminalBoundaryRequest>,
233    {
234        self.execute_scalar_non_paged_terminal(move |load, plan| {
235            load.execute_scalar_terminal_request(plan, runtime_request.into())
236        })
237    }
238
239    // Apply one shared bounded value projection to iterator-like terminal
240    // output while preserving source order and cardinality.
241    fn project_terminal_items<P, T, U>(
242        projection: &P,
243        values: impl IntoIterator<Item = T>,
244        mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
245    ) -> Result<Vec<U>, QueryError>
246    where
247        P: ValueProjectionExpr,
248    {
249        values
250            .into_iter()
251            .map(|value| map(projection, value))
252            .collect()
253    }
254
255    // ------------------------------------------------------------------
256    // Execution terminals — semantic only
257    // ------------------------------------------------------------------
258
259    /// Execute and return whether the result set is empty.
260    pub fn is_empty(&self) -> Result<bool, QueryError>
261    where
262        E: EntityValue,
263    {
264        self.not_exists()
265    }
266
267    /// Execute and return whether no matching row exists.
268    pub fn not_exists(&self) -> Result<bool, QueryError>
269    where
270        E: EntityValue,
271    {
272        Ok(!self.exists()?)
273    }
274
275    /// Execute and return whether at least one matching row exists.
276    pub fn exists(&self) -> Result<bool, QueryError>
277    where
278        E: EntityValue,
279    {
280        self.execute_prepared_existing_rows_terminal_output(
281            PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
282        )?
283        .into_exists()
284        .map_err(QueryError::execute)
285    }
286
287    /// Explain scalar `exists()` routing without executing the terminal.
288    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
289    where
290        E: EntityValue,
291    {
292        self.explain_prepared_aggregate_non_paged_terminal(
293            &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
294        )
295    }
296
297    /// Explain scalar `not_exists()` routing without executing the terminal.
298    ///
299    /// This remains an `exists()` execution plan with negated boolean semantics.
300    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
301    where
302        E: EntityValue,
303    {
304        self.explain_exists()
305    }
306
307    /// Explain scalar load execution shape without executing the query.
308    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
309    where
310        E: EntityValue,
311    {
312        self.explain_execution_descriptor()
313    }
314
315    /// Explain scalar load execution shape as deterministic text.
316    pub fn explain_execution_text(&self) -> Result<String, QueryError>
317    where
318        E: EntityValue,
319    {
320        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
321    }
322
323    /// Explain scalar load execution shape as canonical JSON.
324    pub fn explain_execution_json(&self) -> Result<String, QueryError>
325    where
326        E: EntityValue,
327    {
328        self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
329    }
330
331    /// Explain scalar load execution shape as verbose text with diagnostics.
332    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
333    where
334        E: EntityValue,
335    {
336        self.explain_execution_verbose_text()
337    }
338
339    /// Execute and return the number of matching rows.
340    pub fn count(&self) -> Result<u32, QueryError>
341    where
342        E: EntityValue,
343    {
344        self.execute_prepared_existing_rows_terminal_output(
345            PreparedFluentExistingRowsTerminalStrategy::count_rows(),
346        )?
347        .into_count()
348        .map_err(QueryError::execute)
349    }
350
351    /// Execute and return the total persisted payload bytes for the effective
352    /// result window.
353    pub fn bytes(&self) -> Result<u64, QueryError>
354    where
355        E: EntityValue,
356    {
357        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
358    }
359
360    /// Execute and return the total serialized bytes for `field` over the
361    /// effective result window.
362    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
363    where
364        E: EntityValue,
365    {
366        self.with_non_paged_slot(field, |target_slot| {
367            self.session
368                .execute_load_query_with(self.query(), move |load, plan| {
369                    load.bytes_by_slot(plan, target_slot)
370                })
371        })
372    }
373
374    /// Explain `bytes_by(field)` routing without executing the terminal.
375    pub fn explain_bytes_by(
376        &self,
377        field: impl AsRef<str>,
378    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
379    where
380        E: EntityValue,
381    {
382        self.with_non_paged_slot(field, |target_slot| {
383            self.session
384                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
385        })
386    }
387
388    /// Execute and return the smallest matching identifier, if any.
389    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
390    where
391        E: EntityValue,
392    {
393        self.execute_prepared_scalar_terminal_output(
394            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
395        )?
396        .into_id()
397        .map_err(QueryError::execute)
398    }
399
400    /// Explain scalar `min()` routing without executing the terminal.
401    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
402    where
403        E: EntityValue,
404    {
405        self.explain_prepared_aggregate_non_paged_terminal(
406            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
407        )
408    }
409
410    /// Execute and return the id of the row with the smallest value for `field`.
411    ///
412    /// Ties are deterministic: equal field values resolve by primary key ascending.
413    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
414    where
415        E: EntityValue,
416    {
417        self.with_non_paged_slot(field, |target_slot| {
418            self.execute_prepared_scalar_terminal_output(
419                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
420            )?
421            .into_id()
422            .map_err(QueryError::execute)
423        })
424    }
425
426    /// Execute and return the largest matching identifier, if any.
427    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
428    where
429        E: EntityValue,
430    {
431        self.execute_prepared_scalar_terminal_output(
432            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
433        )?
434        .into_id()
435        .map_err(QueryError::execute)
436    }
437
438    /// Explain scalar `max()` routing without executing the terminal.
439    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
440    where
441        E: EntityValue,
442    {
443        self.explain_prepared_aggregate_non_paged_terminal(
444            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
445        )
446    }
447
448    /// Execute and return the id of the row with the largest value for `field`.
449    ///
450    /// Ties are deterministic: equal field values resolve by primary key ascending.
451    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
452    where
453        E: EntityValue,
454    {
455        self.with_non_paged_slot(field, |target_slot| {
456            self.execute_prepared_scalar_terminal_output(
457                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
458            )?
459            .into_id()
460            .map_err(QueryError::execute)
461        })
462    }
463
464    /// Execute and return the id at zero-based ordinal `nth` when rows are
465    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
466    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
467    where
468        E: EntityValue,
469    {
470        self.with_non_paged_slot(field, |target_slot| {
471            self.execute_prepared_order_sensitive_terminal_output(
472                PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
473            )?
474            .into_id()
475            .map_err(QueryError::execute)
476        })
477    }
478
479    /// Execute and return the sum of `field` over matching rows.
480    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
481    where
482        E: EntityValue,
483    {
484        self.with_non_paged_slot(field, |target_slot| {
485            self.execute_prepared_numeric_field_terminal(
486                PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
487            )
488        })
489    }
490
491    /// Explain scalar `sum_by(field)` routing without executing the terminal.
492    pub fn explain_sum_by(
493        &self,
494        field: impl AsRef<str>,
495    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
496    where
497        E: EntityValue,
498    {
499        self.with_non_paged_slot(field, |target_slot| {
500            self.explain_prepared_aggregate_non_paged_terminal(
501                &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
502            )
503        })
504    }
505
506    /// Execute and return the sum of distinct `field` values.
507    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
508    where
509        E: EntityValue,
510    {
511        self.with_non_paged_slot(field, |target_slot| {
512            self.execute_prepared_numeric_field_terminal(
513                PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
514            )
515        })
516    }
517
518    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
519    pub fn explain_sum_distinct_by(
520        &self,
521        field: impl AsRef<str>,
522    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
523    where
524        E: EntityValue,
525    {
526        self.with_non_paged_slot(field, |target_slot| {
527            self.explain_prepared_aggregate_non_paged_terminal(
528                &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
529            )
530        })
531    }
532
533    /// Execute and return the average of `field` over matching rows.
534    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
535    where
536        E: EntityValue,
537    {
538        self.with_non_paged_slot(field, |target_slot| {
539            self.execute_prepared_numeric_field_terminal(
540                PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
541            )
542        })
543    }
544
545    /// Explain scalar `avg_by(field)` routing without executing the terminal.
546    pub fn explain_avg_by(
547        &self,
548        field: impl AsRef<str>,
549    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
550    where
551        E: EntityValue,
552    {
553        self.with_non_paged_slot(field, |target_slot| {
554            self.explain_prepared_aggregate_non_paged_terminal(
555                &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
556            )
557        })
558    }
559
560    /// Execute and return the average of distinct `field` values.
561    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
562    where
563        E: EntityValue,
564    {
565        self.with_non_paged_slot(field, |target_slot| {
566            self.execute_prepared_numeric_field_terminal(
567                PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
568            )
569        })
570    }
571
572    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
573    pub fn explain_avg_distinct_by(
574        &self,
575        field: impl AsRef<str>,
576    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
577    where
578        E: EntityValue,
579    {
580        self.with_non_paged_slot(field, |target_slot| {
581            self.explain_prepared_aggregate_non_paged_terminal(
582                &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
583            )
584        })
585    }
586
587    /// Execute and return the median id by `field` using deterministic ordering
588    /// `(field asc, primary key asc)`.
589    ///
590    /// Even-length windows select the lower median.
591    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
592    where
593        E: EntityValue,
594    {
595        self.with_non_paged_slot(field, |target_slot| {
596            self.execute_prepared_order_sensitive_terminal_output(
597                PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
598            )?
599            .into_id()
600            .map_err(QueryError::execute)
601        })
602    }
603
604    /// Execute and return the number of distinct values for `field` over the
605    /// effective result window.
606    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
607    where
608        E: EntityValue,
609    {
610        self.with_non_paged_slot(field, |target_slot| {
611            self.execute_prepared_projection_terminal_output(
612                PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
613            )?
614            .into_count()
615            .map_err(QueryError::execute)
616        })
617    }
618
619    /// Explain `count_distinct_by(field)` routing without executing the terminal.
620    pub fn explain_count_distinct_by(
621        &self,
622        field: impl AsRef<str>,
623    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
624    where
625        E: EntityValue,
626    {
627        self.with_non_paged_slot(field, |target_slot| {
628            self.explain_prepared_projection_non_paged_terminal(
629                &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
630            )
631        })
632    }
633
634    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
635    ///
636    /// Tie handling is deterministic for both extrema: primary key ascending.
637    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
638    where
639        E: EntityValue,
640    {
641        self.with_non_paged_slot(field, |target_slot| {
642            self.execute_prepared_order_sensitive_terminal_output(
643                PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
644            )?
645            .into_id_pair()
646            .map_err(QueryError::execute)
647        })
648    }
649
650    /// Execute and return projected field values for the effective result window.
651    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
652    where
653        E: EntityValue,
654    {
655        self.with_non_paged_slot(field, |target_slot| {
656            self.execute_prepared_projection_terminal_output(
657                PreparedFluentProjectionStrategy::values_by_slot(target_slot),
658            )?
659            .into_values()
660            .map_err(QueryError::execute)
661        })
662    }
663
664    /// Execute and return projected values for one shared bounded projection
665    /// over the effective response window.
666    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<Value>, QueryError>
667    where
668        E: EntityValue,
669        P: ValueProjectionExpr,
670    {
671        self.with_non_paged_slot(projection.field(), |target_slot| {
672            let values = self
673                .execute_prepared_projection_terminal_output(
674                    PreparedFluentProjectionStrategy::values_by_slot(target_slot),
675                )?
676                .into_values()
677                .map_err(QueryError::execute)?;
678
679            Self::project_terminal_items(projection, values, |projection, value| {
680                projection.apply_value(value)
681            })
682        })
683    }
684
685    /// Explain `project_values(projection)` routing without executing it.
686    pub fn explain_project_values<P>(
687        &self,
688        projection: &P,
689    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
690    where
691        E: EntityValue,
692        P: ValueProjectionExpr,
693    {
694        self.with_non_paged_slot(projection.field(), |target_slot| {
695            self.explain_prepared_projection_non_paged_terminal(
696                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
697            )
698        })
699    }
700
701    /// Explain `values_by(field)` routing without executing the terminal.
702    pub fn explain_values_by(
703        &self,
704        field: impl AsRef<str>,
705    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
706    where
707        E: EntityValue,
708    {
709        self.with_non_paged_slot(field, |target_slot| {
710            self.explain_prepared_projection_non_paged_terminal(
711                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
712            )
713        })
714    }
715
716    /// Execute and return the first `k` rows from the effective response window.
717    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
718    where
719        E: EntityValue,
720    {
721        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
722    }
723
724    /// Execute and return the top `k` rows by `field` under deterministic
725    /// ordering `(field desc, primary_key asc)` over the effective response
726    /// window.
727    ///
728    /// This terminal applies its own ordering and does not preserve query
729    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
730    /// matches `max_by(field)` selection semantics.
731    pub fn top_k_by(
732        &self,
733        field: impl AsRef<str>,
734        take_count: u32,
735    ) -> Result<EntityResponse<E>, QueryError>
736    where
737        E: EntityValue,
738    {
739        self.with_non_paged_slot(field, |target_slot| {
740            self.session
741                .execute_load_query_with(self.query(), move |load, plan| {
742                    load.top_k_by_slot(plan, target_slot, take_count)
743                })
744        })
745    }
746
747    /// Execute and return the bottom `k` rows by `field` under deterministic
748    /// ordering `(field asc, primary_key asc)` over the effective response
749    /// window.
750    ///
751    /// This terminal applies its own ordering and does not preserve query
752    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
753    /// matches `min_by(field)` selection semantics.
754    pub fn bottom_k_by(
755        &self,
756        field: impl AsRef<str>,
757        take_count: u32,
758    ) -> Result<EntityResponse<E>, QueryError>
759    where
760        E: EntityValue,
761    {
762        self.with_non_paged_slot(field, |target_slot| {
763            self.session
764                .execute_load_query_with(self.query(), move |load, plan| {
765                    load.bottom_k_by_slot(plan, target_slot, take_count)
766                })
767        })
768    }
769
770    /// Execute and return projected values for the top `k` rows by `field`
771    /// under deterministic ordering `(field desc, primary_key asc)` over the
772    /// effective response window.
773    ///
774    /// Ranking is applied before projection and does not preserve query
775    /// `order_by(...)` row order in the returned values. For `k = 1`, this
776    /// matches `max_by(field)` projected to one value.
777    pub fn top_k_by_values(
778        &self,
779        field: impl AsRef<str>,
780        take_count: u32,
781    ) -> Result<Vec<Value>, QueryError>
782    where
783        E: EntityValue,
784    {
785        self.with_non_paged_slot(field, |target_slot| {
786            self.session
787                .execute_load_query_with(self.query(), move |load, plan| {
788                    load.top_k_by_values_slot(plan, target_slot, take_count)
789                })
790        })
791    }
792
793    /// Execute and return projected values for the bottom `k` rows by `field`
794    /// under deterministic ordering `(field asc, primary_key asc)` over the
795    /// effective response window.
796    ///
797    /// Ranking is applied before projection and does not preserve query
798    /// `order_by(...)` row order in the returned values. For `k = 1`, this
799    /// matches `min_by(field)` projected to one value.
800    pub fn bottom_k_by_values(
801        &self,
802        field: impl AsRef<str>,
803        take_count: u32,
804    ) -> Result<Vec<Value>, QueryError>
805    where
806        E: EntityValue,
807    {
808        self.with_non_paged_slot(field, |target_slot| {
809            self.session
810                .execute_load_query_with(self.query(), move |load, plan| {
811                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
812                })
813        })
814    }
815
816    /// Execute and return projected id/value pairs for the top `k` rows by
817    /// `field` under deterministic ordering `(field desc, primary_key asc)`
818    /// over the effective response window.
819    ///
820    /// Ranking is applied before projection and does not preserve query
821    /// `order_by(...)` row order in the returned values. For `k = 1`, this
822    /// matches `max_by(field)` projected to one `(id, value)` pair.
823    pub fn top_k_by_with_ids(
824        &self,
825        field: impl AsRef<str>,
826        take_count: u32,
827    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
828    where
829        E: EntityValue,
830    {
831        self.with_non_paged_slot(field, |target_slot| {
832            self.session
833                .execute_load_query_with(self.query(), move |load, plan| {
834                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
835                })
836        })
837    }
838
839    /// Execute and return projected id/value pairs for the bottom `k` rows by
840    /// `field` under deterministic ordering `(field asc, primary_key asc)`
841    /// over the effective response window.
842    ///
843    /// Ranking is applied before projection and does not preserve query
844    /// `order_by(...)` row order in the returned values. For `k = 1`, this
845    /// matches `min_by(field)` projected to one `(id, value)` pair.
846    pub fn bottom_k_by_with_ids(
847        &self,
848        field: impl AsRef<str>,
849        take_count: u32,
850    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
851    where
852        E: EntityValue,
853    {
854        self.with_non_paged_slot(field, |target_slot| {
855            self.session
856                .execute_load_query_with(self.query(), move |load, plan| {
857                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
858                })
859        })
860    }
861
862    /// Execute and return distinct projected field values for the effective
863    /// result window, preserving first-observed value order.
864    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
865    where
866        E: EntityValue,
867    {
868        self.with_non_paged_slot(field, |target_slot| {
869            self.execute_prepared_projection_terminal_output(
870                PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
871            )?
872            .into_values()
873            .map_err(QueryError::execute)
874        })
875    }
876
877    /// Explain `distinct_values_by(field)` routing without executing the terminal.
878    pub fn explain_distinct_values_by(
879        &self,
880        field: impl AsRef<str>,
881    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
882    where
883        E: EntityValue,
884    {
885        self.with_non_paged_slot(field, |target_slot| {
886            self.explain_prepared_projection_non_paged_terminal(
887                &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
888            )
889        })
890    }
891
892    /// Execute and return projected field values paired with row ids for the
893    /// effective result window.
894    pub fn values_by_with_ids(
895        &self,
896        field: impl AsRef<str>,
897    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
898    where
899        E: EntityValue,
900    {
901        self.with_non_paged_slot(field, |target_slot| {
902            self.execute_prepared_projection_terminal_output(
903                PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
904            )?
905            .into_values_with_ids()
906            .map_err(QueryError::execute)
907        })
908    }
909
910    /// Execute and return projected id/value pairs for one shared bounded
911    /// projection over the effective response window.
912    pub fn project_values_with_ids<P>(
913        &self,
914        projection: &P,
915    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
916    where
917        E: EntityValue,
918        P: ValueProjectionExpr,
919    {
920        self.with_non_paged_slot(projection.field(), |target_slot| {
921            let values = self
922                .execute_prepared_projection_terminal_output(
923                    PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
924                )?
925                .into_values_with_ids::<E>()
926                .map_err(QueryError::execute)?;
927
928            Self::project_terminal_items(projection, values, |projection, (id, value)| {
929                Ok((id, projection.apply_value(value)?))
930            })
931        })
932    }
933
934    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
935    pub fn explain_values_by_with_ids(
936        &self,
937        field: impl AsRef<str>,
938    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
939    where
940        E: EntityValue,
941    {
942        self.with_non_paged_slot(field, |target_slot| {
943            self.explain_prepared_projection_non_paged_terminal(
944                &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
945            )
946        })
947    }
948
949    /// Execute and return the first projected field value in effective response
950    /// order, if any.
951    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
952    where
953        E: EntityValue,
954    {
955        self.with_non_paged_slot(field, |target_slot| {
956            self.execute_prepared_projection_terminal_output(
957                PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
958            )?
959            .into_terminal_value()
960            .map_err(QueryError::execute)
961        })
962    }
963
964    /// Execute and return the first projected value for one shared bounded
965    /// projection in effective response order, if any.
966    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<Value>, QueryError>
967    where
968        E: EntityValue,
969        P: ValueProjectionExpr,
970    {
971        self.with_non_paged_slot(projection.field(), |target_slot| {
972            let value = self
973                .execute_prepared_projection_terminal_output(
974                    PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
975                )?
976                .into_terminal_value()
977                .map_err(QueryError::execute)?;
978
979            let mut projected =
980                Self::project_terminal_items(projection, value, |projection, value| {
981                    projection.apply_value(value)
982                })?;
983
984            Ok(projected.pop())
985        })
986    }
987
988    /// Explain `first_value_by(field)` routing without executing the terminal.
989    pub fn explain_first_value_by(
990        &self,
991        field: impl AsRef<str>,
992    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
993    where
994        E: EntityValue,
995    {
996        self.with_non_paged_slot(field, |target_slot| {
997            self.explain_prepared_projection_non_paged_terminal(
998                &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
999            )
1000        })
1001    }
1002
1003    /// Execute and return the last projected field value in effective response
1004    /// order, if any.
1005    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
1006    where
1007        E: EntityValue,
1008    {
1009        self.with_non_paged_slot(field, |target_slot| {
1010            self.execute_prepared_projection_terminal_output(
1011                PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1012            )?
1013            .into_terminal_value()
1014            .map_err(QueryError::execute)
1015        })
1016    }
1017
1018    /// Execute and return the last projected value for one shared bounded
1019    /// projection in effective response order, if any.
1020    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<Value>, QueryError>
1021    where
1022        E: EntityValue,
1023        P: ValueProjectionExpr,
1024    {
1025        self.with_non_paged_slot(projection.field(), |target_slot| {
1026            let value = self
1027                .execute_prepared_projection_terminal_output(
1028                    PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1029                )?
1030                .into_terminal_value()
1031                .map_err(QueryError::execute)?;
1032
1033            let mut projected =
1034                Self::project_terminal_items(projection, value, |projection, value| {
1035                    projection.apply_value(value)
1036                })?;
1037
1038            Ok(projected.pop())
1039        })
1040    }
1041
1042    /// Explain `last_value_by(field)` routing without executing the terminal.
1043    pub fn explain_last_value_by(
1044        &self,
1045        field: impl AsRef<str>,
1046    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1047    where
1048        E: EntityValue,
1049    {
1050        self.with_non_paged_slot(field, |target_slot| {
1051            self.explain_prepared_projection_non_paged_terminal(
1052                &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1053            )
1054        })
1055    }
1056
1057    /// Execute and return the first matching identifier in response order, if any.
1058    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1059    where
1060        E: EntityValue,
1061    {
1062        self.execute_prepared_order_sensitive_terminal_output(
1063            PreparedFluentOrderSensitiveTerminalStrategy::first(),
1064        )?
1065        .into_id()
1066        .map_err(QueryError::execute)
1067    }
1068
1069    /// Explain scalar `first()` routing without executing the terminal.
1070    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1071    where
1072        E: EntityValue,
1073    {
1074        self.explain_prepared_aggregate_non_paged_terminal(
1075            &PreparedFluentOrderSensitiveTerminalStrategy::first(),
1076        )
1077    }
1078
1079    /// Execute and return the last matching identifier in response order, if any.
1080    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1081    where
1082        E: EntityValue,
1083    {
1084        self.execute_prepared_order_sensitive_terminal_output(
1085            PreparedFluentOrderSensitiveTerminalStrategy::last(),
1086        )?
1087        .into_id()
1088        .map_err(QueryError::execute)
1089    }
1090
1091    /// Explain scalar `last()` routing without executing the terminal.
1092    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1093    where
1094        E: EntityValue,
1095    {
1096        self.explain_prepared_aggregate_non_paged_terminal(
1097            &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1098        )
1099    }
1100
1101    /// Execute and require exactly one matching row.
1102    pub fn require_one(&self) -> Result<(), QueryError>
1103    where
1104        E: EntityValue,
1105    {
1106        self.execute()?.into_rows()?.require_one()?;
1107        Ok(())
1108    }
1109
1110    /// Execute and require at least one matching row.
1111    pub fn require_some(&self) -> Result<(), QueryError>
1112    where
1113        E: EntityValue,
1114    {
1115        self.execute()?.into_rows()?.require_some()?;
1116        Ok(())
1117    }
1118}
1119
1120impl From<PreparedFluentScalarTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1121    fn from(value: PreparedFluentScalarTerminalRuntimeRequest) -> Self {
1122        match value {
1123            PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
1124                Self::IdTerminal { kind }
1125            }
1126            PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
1127                Self::IdBySlot { kind, target_field }
1128            }
1129        }
1130    }
1131}
1132
1133impl From<PreparedFluentExistingRowsTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1134    fn from(value: PreparedFluentExistingRowsTerminalRuntimeRequest) -> Self {
1135        match value {
1136            PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => Self::Count,
1137            PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => Self::Exists,
1138        }
1139    }
1140}
1141
1142impl From<PreparedFluentNumericFieldRuntimeRequest> for ScalarNumericFieldBoundaryRequest {
1143    fn from(value: PreparedFluentNumericFieldRuntimeRequest) -> Self {
1144        match value {
1145            PreparedFluentNumericFieldRuntimeRequest::Sum => Self::Sum,
1146            PreparedFluentNumericFieldRuntimeRequest::SumDistinct => Self::SumDistinct,
1147            PreparedFluentNumericFieldRuntimeRequest::Avg => Self::Avg,
1148            PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => Self::AvgDistinct,
1149        }
1150    }
1151}
1152
1153impl From<PreparedFluentOrderSensitiveTerminalRuntimeRequest> for ScalarTerminalBoundaryRequest {
1154    fn from(value: PreparedFluentOrderSensitiveTerminalRuntimeRequest) -> Self {
1155        match value {
1156            PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => {
1157                Self::IdTerminal { kind }
1158            }
1159            PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
1160                Self::NthBySlot { target_field, nth }
1161            }
1162            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
1163                Self::MedianBySlot { target_field }
1164            }
1165            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1166                Self::MinMaxBySlot { target_field }
1167            }
1168        }
1169    }
1170}
1171
1172impl From<PreparedFluentProjectionRuntimeRequest> for ScalarProjectionBoundaryRequest {
1173    fn from(value: PreparedFluentProjectionRuntimeRequest) -> Self {
1174        match value {
1175            PreparedFluentProjectionRuntimeRequest::Values => Self::Values,
1176            PreparedFluentProjectionRuntimeRequest::DistinctValues => Self::DistinctValues,
1177            PreparedFluentProjectionRuntimeRequest::CountDistinct => Self::CountDistinct,
1178            PreparedFluentProjectionRuntimeRequest::ValuesWithIds => Self::ValuesWithIds,
1179            PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => {
1180                Self::TerminalValue { terminal_kind }
1181            }
1182        }
1183    }
1184}