Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        DbSession, PersistedRow, Query,
9        query::{
10            api::ResponseCardinalityExt,
11            builder::{
12                PreparedFluentAggregateExplainStrategy, PreparedFluentExistingRowsTerminalStrategy,
13                PreparedFluentNumericFieldStrategy, PreparedFluentOrderSensitiveTerminalStrategy,
14                PreparedFluentProjectionStrategy, PreparedFluentScalarTerminalStrategy,
15                ValueProjectionExpr,
16            },
17            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
18            fluent::load::{
19                FluentLoadQuery, FluentProjectionTerminalOutput, FluentScalarTerminalOutput,
20                LoadQueryResult,
21            },
22            intent::QueryError,
23            plan::AggregateKind,
24        },
25        response::EntityResponse,
26    },
27    error::InternalError,
28    traits::EntityValue,
29    types::{Decimal, Id},
30    value::{OutputValue, Value},
31};
32
33type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
34
35// Convert one runtime projection value into the public output boundary type.
36fn output(value: Value) -> OutputValue {
37    OutputValue::from(value)
38}
39
40// Convert one ordered runtime projection vector into the public output form.
41fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
42    values.into_iter().map(output).collect()
43}
44
45// Convert one ordered runtime `(id, value)` projection vector into the public output form.
46fn output_values_with_ids<E: PersistedRow>(
47    values: Vec<(Id<E>, Value)>,
48) -> Vec<(Id<E>, OutputValue)> {
49    values
50        .into_iter()
51        .map(|(id, value)| (id, output(value)))
52        .collect()
53}
54
55impl<E> FluentLoadQuery<'_, E>
56where
57    E: PersistedRow,
58{
59    // ------------------------------------------------------------------
60    // Execution (single semantic boundary)
61    // ------------------------------------------------------------------
62
63    /// Execute this query using the session's policy settings.
64    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
65    where
66        E: EntityValue,
67    {
68        self.ensure_non_paged_mode_ready()?;
69        self.session.execute_query_result(self.query())
70    }
71
72    // Run one read-only query/session projection through the canonical
73    // non-paged fluent policy gate so explain-style helpers do not each
74    // repeat the same readiness check and session handoff shell.
75    fn map_non_paged_query_output<T>(
76        &self,
77        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
78    ) -> Result<T, QueryError>
79    where
80        E: EntityValue,
81    {
82        self.ensure_non_paged_mode_ready()?;
83        map(self.session, self.query())
84    }
85
86    // Run one explain-visible aggregate terminal through the canonical
87    // non-paged fluent policy gate using the prepared aggregate strategy as
88    // the single explain projection source.
89    fn explain_prepared_aggregate_non_paged_terminal<S>(
90        &self,
91        strategy: &S,
92    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
93    where
94        E: EntityValue,
95        S: PreparedFluentAggregateExplainStrategy,
96    {
97        self.map_non_paged_query_output(|session, query| {
98            session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, strategy)
99        })
100    }
101
102    // Run one prepared projection/distinct explain terminal through the
103    // canonical non-paged fluent policy gate using the prepared projection
104    // strategy as the single explain projection source.
105    fn explain_prepared_projection_non_paged_terminal(
106        &self,
107        strategy: &PreparedFluentProjectionStrategy,
108    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
109    where
110        E: EntityValue,
111    {
112        self.map_non_paged_query_output(|session, query| {
113            session.explain_query_prepared_projection_terminal_with_visible_indexes(query, strategy)
114        })
115    }
116
117    // Resolve the structural execution descriptor for this fluent load query
118    // through the session-owned visible-index explain path once.
119    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
120    where
121        E: EntityValue,
122    {
123        self.map_non_paged_query_output(DbSession::explain_query_execution_with_visible_indexes)
124    }
125
126    // Render one descriptor-derived execution surface so text/json explain
127    // terminals do not each forward the same session explain call ad hoc.
128    fn render_execution_descriptor(
129        &self,
130        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
131    ) -> Result<String, QueryError>
132    where
133        E: EntityValue,
134    {
135        let descriptor = self.explain_execution_descriptor()?;
136
137        Ok(render(descriptor))
138    }
139
140    // Execute one prepared existing-rows terminal and decode the typed output
141    // through one shared mismatch/error-mapping lane.
142    fn map_prepared_existing_rows_terminal_output<T>(
143        &self,
144        strategy: PreparedFluentExistingRowsTerminalStrategy,
145        map: impl FnOnce(FluentScalarTerminalOutput<E>) -> Result<T, InternalError>,
146    ) -> Result<T, QueryError>
147    where
148        E: EntityValue,
149    {
150        self.ensure_non_paged_mode_ready()?;
151        let output = self
152            .session
153            .execute_fluent_existing_rows_terminal(self.query(), strategy)?;
154
155        map(output).map_err(QueryError::execute)
156    }
157
158    // Execute one prepared fluent numeric-field terminal through the canonical
159    // non-paged fluent policy gate using the prepared numeric strategy as the
160    // single runtime source.
161    fn execute_prepared_numeric_field_terminal(
162        &self,
163        strategy: PreparedFluentNumericFieldStrategy,
164    ) -> Result<Option<Decimal>, QueryError>
165    where
166        E: EntityValue,
167    {
168        self.ensure_non_paged_mode_ready()?;
169
170        self.session
171            .execute_fluent_numeric_field_terminal(self.query(), strategy)
172    }
173
174    // Execute one prepared order-sensitive terminal and decode the typed
175    // output through one shared mismatch/error-mapping lane.
176    fn map_prepared_order_sensitive_terminal_output<T>(
177        &self,
178        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
179        map: impl FnOnce(FluentScalarTerminalOutput<E>) -> Result<T, InternalError>,
180    ) -> Result<T, QueryError>
181    where
182        E: EntityValue,
183    {
184        self.ensure_non_paged_mode_ready()?;
185        let output = self
186            .session
187            .execute_fluent_order_sensitive_terminal(self.query(), strategy)?;
188
189        map(output).map_err(QueryError::execute)
190    }
191
192    // Execute one prepared fluent projection/distinct terminal through the
193    // canonical non-paged fluent policy gate using the prepared projection
194    // strategy as the single runtime source.
195    fn execute_prepared_projection_terminal_output(
196        &self,
197        strategy: PreparedFluentProjectionStrategy,
198    ) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
199    where
200        E: EntityValue,
201    {
202        self.ensure_non_paged_mode_ready()?;
203
204        self.session
205            .execute_fluent_projection_terminal(self.query(), strategy)
206    }
207
208    // Execute one prepared projection terminal and decode the typed output
209    // through one shared mismatch/error-mapping lane.
210    fn map_prepared_projection_terminal_output<T>(
211        &self,
212        strategy: PreparedFluentProjectionStrategy,
213        map: impl FnOnce(FluentProjectionTerminalOutput<E>) -> Result<T, InternalError>,
214    ) -> Result<T, QueryError>
215    where
216        E: EntityValue,
217    {
218        let output = self.execute_prepared_projection_terminal_output(strategy)?;
219
220        map(output).map_err(QueryError::execute)
221    }
222
223    // Execute one prepared scalar terminal and decode the typed output through
224    // one shared mismatch/error-mapping lane.
225    fn map_prepared_scalar_terminal_output<T>(
226        &self,
227        strategy: PreparedFluentScalarTerminalStrategy,
228        map: impl FnOnce(FluentScalarTerminalOutput<E>) -> Result<T, InternalError>,
229    ) -> Result<T, QueryError>
230    where
231        E: EntityValue,
232    {
233        self.ensure_non_paged_mode_ready()?;
234        let output = self
235            .session
236            .execute_fluent_scalar_terminal(self.query(), strategy)?;
237
238        map(output).map_err(QueryError::execute)
239    }
240
241    // Apply one shared bounded value projection to iterator-like terminal
242    // output while preserving source order and cardinality.
243    fn project_terminal_items<P, T, U>(
244        projection: &P,
245        values: impl IntoIterator<Item = T>,
246        mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
247    ) -> Result<Vec<U>, QueryError>
248    where
249        P: ValueProjectionExpr,
250    {
251        values
252            .into_iter()
253            .map(|value| map(projection, value))
254            .collect()
255    }
256
257    // ------------------------------------------------------------------
258    // Execution terminals — semantic only
259    // ------------------------------------------------------------------
260
261    /// Execute and return whether the result set is empty.
262    pub fn is_empty(&self) -> Result<bool, QueryError>
263    where
264        E: EntityValue,
265    {
266        self.not_exists()
267    }
268
269    /// Execute and return whether no matching row exists.
270    pub fn not_exists(&self) -> Result<bool, QueryError>
271    where
272        E: EntityValue,
273    {
274        Ok(!self.exists()?)
275    }
276
277    /// Execute and return whether at least one matching row exists.
278    pub fn exists(&self) -> Result<bool, QueryError>
279    where
280        E: EntityValue,
281    {
282        self.map_prepared_existing_rows_terminal_output(
283            PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
284            FluentScalarTerminalOutput::into_exists,
285        )
286    }
287
288    /// Explain scalar `exists()` routing without executing the terminal.
289    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
290    where
291        E: EntityValue,
292    {
293        self.explain_prepared_aggregate_non_paged_terminal(
294            &PreparedFluentExistingRowsTerminalStrategy::exists_rows(),
295        )
296    }
297
298    /// Explain scalar `not_exists()` routing without executing the terminal.
299    ///
300    /// This remains an `exists()` execution plan with negated boolean semantics.
301    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
302    where
303        E: EntityValue,
304    {
305        self.explain_exists()
306    }
307
308    /// Explain scalar load execution shape without executing the query.
309    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
310    where
311        E: EntityValue,
312    {
313        self.explain_execution_descriptor()
314    }
315
316    /// Explain scalar load execution shape as deterministic text.
317    pub fn explain_execution_text(&self) -> Result<String, QueryError>
318    where
319        E: EntityValue,
320    {
321        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
322    }
323
324    /// Explain scalar load execution shape as canonical JSON.
325    pub fn explain_execution_json(&self) -> Result<String, QueryError>
326    where
327        E: EntityValue,
328    {
329        self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
330    }
331
332    /// Explain scalar load execution shape as verbose text with diagnostics.
333    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
334    where
335        E: EntityValue,
336    {
337        self.map_non_paged_query_output(
338            DbSession::explain_query_execution_verbose_with_visible_indexes,
339        )
340    }
341
342    /// Execute and return the number of matching rows.
343    pub fn count(&self) -> Result<u32, QueryError>
344    where
345        E: EntityValue,
346    {
347        self.map_prepared_existing_rows_terminal_output(
348            PreparedFluentExistingRowsTerminalStrategy::count_rows(),
349            FluentScalarTerminalOutput::into_count,
350        )
351    }
352
353    /// Execute and return the total persisted payload bytes for the effective
354    /// result window.
355    pub fn bytes(&self) -> Result<u64, QueryError>
356    where
357        E: EntityValue,
358    {
359        self.ensure_non_paged_mode_ready()?;
360
361        self.session.execute_fluent_bytes(self.query())
362    }
363
364    /// Execute and return the total serialized bytes for `field` over the
365    /// effective result window.
366    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
367    where
368        E: EntityValue,
369    {
370        self.with_non_paged_slot(field, |target_slot| {
371            self.session
372                .execute_fluent_bytes_by_slot(self.query(), target_slot)
373        })
374    }
375
376    /// Explain `bytes_by(field)` routing without executing the terminal.
377    pub fn explain_bytes_by(
378        &self,
379        field: impl AsRef<str>,
380    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
381    where
382        E: EntityValue,
383    {
384        self.with_non_paged_slot(field, |target_slot| {
385            self.session
386                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
387        })
388    }
389
390    /// Execute and return the smallest matching identifier, if any.
391    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
392    where
393        E: EntityValue,
394    {
395        self.map_prepared_scalar_terminal_output(
396            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
397            FluentScalarTerminalOutput::into_id,
398        )
399    }
400
401    /// Explain scalar `min()` routing without executing the terminal.
402    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
403    where
404        E: EntityValue,
405    {
406        self.explain_prepared_aggregate_non_paged_terminal(
407            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Min),
408        )
409    }
410
411    /// Execute and return the id of the row with the smallest value for `field`.
412    ///
413    /// Ties are deterministic: equal field values resolve by primary key ascending.
414    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
415    where
416        E: EntityValue,
417    {
418        self.with_non_paged_slot(field, |target_slot| {
419            self.map_prepared_scalar_terminal_output(
420                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
421                FluentScalarTerminalOutput::into_id,
422            )
423        })
424    }
425
426    /// Execute and return the largest matching identifier, if any.
427    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
428    where
429        E: EntityValue,
430    {
431        self.map_prepared_scalar_terminal_output(
432            PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
433            FluentScalarTerminalOutput::into_id,
434        )
435    }
436
437    /// Explain scalar `max()` routing without executing the terminal.
438    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
439    where
440        E: EntityValue,
441    {
442        self.explain_prepared_aggregate_non_paged_terminal(
443            &PreparedFluentScalarTerminalStrategy::id_terminal(AggregateKind::Max),
444        )
445    }
446
447    /// Execute and return the id of the row with the largest value for `field`.
448    ///
449    /// Ties are deterministic: equal field values resolve by primary key ascending.
450    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
451    where
452        E: EntityValue,
453    {
454        self.with_non_paged_slot(field, |target_slot| {
455            self.map_prepared_scalar_terminal_output(
456                PreparedFluentScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
457                FluentScalarTerminalOutput::into_id,
458            )
459        })
460    }
461
462    /// Execute and return the id at zero-based ordinal `nth` when rows are
463    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
464    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
465    where
466        E: EntityValue,
467    {
468        self.with_non_paged_slot(field, |target_slot| {
469            self.map_prepared_order_sensitive_terminal_output(
470                PreparedFluentOrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
471                FluentScalarTerminalOutput::into_id,
472            )
473        })
474    }
475
476    /// Execute and return the sum of `field` over matching rows.
477    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
478    where
479        E: EntityValue,
480    {
481        self.with_non_paged_slot(field, |target_slot| {
482            self.execute_prepared_numeric_field_terminal(
483                PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
484            )
485        })
486    }
487
488    /// Explain scalar `sum_by(field)` routing without executing the terminal.
489    pub fn explain_sum_by(
490        &self,
491        field: impl AsRef<str>,
492    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
493    where
494        E: EntityValue,
495    {
496        self.with_non_paged_slot(field, |target_slot| {
497            self.explain_prepared_aggregate_non_paged_terminal(
498                &PreparedFluentNumericFieldStrategy::sum_by_slot(target_slot),
499            )
500        })
501    }
502
503    /// Execute and return the sum of distinct `field` values.
504    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
505    where
506        E: EntityValue,
507    {
508        self.with_non_paged_slot(field, |target_slot| {
509            self.execute_prepared_numeric_field_terminal(
510                PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
511            )
512        })
513    }
514
515    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
516    pub fn explain_sum_distinct_by(
517        &self,
518        field: impl AsRef<str>,
519    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
520    where
521        E: EntityValue,
522    {
523        self.with_non_paged_slot(field, |target_slot| {
524            self.explain_prepared_aggregate_non_paged_terminal(
525                &PreparedFluentNumericFieldStrategy::sum_distinct_by_slot(target_slot),
526            )
527        })
528    }
529
530    /// Execute and return the average of `field` over matching rows.
531    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
532    where
533        E: EntityValue,
534    {
535        self.with_non_paged_slot(field, |target_slot| {
536            self.execute_prepared_numeric_field_terminal(
537                PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
538            )
539        })
540    }
541
542    /// Explain scalar `avg_by(field)` routing without executing the terminal.
543    pub fn explain_avg_by(
544        &self,
545        field: impl AsRef<str>,
546    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
547    where
548        E: EntityValue,
549    {
550        self.with_non_paged_slot(field, |target_slot| {
551            self.explain_prepared_aggregate_non_paged_terminal(
552                &PreparedFluentNumericFieldStrategy::avg_by_slot(target_slot),
553            )
554        })
555    }
556
557    /// Execute and return the average of distinct `field` values.
558    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
559    where
560        E: EntityValue,
561    {
562        self.with_non_paged_slot(field, |target_slot| {
563            self.execute_prepared_numeric_field_terminal(
564                PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
565            )
566        })
567    }
568
569    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
570    pub fn explain_avg_distinct_by(
571        &self,
572        field: impl AsRef<str>,
573    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
574    where
575        E: EntityValue,
576    {
577        self.with_non_paged_slot(field, |target_slot| {
578            self.explain_prepared_aggregate_non_paged_terminal(
579                &PreparedFluentNumericFieldStrategy::avg_distinct_by_slot(target_slot),
580            )
581        })
582    }
583
584    /// Execute and return the median id by `field` using deterministic ordering
585    /// `(field asc, primary key asc)`.
586    ///
587    /// Even-length windows select the lower median.
588    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
589    where
590        E: EntityValue,
591    {
592        self.with_non_paged_slot(field, |target_slot| {
593            self.map_prepared_order_sensitive_terminal_output(
594                PreparedFluentOrderSensitiveTerminalStrategy::median_by_slot(target_slot),
595                FluentScalarTerminalOutput::into_id,
596            )
597        })
598    }
599
600    /// Execute and return the number of distinct values for `field` over the
601    /// effective result window.
602    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
603    where
604        E: EntityValue,
605    {
606        self.with_non_paged_slot(field, |target_slot| {
607            self.map_prepared_projection_terminal_output(
608                PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
609                FluentProjectionTerminalOutput::into_count,
610            )
611        })
612    }
613
614    /// Explain `count_distinct_by(field)` routing without executing the terminal.
615    pub fn explain_count_distinct_by(
616        &self,
617        field: impl AsRef<str>,
618    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
619    where
620        E: EntityValue,
621    {
622        self.with_non_paged_slot(field, |target_slot| {
623            self.explain_prepared_projection_non_paged_terminal(
624                &PreparedFluentProjectionStrategy::count_distinct_by_slot(target_slot),
625            )
626        })
627    }
628
629    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
630    ///
631    /// Tie handling is deterministic for both extrema: primary key ascending.
632    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
633    where
634        E: EntityValue,
635    {
636        self.with_non_paged_slot(field, |target_slot| {
637            self.map_prepared_order_sensitive_terminal_output(
638                PreparedFluentOrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
639                FluentScalarTerminalOutput::into_id_pair,
640            )
641        })
642    }
643
644    /// Execute and return projected field values for the effective result window.
645    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
646    where
647        E: EntityValue,
648    {
649        self.with_non_paged_slot(field, |target_slot| {
650            self.map_prepared_projection_terminal_output(
651                PreparedFluentProjectionStrategy::values_by_slot(target_slot),
652                FluentProjectionTerminalOutput::into_values,
653            )
654            .map(output_values)
655        })
656    }
657
658    /// Execute and return projected values for one shared bounded projection
659    /// over the effective response window.
660    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
661    where
662        E: EntityValue,
663        P: ValueProjectionExpr,
664    {
665        self.with_non_paged_slot(projection.field(), |target_slot| {
666            let values = self
667                .execute_prepared_projection_terminal_output(
668                    PreparedFluentProjectionStrategy::values_by_slot(target_slot),
669                )?
670                .into_values()
671                .map_err(QueryError::execute)?;
672
673            Self::project_terminal_items(projection, values, |projection, value| {
674                projection.apply_value(value)
675            })
676            .map(output_values)
677        })
678    }
679
680    /// Explain `project_values(projection)` routing without executing it.
681    pub fn explain_project_values<P>(
682        &self,
683        projection: &P,
684    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
685    where
686        E: EntityValue,
687        P: ValueProjectionExpr,
688    {
689        self.with_non_paged_slot(projection.field(), |target_slot| {
690            self.explain_prepared_projection_non_paged_terminal(
691                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
692            )
693        })
694    }
695
696    /// Explain `values_by(field)` routing without executing the terminal.
697    pub fn explain_values_by(
698        &self,
699        field: impl AsRef<str>,
700    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
701    where
702        E: EntityValue,
703    {
704        self.with_non_paged_slot(field, |target_slot| {
705            self.explain_prepared_projection_non_paged_terminal(
706                &PreparedFluentProjectionStrategy::values_by_slot(target_slot),
707            )
708        })
709    }
710
711    /// Execute and return the first `k` rows from the effective response window.
712    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
713    where
714        E: EntityValue,
715    {
716        self.ensure_non_paged_mode_ready()?;
717
718        self.session.execute_fluent_take(self.query(), take_count)
719    }
720
721    /// Execute and return the top `k` rows by `field` under deterministic
722    /// ordering `(field desc, primary_key asc)` over the effective response
723    /// window.
724    ///
725    /// This terminal applies its own ordering and does not preserve query
726    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
727    /// matches `max_by(field)` selection semantics.
728    pub fn top_k_by(
729        &self,
730        field: impl AsRef<str>,
731        take_count: u32,
732    ) -> Result<EntityResponse<E>, QueryError>
733    where
734        E: EntityValue,
735    {
736        self.with_non_paged_slot(field, |target_slot| {
737            self.session.execute_fluent_ranked_rows_by_slot(
738                self.query(),
739                target_slot,
740                take_count,
741                true,
742            )
743        })
744    }
745
746    /// Execute and return the bottom `k` rows by `field` under deterministic
747    /// ordering `(field asc, primary_key asc)` over the effective response
748    /// window.
749    ///
750    /// This terminal applies its own ordering and does not preserve query
751    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
752    /// matches `min_by(field)` selection semantics.
753    pub fn bottom_k_by(
754        &self,
755        field: impl AsRef<str>,
756        take_count: u32,
757    ) -> Result<EntityResponse<E>, QueryError>
758    where
759        E: EntityValue,
760    {
761        self.with_non_paged_slot(field, |target_slot| {
762            self.session.execute_fluent_ranked_rows_by_slot(
763                self.query(),
764                target_slot,
765                take_count,
766                false,
767            )
768        })
769    }
770
771    /// Execute and return projected values for the top `k` rows by `field`
772    /// under deterministic ordering `(field desc, primary_key asc)` over the
773    /// effective response window.
774    ///
775    /// Ranking is applied before projection and does not preserve query
776    /// `order_term(...)` row order in the returned values. For `k = 1`, this
777    /// matches `max_by(field)` projected to one value.
778    pub fn top_k_by_values(
779        &self,
780        field: impl AsRef<str>,
781        take_count: u32,
782    ) -> Result<Vec<OutputValue>, QueryError>
783    where
784        E: EntityValue,
785    {
786        self.with_non_paged_slot(field, |target_slot| {
787            self.session
788                .execute_fluent_ranked_values_by_slot(self.query(), target_slot, take_count, true)
789                .map(output_values)
790        })
791    }
792
793    /// Execute and return projected values for the bottom `k` rows by `field`
794    /// under deterministic ordering `(field asc, primary_key asc)` over the
795    /// effective response window.
796    ///
797    /// Ranking is applied before projection and does not preserve query
798    /// `order_term(...)` row order in the returned values. For `k = 1`, this
799    /// matches `min_by(field)` projected to one value.
800    pub fn bottom_k_by_values(
801        &self,
802        field: impl AsRef<str>,
803        take_count: u32,
804    ) -> Result<Vec<OutputValue>, QueryError>
805    where
806        E: EntityValue,
807    {
808        self.with_non_paged_slot(field, |target_slot| {
809            self.session
810                .execute_fluent_ranked_values_by_slot(self.query(), target_slot, take_count, false)
811                .map(output_values)
812        })
813    }
814
815    /// Execute and return projected id/value pairs for the top `k` rows by
816    /// `field` under deterministic ordering `(field desc, primary_key asc)`
817    /// over the effective response window.
818    ///
819    /// Ranking is applied before projection and does not preserve query
820    /// `order_term(...)` row order in the returned values. For `k = 1`, this
821    /// matches `max_by(field)` projected to one `(id, value)` pair.
822    pub fn top_k_by_with_ids(
823        &self,
824        field: impl AsRef<str>,
825        take_count: u32,
826    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
827    where
828        E: EntityValue,
829    {
830        self.with_non_paged_slot(field, |target_slot| {
831            self.session
832                .execute_fluent_ranked_values_with_ids_by_slot(
833                    self.query(),
834                    target_slot,
835                    take_count,
836                    true,
837                )
838                .map(output_values_with_ids)
839        })
840    }
841
842    /// Execute and return projected id/value pairs for the bottom `k` rows by
843    /// `field` under deterministic ordering `(field asc, primary_key asc)`
844    /// over the effective response window.
845    ///
846    /// Ranking is applied before projection and does not preserve query
847    /// `order_term(...)` row order in the returned values. For `k = 1`, this
848    /// matches `min_by(field)` projected to one `(id, value)` pair.
849    pub fn bottom_k_by_with_ids(
850        &self,
851        field: impl AsRef<str>,
852        take_count: u32,
853    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
854    where
855        E: EntityValue,
856    {
857        self.with_non_paged_slot(field, |target_slot| {
858            self.session
859                .execute_fluent_ranked_values_with_ids_by_slot(
860                    self.query(),
861                    target_slot,
862                    take_count,
863                    false,
864                )
865                .map(output_values_with_ids)
866        })
867    }
868
869    /// Execute and return distinct projected field values for the effective
870    /// result window, preserving first-observed value order.
871    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
872    where
873        E: EntityValue,
874    {
875        self.with_non_paged_slot(field, |target_slot| {
876            self.map_prepared_projection_terminal_output(
877                PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
878                FluentProjectionTerminalOutput::into_values,
879            )
880            .map(output_values)
881        })
882    }
883
884    /// Explain `distinct_values_by(field)` routing without executing the terminal.
885    pub fn explain_distinct_values_by(
886        &self,
887        field: impl AsRef<str>,
888    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
889    where
890        E: EntityValue,
891    {
892        self.with_non_paged_slot(field, |target_slot| {
893            self.explain_prepared_projection_non_paged_terminal(
894                &PreparedFluentProjectionStrategy::distinct_values_by_slot(target_slot),
895            )
896        })
897    }
898
899    /// Execute and return projected field values paired with row ids for the
900    /// effective result window.
901    pub fn values_by_with_ids(
902        &self,
903        field: impl AsRef<str>,
904    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
905    where
906        E: EntityValue,
907    {
908        self.with_non_paged_slot(field, |target_slot| {
909            self.map_prepared_projection_terminal_output(
910                PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
911                FluentProjectionTerminalOutput::into_values_with_ids,
912            )
913            .map(output_values_with_ids)
914        })
915    }
916
917    /// Execute and return projected id/value pairs for one shared bounded
918    /// projection over the effective response window.
919    pub fn project_values_with_ids<P>(
920        &self,
921        projection: &P,
922    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
923    where
924        E: EntityValue,
925        P: ValueProjectionExpr,
926    {
927        self.with_non_paged_slot(projection.field(), |target_slot| {
928            let values = self
929                .execute_prepared_projection_terminal_output(
930                    PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
931                )?
932                .into_values_with_ids()
933                .map_err(QueryError::execute)?;
934
935            Self::project_terminal_items(projection, values, |projection, (id, value)| {
936                Ok((id, projection.apply_value(value)?))
937            })
938            .map(output_values_with_ids)
939        })
940    }
941
942    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
943    pub fn explain_values_by_with_ids(
944        &self,
945        field: impl AsRef<str>,
946    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
947    where
948        E: EntityValue,
949    {
950        self.with_non_paged_slot(field, |target_slot| {
951            self.explain_prepared_projection_non_paged_terminal(
952                &PreparedFluentProjectionStrategy::values_by_with_ids_slot(target_slot),
953            )
954        })
955    }
956
957    /// Execute and return the first projected field value in effective response
958    /// order, if any.
959    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
960    where
961        E: EntityValue,
962    {
963        self.with_non_paged_slot(field, |target_slot| {
964            self.map_prepared_projection_terminal_output(
965                PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
966                FluentProjectionTerminalOutput::into_terminal_value,
967            )
968            .map(|value| value.map(output))
969        })
970    }
971
972    /// Execute and return the first projected value for one shared bounded
973    /// projection in effective response order, if any.
974    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
975    where
976        E: EntityValue,
977        P: ValueProjectionExpr,
978    {
979        self.with_non_paged_slot(projection.field(), |target_slot| {
980            let value = self
981                .execute_prepared_projection_terminal_output(
982                    PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
983                )?
984                .into_terminal_value()
985                .map_err(QueryError::execute)?;
986
987            let mut projected =
988                Self::project_terminal_items(projection, value, |projection, value| {
989                    projection.apply_value(value)
990                })?;
991
992            Ok(projected.pop().map(output))
993        })
994    }
995
996    /// Explain `first_value_by(field)` routing without executing the terminal.
997    pub fn explain_first_value_by(
998        &self,
999        field: impl AsRef<str>,
1000    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1001    where
1002        E: EntityValue,
1003    {
1004        self.with_non_paged_slot(field, |target_slot| {
1005            self.explain_prepared_projection_non_paged_terminal(
1006                &PreparedFluentProjectionStrategy::first_value_by_slot(target_slot),
1007            )
1008        })
1009    }
1010
1011    /// Execute and return the last projected field value in effective response
1012    /// order, if any.
1013    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1014    where
1015        E: EntityValue,
1016    {
1017        self.with_non_paged_slot(field, |target_slot| {
1018            self.map_prepared_projection_terminal_output(
1019                PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1020                FluentProjectionTerminalOutput::into_terminal_value,
1021            )
1022            .map(|value| value.map(output))
1023        })
1024    }
1025
1026    /// Execute and return the last projected value for one shared bounded
1027    /// projection in effective response order, if any.
1028    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1029    where
1030        E: EntityValue,
1031        P: ValueProjectionExpr,
1032    {
1033        self.with_non_paged_slot(projection.field(), |target_slot| {
1034            let value = self
1035                .execute_prepared_projection_terminal_output(
1036                    PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1037                )?
1038                .into_terminal_value()
1039                .map_err(QueryError::execute)?;
1040
1041            let mut projected =
1042                Self::project_terminal_items(projection, value, |projection, value| {
1043                    projection.apply_value(value)
1044                })?;
1045
1046            Ok(projected.pop().map(output))
1047        })
1048    }
1049
1050    /// Explain `last_value_by(field)` routing without executing the terminal.
1051    pub fn explain_last_value_by(
1052        &self,
1053        field: impl AsRef<str>,
1054    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1055    where
1056        E: EntityValue,
1057    {
1058        self.with_non_paged_slot(field, |target_slot| {
1059            self.explain_prepared_projection_non_paged_terminal(
1060                &PreparedFluentProjectionStrategy::last_value_by_slot(target_slot),
1061            )
1062        })
1063    }
1064
1065    /// Execute and return the first matching identifier in response order, if any.
1066    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1067    where
1068        E: EntityValue,
1069    {
1070        self.map_prepared_order_sensitive_terminal_output(
1071            PreparedFluentOrderSensitiveTerminalStrategy::first(),
1072            FluentScalarTerminalOutput::into_id,
1073        )
1074    }
1075
1076    /// Explain scalar `first()` routing without executing the terminal.
1077    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1078    where
1079        E: EntityValue,
1080    {
1081        self.explain_prepared_aggregate_non_paged_terminal(
1082            &PreparedFluentOrderSensitiveTerminalStrategy::first(),
1083        )
1084    }
1085
1086    /// Execute and return the last matching identifier in response order, if any.
1087    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1088    where
1089        E: EntityValue,
1090    {
1091        self.map_prepared_order_sensitive_terminal_output(
1092            PreparedFluentOrderSensitiveTerminalStrategy::last(),
1093            FluentScalarTerminalOutput::into_id,
1094        )
1095    }
1096
1097    /// Explain scalar `last()` routing without executing the terminal.
1098    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1099    where
1100        E: EntityValue,
1101    {
1102        self.explain_prepared_aggregate_non_paged_terminal(
1103            &PreparedFluentOrderSensitiveTerminalStrategy::last(),
1104        )
1105    }
1106
1107    /// Execute and require exactly one matching row.
1108    pub fn require_one(&self) -> Result<(), QueryError>
1109    where
1110        E: EntityValue,
1111    {
1112        self.execute()?.into_rows()?.require_one()?;
1113        Ok(())
1114    }
1115
1116    /// Execute and require at least one matching row.
1117    pub fn require_some(&self) -> Result<(), QueryError>
1118    where
1119        E: EntityValue,
1120    {
1121        self.execute()?.into_rows()?.require_some()?;
1122        Ok(())
1123    }
1124}