Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        DbSession, PersistedRow, Query,
9        query::{
10            api::ResponseCardinalityExt,
11            builder::{
12                AggregateExplain, ExistingRowsTerminalStrategy, NumericFieldStrategy,
13                OrderSensitiveTerminalStrategy, ProjectionStrategy, ScalarTerminalStrategy,
14                ValueProjectionExpr,
15            },
16            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
17            fluent::load::{
18                FluentLoadQuery, FluentProjectionTerminalOutput, FluentScalarTerminalOutput,
19                LoadQueryResult,
20            },
21            intent::QueryError,
22            plan::AggregateKind,
23        },
24        response::EntityResponse,
25    },
26    error::InternalError,
27    traits::EntityValue,
28    types::{Decimal, Id},
29    value::{OutputValue, Value},
30};
31
32type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
33
34// Convert one runtime projection value into the public output boundary type.
35fn output(value: Value) -> OutputValue {
36    OutputValue::from(value)
37}
38
39// Convert one ordered runtime projection vector into the public output form.
40fn output_values(values: Vec<Value>) -> Vec<OutputValue> {
41    values.into_iter().map(output).collect()
42}
43
44// Convert one ordered runtime `(id, value)` projection vector into the public output form.
45fn output_values_with_ids<E: PersistedRow>(
46    values: Vec<(Id<E>, Value)>,
47) -> Vec<(Id<E>, OutputValue)> {
48    values
49        .into_iter()
50        .map(|(id, value)| (id, output(value)))
51        .collect()
52}
53
54impl<E> FluentLoadQuery<'_, E>
55where
56    E: PersistedRow,
57{
58    // ------------------------------------------------------------------
59    // Execution (single semantic boundary)
60    // ------------------------------------------------------------------
61
62    /// Execute this query using the session's policy settings.
63    pub fn execute(&self) -> Result<LoadQueryResult<E>, QueryError>
64    where
65        E: EntityValue,
66    {
67        self.ensure_non_paged_mode_ready()?;
68        self.session.execute_query_result(self.query())
69    }
70
71    // Run one read-only query/session projection through the canonical
72    // non-paged fluent policy gate so explain-style helpers do not each
73    // repeat the same readiness check and session handoff shell.
74    fn map_non_paged_query_output<T>(
75        &self,
76        map: impl FnOnce(&DbSession<E::Canister>, &Query<E>) -> Result<T, QueryError>,
77    ) -> Result<T, QueryError>
78    where
79        E: EntityValue,
80    {
81        self.ensure_non_paged_mode_ready()?;
82        map(self.session, self.query())
83    }
84
85    // Run one explain-visible aggregate terminal through the canonical
86    // non-paged fluent policy gate using the prepared aggregate strategy as
87    // the single explain projection source.
88    fn explain_prepared_aggregate_non_paged_terminal<S>(
89        &self,
90        strategy: &S,
91    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
92    where
93        E: EntityValue,
94        S: AggregateExplain,
95    {
96        self.map_non_paged_query_output(|session, query| {
97            session.explain_query_prepared_aggregate_terminal_with_visible_indexes(query, strategy)
98        })
99    }
100
101    // Run one prepared projection/distinct explain terminal through the
102    // canonical non-paged fluent policy gate using the prepared projection
103    // strategy as the single explain projection source.
104    fn explain_prepared_projection_non_paged_terminal(
105        &self,
106        strategy: &ProjectionStrategy,
107    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
108    where
109        E: EntityValue,
110    {
111        self.map_non_paged_query_output(|session, query| {
112            session.explain_query_prepared_projection_terminal_with_visible_indexes(query, strategy)
113        })
114    }
115
116    // Resolve the structural execution descriptor for this fluent load query
117    // through the session-owned visible-index explain path once.
118    fn explain_execution_descriptor(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
119    where
120        E: EntityValue,
121    {
122        self.map_non_paged_query_output(DbSession::explain_query_execution_with_visible_indexes)
123    }
124
125    // Render one descriptor-derived execution surface so text/json explain
126    // terminals do not each forward the same session explain call ad hoc.
127    fn render_execution_descriptor(
128        &self,
129        render: impl FnOnce(ExplainExecutionNodeDescriptor) -> String,
130    ) -> Result<String, QueryError>
131    where
132        E: EntityValue,
133    {
134        let descriptor = self.explain_execution_descriptor()?;
135
136        Ok(render(descriptor))
137    }
138
139    // Execute one prepared existing-rows terminal and decode the typed output
140    // through one shared mismatch/error-mapping lane.
141    fn map_prepared_existing_rows_terminal_output<T>(
142        &self,
143        strategy: ExistingRowsTerminalStrategy,
144        map: impl FnOnce(FluentScalarTerminalOutput<E>) -> Result<T, InternalError>,
145    ) -> Result<T, QueryError>
146    where
147        E: EntityValue,
148    {
149        self.ensure_non_paged_mode_ready()?;
150        let output = self
151            .session
152            .execute_fluent_existing_rows_terminal(self.query(), strategy)?;
153
154        map(output).map_err(QueryError::execute)
155    }
156
157    // Execute one prepared fluent numeric-field terminal through the canonical
158    // non-paged fluent policy gate using the prepared numeric strategy as the
159    // single runtime source.
160    fn execute_prepared_numeric_field_terminal(
161        &self,
162        strategy: NumericFieldStrategy,
163    ) -> Result<Option<Decimal>, QueryError>
164    where
165        E: EntityValue,
166    {
167        self.ensure_non_paged_mode_ready()?;
168
169        self.session
170            .execute_fluent_numeric_field_terminal(self.query(), strategy)
171    }
172
173    // Execute one prepared order-sensitive terminal and decode the typed
174    // output through one shared mismatch/error-mapping lane.
175    fn map_prepared_order_sensitive_terminal_output<T>(
176        &self,
177        strategy: OrderSensitiveTerminalStrategy,
178        map: impl FnOnce(FluentScalarTerminalOutput<E>) -> Result<T, InternalError>,
179    ) -> Result<T, QueryError>
180    where
181        E: EntityValue,
182    {
183        self.ensure_non_paged_mode_ready()?;
184        let output = self
185            .session
186            .execute_fluent_order_sensitive_terminal(self.query(), strategy)?;
187
188        map(output).map_err(QueryError::execute)
189    }
190
191    // Execute one prepared fluent projection/distinct terminal through the
192    // canonical non-paged fluent policy gate using the prepared projection
193    // strategy as the single runtime source.
194    fn execute_prepared_projection_terminal_output(
195        &self,
196        strategy: ProjectionStrategy,
197    ) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
198    where
199        E: EntityValue,
200    {
201        self.ensure_non_paged_mode_ready()?;
202
203        self.session
204            .execute_fluent_projection_terminal(self.query(), strategy)
205    }
206
207    // Execute one prepared projection terminal and decode the typed output
208    // through one shared mismatch/error-mapping lane.
209    fn map_prepared_projection_terminal_output<T>(
210        &self,
211        strategy: ProjectionStrategy,
212        map: impl FnOnce(FluentProjectionTerminalOutput<E>) -> Result<T, InternalError>,
213    ) -> Result<T, QueryError>
214    where
215        E: EntityValue,
216    {
217        let output = self.execute_prepared_projection_terminal_output(strategy)?;
218
219        map(output).map_err(QueryError::execute)
220    }
221
222    // Execute one prepared scalar terminal and decode the typed output through
223    // one shared mismatch/error-mapping lane.
224    fn map_prepared_scalar_terminal_output<T>(
225        &self,
226        strategy: ScalarTerminalStrategy,
227        map: impl FnOnce(FluentScalarTerminalOutput<E>) -> Result<T, InternalError>,
228    ) -> Result<T, QueryError>
229    where
230        E: EntityValue,
231    {
232        self.ensure_non_paged_mode_ready()?;
233        let output = self
234            .session
235            .execute_fluent_scalar_terminal(self.query(), strategy)?;
236
237        map(output).map_err(QueryError::execute)
238    }
239
240    // Apply one shared bounded value projection to iterator-like terminal
241    // output while preserving source order and cardinality.
242    fn project_terminal_items<P, T, U>(
243        projection: &P,
244        values: impl IntoIterator<Item = T>,
245        mut map: impl FnMut(&P, T) -> Result<U, QueryError>,
246    ) -> Result<Vec<U>, QueryError>
247    where
248        P: ValueProjectionExpr,
249    {
250        values
251            .into_iter()
252            .map(|value| map(projection, value))
253            .collect()
254    }
255
256    // ------------------------------------------------------------------
257    // Execution terminals — semantic only
258    // ------------------------------------------------------------------
259
260    /// Execute and return whether the result set is empty.
261    pub fn is_empty(&self) -> Result<bool, QueryError>
262    where
263        E: EntityValue,
264    {
265        self.not_exists()
266    }
267
268    /// Execute and return whether no matching row exists.
269    pub fn not_exists(&self) -> Result<bool, QueryError>
270    where
271        E: EntityValue,
272    {
273        Ok(!self.exists()?)
274    }
275
276    /// Execute and return whether at least one matching row exists.
277    pub fn exists(&self) -> Result<bool, QueryError>
278    where
279        E: EntityValue,
280    {
281        self.map_prepared_existing_rows_terminal_output(
282            ExistingRowsTerminalStrategy::exists_rows(),
283            FluentScalarTerminalOutput::into_exists,
284        )
285    }
286
287    /// Explain scalar `exists()` routing without executing the terminal.
288    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
289    where
290        E: EntityValue,
291    {
292        self.explain_prepared_aggregate_non_paged_terminal(
293            &ExistingRowsTerminalStrategy::exists_rows(),
294        )
295    }
296
297    /// Explain scalar `not_exists()` routing without executing the terminal.
298    ///
299    /// This remains an `exists()` execution plan with negated boolean semantics.
300    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
301    where
302        E: EntityValue,
303    {
304        self.explain_exists()
305    }
306
307    /// Explain scalar load execution shape without executing the query.
308    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
309    where
310        E: EntityValue,
311    {
312        self.explain_execution_descriptor()
313    }
314
315    /// Explain scalar load execution shape as deterministic text.
316    pub fn explain_execution_text(&self) -> Result<String, QueryError>
317    where
318        E: EntityValue,
319    {
320        self.render_execution_descriptor(|descriptor| descriptor.render_text_tree())
321    }
322
323    /// Explain scalar load execution shape as canonical JSON.
324    pub fn explain_execution_json(&self) -> Result<String, QueryError>
325    where
326        E: EntityValue,
327    {
328        self.render_execution_descriptor(|descriptor| descriptor.render_json_canonical())
329    }
330
331    /// Explain scalar load execution shape as verbose text with diagnostics.
332    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
333    where
334        E: EntityValue,
335    {
336        self.map_non_paged_query_output(
337            DbSession::explain_query_execution_verbose_with_visible_indexes,
338        )
339    }
340
341    /// Execute and return the number of matching rows.
342    pub fn count(&self) -> Result<u32, QueryError>
343    where
344        E: EntityValue,
345    {
346        self.map_prepared_existing_rows_terminal_output(
347            ExistingRowsTerminalStrategy::count_rows(),
348            FluentScalarTerminalOutput::into_count,
349        )
350    }
351
352    /// Execute and return the total persisted payload bytes for the effective
353    /// result window.
354    pub fn bytes(&self) -> Result<u64, QueryError>
355    where
356        E: EntityValue,
357    {
358        self.ensure_non_paged_mode_ready()?;
359
360        self.session.execute_fluent_bytes(self.query())
361    }
362
363    /// Execute and return the total serialized bytes for `field` over the
364    /// effective result window.
365    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
366    where
367        E: EntityValue,
368    {
369        self.with_non_paged_slot(field, |target_slot| {
370            self.session
371                .execute_fluent_bytes_by_slot(self.query(), target_slot)
372        })
373    }
374
375    /// Explain `bytes_by(field)` routing without executing the terminal.
376    pub fn explain_bytes_by(
377        &self,
378        field: impl AsRef<str>,
379    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
380    where
381        E: EntityValue,
382    {
383        self.with_non_paged_slot(field, |target_slot| {
384            self.session
385                .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
386        })
387    }
388
389    /// Execute and return the smallest matching identifier, if any.
390    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
391    where
392        E: EntityValue,
393    {
394        self.map_prepared_scalar_terminal_output(
395            ScalarTerminalStrategy::id_terminal(AggregateKind::Min),
396            FluentScalarTerminalOutput::into_id,
397        )
398    }
399
400    /// Explain scalar `min()` routing without executing the terminal.
401    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
402    where
403        E: EntityValue,
404    {
405        self.explain_prepared_aggregate_non_paged_terminal(&ScalarTerminalStrategy::id_terminal(
406            AggregateKind::Min,
407        ))
408    }
409
410    /// Execute and return the id of the row with the smallest value for `field`.
411    ///
412    /// Ties are deterministic: equal field values resolve by primary key ascending.
413    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
414    where
415        E: EntityValue,
416    {
417        self.with_non_paged_slot(field, |target_slot| {
418            self.map_prepared_scalar_terminal_output(
419                ScalarTerminalStrategy::id_by_slot(AggregateKind::Min, target_slot),
420                FluentScalarTerminalOutput::into_id,
421            )
422        })
423    }
424
425    /// Execute and return the largest matching identifier, if any.
426    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
427    where
428        E: EntityValue,
429    {
430        self.map_prepared_scalar_terminal_output(
431            ScalarTerminalStrategy::id_terminal(AggregateKind::Max),
432            FluentScalarTerminalOutput::into_id,
433        )
434    }
435
436    /// Explain scalar `max()` routing without executing the terminal.
437    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
438    where
439        E: EntityValue,
440    {
441        self.explain_prepared_aggregate_non_paged_terminal(&ScalarTerminalStrategy::id_terminal(
442            AggregateKind::Max,
443        ))
444    }
445
446    /// Execute and return the id of the row with the largest value for `field`.
447    ///
448    /// Ties are deterministic: equal field values resolve by primary key ascending.
449    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
450    where
451        E: EntityValue,
452    {
453        self.with_non_paged_slot(field, |target_slot| {
454            self.map_prepared_scalar_terminal_output(
455                ScalarTerminalStrategy::id_by_slot(AggregateKind::Max, target_slot),
456                FluentScalarTerminalOutput::into_id,
457            )
458        })
459    }
460
461    /// Execute and return the id at zero-based ordinal `nth` when rows are
462    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
463    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
464    where
465        E: EntityValue,
466    {
467        self.with_non_paged_slot(field, |target_slot| {
468            self.map_prepared_order_sensitive_terminal_output(
469                OrderSensitiveTerminalStrategy::nth_by_slot(target_slot, nth),
470                FluentScalarTerminalOutput::into_id,
471            )
472        })
473    }
474
475    /// Execute and return the sum of `field` over matching rows.
476    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
477    where
478        E: EntityValue,
479    {
480        self.with_non_paged_slot(field, |target_slot| {
481            self.execute_prepared_numeric_field_terminal(NumericFieldStrategy::sum_by_slot(
482                target_slot,
483            ))
484        })
485    }
486
487    /// Explain scalar `sum_by(field)` routing without executing the terminal.
488    pub fn explain_sum_by(
489        &self,
490        field: impl AsRef<str>,
491    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
492    where
493        E: EntityValue,
494    {
495        self.with_non_paged_slot(field, |target_slot| {
496            self.explain_prepared_aggregate_non_paged_terminal(&NumericFieldStrategy::sum_by_slot(
497                target_slot,
498            ))
499        })
500    }
501
502    /// Execute and return the sum of distinct `field` values.
503    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
504    where
505        E: EntityValue,
506    {
507        self.with_non_paged_slot(field, |target_slot| {
508            self.execute_prepared_numeric_field_terminal(
509                NumericFieldStrategy::sum_distinct_by_slot(target_slot),
510            )
511        })
512    }
513
514    /// Explain scalar `sum(distinct field)` routing without executing the terminal.
515    pub fn explain_sum_distinct_by(
516        &self,
517        field: impl AsRef<str>,
518    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
519    where
520        E: EntityValue,
521    {
522        self.with_non_paged_slot(field, |target_slot| {
523            self.explain_prepared_aggregate_non_paged_terminal(
524                &NumericFieldStrategy::sum_distinct_by_slot(target_slot),
525            )
526        })
527    }
528
529    /// Execute and return the average of `field` over matching rows.
530    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
531    where
532        E: EntityValue,
533    {
534        self.with_non_paged_slot(field, |target_slot| {
535            self.execute_prepared_numeric_field_terminal(NumericFieldStrategy::avg_by_slot(
536                target_slot,
537            ))
538        })
539    }
540
541    /// Explain scalar `avg_by(field)` routing without executing the terminal.
542    pub fn explain_avg_by(
543        &self,
544        field: impl AsRef<str>,
545    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
546    where
547        E: EntityValue,
548    {
549        self.with_non_paged_slot(field, |target_slot| {
550            self.explain_prepared_aggregate_non_paged_terminal(&NumericFieldStrategy::avg_by_slot(
551                target_slot,
552            ))
553        })
554    }
555
556    /// Execute and return the average of distinct `field` values.
557    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
558    where
559        E: EntityValue,
560    {
561        self.with_non_paged_slot(field, |target_slot| {
562            self.execute_prepared_numeric_field_terminal(
563                NumericFieldStrategy::avg_distinct_by_slot(target_slot),
564            )
565        })
566    }
567
568    /// Explain scalar `avg(distinct field)` routing without executing the terminal.
569    pub fn explain_avg_distinct_by(
570        &self,
571        field: impl AsRef<str>,
572    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
573    where
574        E: EntityValue,
575    {
576        self.with_non_paged_slot(field, |target_slot| {
577            self.explain_prepared_aggregate_non_paged_terminal(
578                &NumericFieldStrategy::avg_distinct_by_slot(target_slot),
579            )
580        })
581    }
582
583    /// Execute and return the median id by `field` using deterministic ordering
584    /// `(field asc, primary key asc)`.
585    ///
586    /// Even-length windows select the lower median.
587    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
588    where
589        E: EntityValue,
590    {
591        self.with_non_paged_slot(field, |target_slot| {
592            self.map_prepared_order_sensitive_terminal_output(
593                OrderSensitiveTerminalStrategy::median_by_slot(target_slot),
594                FluentScalarTerminalOutput::into_id,
595            )
596        })
597    }
598
599    /// Execute and return the number of distinct values for `field` over the
600    /// effective result window.
601    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
602    where
603        E: EntityValue,
604    {
605        self.with_non_paged_slot(field, |target_slot| {
606            self.map_prepared_projection_terminal_output(
607                ProjectionStrategy::count_distinct_by_slot(target_slot),
608                FluentProjectionTerminalOutput::into_count,
609            )
610        })
611    }
612
613    /// Explain `count_distinct_by(field)` routing without executing the terminal.
614    pub fn explain_count_distinct_by(
615        &self,
616        field: impl AsRef<str>,
617    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
618    where
619        E: EntityValue,
620    {
621        self.with_non_paged_slot(field, |target_slot| {
622            self.explain_prepared_projection_non_paged_terminal(
623                &ProjectionStrategy::count_distinct_by_slot(target_slot),
624            )
625        })
626    }
627
628    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
629    ///
630    /// Tie handling is deterministic for both extrema: primary key ascending.
631    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
632    where
633        E: EntityValue,
634    {
635        self.with_non_paged_slot(field, |target_slot| {
636            self.map_prepared_order_sensitive_terminal_output(
637                OrderSensitiveTerminalStrategy::min_max_by_slot(target_slot),
638                FluentScalarTerminalOutput::into_id_pair,
639            )
640        })
641    }
642
643    /// Execute and return projected field values for the effective result window.
644    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
645    where
646        E: EntityValue,
647    {
648        self.with_non_paged_slot(field, |target_slot| {
649            self.map_prepared_projection_terminal_output(
650                ProjectionStrategy::values_by_slot(target_slot),
651                FluentProjectionTerminalOutput::into_values,
652            )
653            .map(output_values)
654        })
655    }
656
657    /// Execute and return projected values for one shared bounded projection
658    /// over the effective response window.
659    pub fn project_values<P>(&self, projection: &P) -> Result<Vec<OutputValue>, QueryError>
660    where
661        E: EntityValue,
662        P: ValueProjectionExpr,
663    {
664        self.with_non_paged_slot(projection.field(), |target_slot| {
665            let values = self
666                .execute_prepared_projection_terminal_output(ProjectionStrategy::values_by_slot(
667                    target_slot,
668                ))?
669                .into_values()
670                .map_err(QueryError::execute)?;
671
672            Self::project_terminal_items(projection, values, |projection, value| {
673                projection.apply_value(value)
674            })
675            .map(output_values)
676        })
677    }
678
679    /// Explain `project_values(projection)` routing without executing it.
680    pub fn explain_project_values<P>(
681        &self,
682        projection: &P,
683    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
684    where
685        E: EntityValue,
686        P: ValueProjectionExpr,
687    {
688        self.with_non_paged_slot(projection.field(), |target_slot| {
689            self.explain_prepared_projection_non_paged_terminal(
690                &ProjectionStrategy::values_by_slot(target_slot),
691            )
692        })
693    }
694
695    /// Explain `values_by(field)` routing without executing the terminal.
696    pub fn explain_values_by(
697        &self,
698        field: impl AsRef<str>,
699    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
700    where
701        E: EntityValue,
702    {
703        self.with_non_paged_slot(field, |target_slot| {
704            self.explain_prepared_projection_non_paged_terminal(
705                &ProjectionStrategy::values_by_slot(target_slot),
706            )
707        })
708    }
709
710    /// Execute and return the first `k` rows from the effective response window.
711    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
712    where
713        E: EntityValue,
714    {
715        self.ensure_non_paged_mode_ready()?;
716
717        self.session.execute_fluent_take(self.query(), take_count)
718    }
719
720    /// Execute and return the top `k` rows by `field` under deterministic
721    /// ordering `(field desc, primary_key asc)` over the effective response
722    /// window.
723    ///
724    /// This terminal applies its own ordering and does not preserve query
725    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
726    /// matches `max_by(field)` selection semantics.
727    pub fn top_k_by(
728        &self,
729        field: impl AsRef<str>,
730        take_count: u32,
731    ) -> Result<EntityResponse<E>, QueryError>
732    where
733        E: EntityValue,
734    {
735        self.with_non_paged_slot(field, |target_slot| {
736            self.session.execute_fluent_ranked_rows_by_slot(
737                self.query(),
738                target_slot,
739                take_count,
740                true,
741            )
742        })
743    }
744
745    /// Execute and return the bottom `k` rows by `field` under deterministic
746    /// ordering `(field asc, primary_key asc)` over the effective response
747    /// window.
748    ///
749    /// This terminal applies its own ordering and does not preserve query
750    /// `order_term(...)` row order in the returned rows. For `k = 1`, this
751    /// matches `min_by(field)` selection semantics.
752    pub fn bottom_k_by(
753        &self,
754        field: impl AsRef<str>,
755        take_count: u32,
756    ) -> Result<EntityResponse<E>, QueryError>
757    where
758        E: EntityValue,
759    {
760        self.with_non_paged_slot(field, |target_slot| {
761            self.session.execute_fluent_ranked_rows_by_slot(
762                self.query(),
763                target_slot,
764                take_count,
765                false,
766            )
767        })
768    }
769
770    /// Execute and return projected values for the top `k` rows by `field`
771    /// under deterministic ordering `(field desc, primary_key asc)` over the
772    /// effective response window.
773    ///
774    /// Ranking is applied before projection and does not preserve query
775    /// `order_term(...)` row order in the returned values. For `k = 1`, this
776    /// matches `max_by(field)` projected to one value.
777    pub fn top_k_by_values(
778        &self,
779        field: impl AsRef<str>,
780        take_count: u32,
781    ) -> Result<Vec<OutputValue>, QueryError>
782    where
783        E: EntityValue,
784    {
785        self.with_non_paged_slot(field, |target_slot| {
786            self.session
787                .execute_fluent_ranked_values_by_slot(self.query(), target_slot, take_count, true)
788                .map(output_values)
789        })
790    }
791
792    /// Execute and return projected values for the bottom `k` rows by `field`
793    /// under deterministic ordering `(field asc, primary_key asc)` over the
794    /// effective response window.
795    ///
796    /// Ranking is applied before projection and does not preserve query
797    /// `order_term(...)` row order in the returned values. For `k = 1`, this
798    /// matches `min_by(field)` projected to one value.
799    pub fn bottom_k_by_values(
800        &self,
801        field: impl AsRef<str>,
802        take_count: u32,
803    ) -> Result<Vec<OutputValue>, QueryError>
804    where
805        E: EntityValue,
806    {
807        self.with_non_paged_slot(field, |target_slot| {
808            self.session
809                .execute_fluent_ranked_values_by_slot(self.query(), target_slot, take_count, false)
810                .map(output_values)
811        })
812    }
813
814    /// Execute and return projected id/value pairs for the top `k` rows by
815    /// `field` under deterministic ordering `(field desc, primary_key asc)`
816    /// over the effective response window.
817    ///
818    /// Ranking is applied before projection and does not preserve query
819    /// `order_term(...)` row order in the returned values. For `k = 1`, this
820    /// matches `max_by(field)` projected to one `(id, value)` pair.
821    pub fn top_k_by_with_ids(
822        &self,
823        field: impl AsRef<str>,
824        take_count: u32,
825    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
826    where
827        E: EntityValue,
828    {
829        self.with_non_paged_slot(field, |target_slot| {
830            self.session
831                .execute_fluent_ranked_values_with_ids_by_slot(
832                    self.query(),
833                    target_slot,
834                    take_count,
835                    true,
836                )
837                .map(output_values_with_ids)
838        })
839    }
840
841    /// Execute and return projected id/value pairs for the bottom `k` rows by
842    /// `field` under deterministic ordering `(field asc, primary_key asc)`
843    /// over the effective response window.
844    ///
845    /// Ranking is applied before projection and does not preserve query
846    /// `order_term(...)` row order in the returned values. For `k = 1`, this
847    /// matches `min_by(field)` projected to one `(id, value)` pair.
848    pub fn bottom_k_by_with_ids(
849        &self,
850        field: impl AsRef<str>,
851        take_count: u32,
852    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
853    where
854        E: EntityValue,
855    {
856        self.with_non_paged_slot(field, |target_slot| {
857            self.session
858                .execute_fluent_ranked_values_with_ids_by_slot(
859                    self.query(),
860                    target_slot,
861                    take_count,
862                    false,
863                )
864                .map(output_values_with_ids)
865        })
866    }
867
868    /// Execute and return distinct projected field values for the effective
869    /// result window, preserving first-observed value order.
870    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<OutputValue>, QueryError>
871    where
872        E: EntityValue,
873    {
874        self.with_non_paged_slot(field, |target_slot| {
875            self.map_prepared_projection_terminal_output(
876                ProjectionStrategy::distinct_values_by_slot(target_slot),
877                FluentProjectionTerminalOutput::into_values,
878            )
879            .map(output_values)
880        })
881    }
882
883    /// Explain `distinct_values_by(field)` routing without executing the terminal.
884    pub fn explain_distinct_values_by(
885        &self,
886        field: impl AsRef<str>,
887    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
888    where
889        E: EntityValue,
890    {
891        self.with_non_paged_slot(field, |target_slot| {
892            self.explain_prepared_projection_non_paged_terminal(
893                &ProjectionStrategy::distinct_values_by_slot(target_slot),
894            )
895        })
896    }
897
898    /// Execute and return projected field values paired with row ids for the
899    /// effective result window.
900    pub fn values_by_with_ids(
901        &self,
902        field: impl AsRef<str>,
903    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
904    where
905        E: EntityValue,
906    {
907        self.with_non_paged_slot(field, |target_slot| {
908            self.map_prepared_projection_terminal_output(
909                ProjectionStrategy::values_by_with_ids_slot(target_slot),
910                FluentProjectionTerminalOutput::into_values_with_ids,
911            )
912            .map(output_values_with_ids)
913        })
914    }
915
916    /// Execute and return projected id/value pairs for one shared bounded
917    /// projection over the effective response window.
918    pub fn project_values_with_ids<P>(
919        &self,
920        projection: &P,
921    ) -> Result<Vec<(Id<E>, OutputValue)>, QueryError>
922    where
923        E: EntityValue,
924        P: ValueProjectionExpr,
925    {
926        self.with_non_paged_slot(projection.field(), |target_slot| {
927            let values = self
928                .execute_prepared_projection_terminal_output(
929                    ProjectionStrategy::values_by_with_ids_slot(target_slot),
930                )?
931                .into_values_with_ids()
932                .map_err(QueryError::execute)?;
933
934            Self::project_terminal_items(projection, values, |projection, (id, value)| {
935                Ok((id, projection.apply_value(value)?))
936            })
937            .map(output_values_with_ids)
938        })
939    }
940
941    /// Explain `values_by_with_ids(field)` routing without executing the terminal.
942    pub fn explain_values_by_with_ids(
943        &self,
944        field: impl AsRef<str>,
945    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
946    where
947        E: EntityValue,
948    {
949        self.with_non_paged_slot(field, |target_slot| {
950            self.explain_prepared_projection_non_paged_terminal(
951                &ProjectionStrategy::values_by_with_ids_slot(target_slot),
952            )
953        })
954    }
955
956    /// Execute and return the first projected field value in effective response
957    /// order, if any.
958    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
959    where
960        E: EntityValue,
961    {
962        self.with_non_paged_slot(field, |target_slot| {
963            self.map_prepared_projection_terminal_output(
964                ProjectionStrategy::first_value_by_slot(target_slot),
965                FluentProjectionTerminalOutput::into_terminal_value,
966            )
967            .map(|value| value.map(output))
968        })
969    }
970
971    /// Execute and return the first projected value for one shared bounded
972    /// projection in effective response order, if any.
973    pub fn project_first_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
974    where
975        E: EntityValue,
976        P: ValueProjectionExpr,
977    {
978        self.with_non_paged_slot(projection.field(), |target_slot| {
979            let value = self
980                .execute_prepared_projection_terminal_output(
981                    ProjectionStrategy::first_value_by_slot(target_slot),
982                )?
983                .into_terminal_value()
984                .map_err(QueryError::execute)?;
985
986            let mut projected =
987                Self::project_terminal_items(projection, value, |projection, value| {
988                    projection.apply_value(value)
989                })?;
990
991            Ok(projected.pop().map(output))
992        })
993    }
994
995    /// Explain `first_value_by(field)` routing without executing the terminal.
996    pub fn explain_first_value_by(
997        &self,
998        field: impl AsRef<str>,
999    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1000    where
1001        E: EntityValue,
1002    {
1003        self.with_non_paged_slot(field, |target_slot| {
1004            self.explain_prepared_projection_non_paged_terminal(
1005                &ProjectionStrategy::first_value_by_slot(target_slot),
1006            )
1007        })
1008    }
1009
1010    /// Execute and return the last projected field value in effective response
1011    /// order, if any.
1012    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<OutputValue>, QueryError>
1013    where
1014        E: EntityValue,
1015    {
1016        self.with_non_paged_slot(field, |target_slot| {
1017            self.map_prepared_projection_terminal_output(
1018                ProjectionStrategy::last_value_by_slot(target_slot),
1019                FluentProjectionTerminalOutput::into_terminal_value,
1020            )
1021            .map(|value| value.map(output))
1022        })
1023    }
1024
1025    /// Execute and return the last projected value for one shared bounded
1026    /// projection in effective response order, if any.
1027    pub fn project_last_value<P>(&self, projection: &P) -> Result<Option<OutputValue>, QueryError>
1028    where
1029        E: EntityValue,
1030        P: ValueProjectionExpr,
1031    {
1032        self.with_non_paged_slot(projection.field(), |target_slot| {
1033            let value = self
1034                .execute_prepared_projection_terminal_output(
1035                    ProjectionStrategy::last_value_by_slot(target_slot),
1036                )?
1037                .into_terminal_value()
1038                .map_err(QueryError::execute)?;
1039
1040            let mut projected =
1041                Self::project_terminal_items(projection, value, |projection, value| {
1042                    projection.apply_value(value)
1043                })?;
1044
1045            Ok(projected.pop().map(output))
1046        })
1047    }
1048
1049    /// Explain `last_value_by(field)` routing without executing the terminal.
1050    pub fn explain_last_value_by(
1051        &self,
1052        field: impl AsRef<str>,
1053    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
1054    where
1055        E: EntityValue,
1056    {
1057        self.with_non_paged_slot(field, |target_slot| {
1058            self.explain_prepared_projection_non_paged_terminal(
1059                &ProjectionStrategy::last_value_by_slot(target_slot),
1060            )
1061        })
1062    }
1063
1064    /// Execute and return the first matching identifier in response order, if any.
1065    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
1066    where
1067        E: EntityValue,
1068    {
1069        self.map_prepared_order_sensitive_terminal_output(
1070            OrderSensitiveTerminalStrategy::first(),
1071            FluentScalarTerminalOutput::into_id,
1072        )
1073    }
1074
1075    /// Explain scalar `first()` routing without executing the terminal.
1076    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1077    where
1078        E: EntityValue,
1079    {
1080        self.explain_prepared_aggregate_non_paged_terminal(&OrderSensitiveTerminalStrategy::first())
1081    }
1082
1083    /// Execute and return the last matching identifier in response order, if any.
1084    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
1085    where
1086        E: EntityValue,
1087    {
1088        self.map_prepared_order_sensitive_terminal_output(
1089            OrderSensitiveTerminalStrategy::last(),
1090            FluentScalarTerminalOutput::into_id,
1091        )
1092    }
1093
1094    /// Explain scalar `last()` routing without executing the terminal.
1095    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
1096    where
1097        E: EntityValue,
1098    {
1099        self.explain_prepared_aggregate_non_paged_terminal(&OrderSensitiveTerminalStrategy::last())
1100    }
1101
1102    /// Execute and require exactly one matching row.
1103    pub fn require_one(&self) -> Result<(), QueryError>
1104    where
1105        E: EntityValue,
1106    {
1107        self.execute()?.into_rows()?.require_one()?;
1108        Ok(())
1109    }
1110
1111    /// Execute and require at least one matching row.
1112    pub fn require_some(&self) -> Result<(), QueryError>
1113    where
1114        E: EntityValue,
1115    {
1116        self.execute()?.into_rows()?.require_some()?;
1117        Ok(())
1118    }
1119}