Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1//! Module: query::fluent::load::terminals
2//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
3//! Does not own: planner semantic validation or executor runtime routing decisions.
4//! Boundary: delegates to session planning/execution and returns typed query results.
5
6use crate::{
7    db::{
8        DbSession,
9        executor::{ExecutablePlan, LoadExecutor},
10        query::fluent::load::FluentLoadQuery,
11        query::{
12            api::ResponseCardinalityExt,
13            builder::{
14                AggregateExpr,
15                aggregate::{exists, first, last, max, min},
16            },
17            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
18            intent::QueryError,
19        },
20        response::EntityResponse,
21    },
22    error::InternalError,
23    traits::{EntityKind, EntityValue},
24    types::{Decimal, Id},
25    value::Value,
26};
27
28type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
29
30impl<E> FluentLoadQuery<'_, E>
31where
32    E: EntityKind,
33{
34    // ------------------------------------------------------------------
35    // Execution (single semantic boundary)
36    // ------------------------------------------------------------------
37
38    /// Execute this query using the session's policy settings.
39    pub fn execute(&self) -> Result<EntityResponse<E>, QueryError>
40    where
41        E: EntityValue,
42    {
43        self.ensure_non_paged_mode_ready()?;
44
45        self.session.execute_query(self.query())
46    }
47
48    // Run one scalar terminal through the canonical non-paged fluent policy
49    // gate before handing execution to the session load-query adapter.
50    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
51    where
52        E: EntityValue,
53        F: FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
54    {
55        self.ensure_non_paged_mode_ready()?;
56
57        self.session.execute_load_query_with(self.query(), execute)
58    }
59
60    // Run one scalar aggregate EXPLAIN terminal through the canonical
61    // non-paged fluent policy gate.
62    fn explain_scalar_non_paged_terminal(
63        &self,
64        aggregate: AggregateExpr,
65    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
66    where
67        E: EntityValue,
68    {
69        self.ensure_non_paged_mode_ready()?;
70
71        DbSession::<E::Canister>::explain_load_query_terminal_with(self.query(), aggregate)
72    }
73
74    // ------------------------------------------------------------------
75    // Execution terminals — semantic only
76    // ------------------------------------------------------------------
77
78    /// Execute and return whether the result set is empty.
79    pub fn is_empty(&self) -> Result<bool, QueryError>
80    where
81        E: EntityValue,
82    {
83        self.not_exists()
84    }
85
86    /// Execute and return whether no matching row exists.
87    pub fn not_exists(&self) -> Result<bool, QueryError>
88    where
89        E: EntityValue,
90    {
91        Ok(!self.exists()?)
92    }
93
94    /// Execute and return whether at least one matching row exists.
95    pub fn exists(&self) -> Result<bool, QueryError>
96    where
97        E: EntityValue,
98    {
99        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_exists(plan))
100    }
101
102    /// Explain scalar `exists()` routing without executing the terminal.
103    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
104    where
105        E: EntityValue,
106    {
107        self.explain_scalar_non_paged_terminal(exists())
108    }
109
110    /// Explain scalar `not_exists()` routing without executing the terminal.
111    ///
112    /// This remains an `exists()` execution plan with negated boolean semantics.
113    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
114    where
115        E: EntityValue,
116    {
117        self.explain_exists()
118    }
119
120    /// Explain scalar load execution shape without executing the query.
121    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
122    where
123        E: EntityValue,
124    {
125        self.query().explain_execution()
126    }
127
128    /// Explain scalar load execution shape as deterministic text.
129    pub fn explain_execution_text(&self) -> Result<String, QueryError>
130    where
131        E: EntityValue,
132    {
133        self.query().explain_execution_text()
134    }
135
136    /// Explain scalar load execution shape as canonical JSON.
137    pub fn explain_execution_json(&self) -> Result<String, QueryError>
138    where
139        E: EntityValue,
140    {
141        self.query().explain_execution_json()
142    }
143
144    /// Explain scalar load execution shape as verbose text with diagnostics.
145    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
146    where
147        E: EntityValue,
148    {
149        self.query().explain_execution_verbose()
150    }
151
152    /// Execute and return the number of matching rows.
153    pub fn count(&self) -> Result<u32, QueryError>
154    where
155        E: EntityValue,
156    {
157        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_count(plan))
158    }
159
160    /// Execute and return the total persisted payload bytes for the effective
161    /// result window.
162    pub fn bytes(&self) -> Result<u64, QueryError>
163    where
164        E: EntityValue,
165    {
166        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
167    }
168
169    /// Execute and return the total serialized bytes for `field` over the
170    /// effective result window.
171    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
172    where
173        E: EntityValue,
174    {
175        self.ensure_non_paged_mode_ready()?;
176
177        Self::with_slot(field, |target_slot| {
178            self.session
179                .execute_load_query_with(self.query(), move |load, plan| {
180                    load.bytes_by_slot(plan, target_slot)
181                })
182        })
183    }
184
185    /// Explain `bytes_by(field)` routing without executing the terminal.
186    pub fn explain_bytes_by(
187        &self,
188        field: impl AsRef<str>,
189    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
190    where
191        E: EntityValue,
192    {
193        self.ensure_non_paged_mode_ready()?;
194
195        Self::with_slot(field, |target_slot| {
196            DbSession::<E::Canister>::explain_load_query_bytes_by_with(
197                self.query(),
198                target_slot.field(),
199            )
200        })
201    }
202
203    /// Execute and return the smallest matching identifier, if any.
204    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
205    where
206        E: EntityValue,
207    {
208        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_min(plan))
209    }
210
211    /// Explain scalar `min()` routing without executing the terminal.
212    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
213    where
214        E: EntityValue,
215    {
216        self.explain_scalar_non_paged_terminal(min())
217    }
218
219    /// Execute and return the id of the row with the smallest value for `field`.
220    ///
221    /// Ties are deterministic: equal field values resolve by primary key ascending.
222    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
223    where
224        E: EntityValue,
225    {
226        self.ensure_non_paged_mode_ready()?;
227
228        Self::with_slot(field, |target_slot| {
229            self.session
230                .execute_load_query_with(self.query(), move |load, plan| {
231                    load.aggregate_min_by_slot(plan, target_slot)
232                })
233        })
234    }
235
236    /// Execute and return the largest matching identifier, if any.
237    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
238    where
239        E: EntityValue,
240    {
241        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_max(plan))
242    }
243
244    /// Explain scalar `max()` routing without executing the terminal.
245    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
246    where
247        E: EntityValue,
248    {
249        self.explain_scalar_non_paged_terminal(max())
250    }
251
252    /// Execute and return the id of the row with the largest value for `field`.
253    ///
254    /// Ties are deterministic: equal field values resolve by primary key ascending.
255    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
256    where
257        E: EntityValue,
258    {
259        self.ensure_non_paged_mode_ready()?;
260
261        Self::with_slot(field, |target_slot| {
262            self.session
263                .execute_load_query_with(self.query(), move |load, plan| {
264                    load.aggregate_max_by_slot(plan, target_slot)
265                })
266        })
267    }
268
269    /// Execute and return the id at zero-based ordinal `nth` when rows are
270    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
271    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
272    where
273        E: EntityValue,
274    {
275        self.ensure_non_paged_mode_ready()?;
276
277        Self::with_slot(field, |target_slot| {
278            self.session
279                .execute_load_query_with(self.query(), move |load, plan| {
280                    load.aggregate_nth_by_slot(plan, target_slot, nth)
281                })
282        })
283    }
284
285    /// Execute and return the sum of `field` over matching rows.
286    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
287    where
288        E: EntityValue,
289    {
290        self.ensure_non_paged_mode_ready()?;
291
292        Self::with_slot(field, |target_slot| {
293            self.session
294                .execute_load_query_with(self.query(), move |load, plan| {
295                    load.aggregate_sum_by_slot(plan, target_slot)
296                })
297        })
298    }
299
300    /// Execute and return the sum of distinct `field` values.
301    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
302    where
303        E: EntityValue,
304    {
305        self.ensure_non_paged_mode_ready()?;
306
307        Self::with_slot(field, |target_slot| {
308            self.session
309                .execute_load_query_with(self.query(), move |load, plan| {
310                    load.aggregate_sum_distinct_by_slot(plan, target_slot)
311                })
312        })
313    }
314
315    /// Execute and return the average of `field` over matching rows.
316    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
317    where
318        E: EntityValue,
319    {
320        self.ensure_non_paged_mode_ready()?;
321
322        Self::with_slot(field, |target_slot| {
323            self.session
324                .execute_load_query_with(self.query(), move |load, plan| {
325                    load.aggregate_avg_by_slot(plan, target_slot)
326                })
327        })
328    }
329
330    /// Execute and return the average of distinct `field` values.
331    pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
332    where
333        E: EntityValue,
334    {
335        self.ensure_non_paged_mode_ready()?;
336
337        Self::with_slot(field, |target_slot| {
338            self.session
339                .execute_load_query_with(self.query(), move |load, plan| {
340                    load.aggregate_avg_distinct_by_slot(plan, target_slot)
341                })
342        })
343    }
344
345    /// Execute and return the median id by `field` using deterministic ordering
346    /// `(field asc, primary key asc)`.
347    ///
348    /// Even-length windows select the lower median.
349    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
350    where
351        E: EntityValue,
352    {
353        self.ensure_non_paged_mode_ready()?;
354
355        Self::with_slot(field, |target_slot| {
356            self.session
357                .execute_load_query_with(self.query(), move |load, plan| {
358                    load.aggregate_median_by_slot(plan, target_slot)
359                })
360        })
361    }
362
363    /// Execute and return the number of distinct values for `field` over the
364    /// effective result window.
365    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
366    where
367        E: EntityValue,
368    {
369        self.ensure_non_paged_mode_ready()?;
370
371        Self::with_slot(field, |target_slot| {
372            self.session
373                .execute_load_query_with(self.query(), move |load, plan| {
374                    load.aggregate_count_distinct_by_slot(plan, target_slot)
375                })
376        })
377    }
378
379    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
380    ///
381    /// Tie handling is deterministic for both extrema: primary key ascending.
382    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
383    where
384        E: EntityValue,
385    {
386        self.ensure_non_paged_mode_ready()?;
387
388        Self::with_slot(field, |target_slot| {
389            self.session
390                .execute_load_query_with(self.query(), move |load, plan| {
391                    load.aggregate_min_max_by_slot(plan, target_slot)
392                })
393        })
394    }
395
396    /// Execute and return projected field values for the effective result window.
397    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
398    where
399        E: EntityValue,
400    {
401        self.ensure_non_paged_mode_ready()?;
402
403        Self::with_slot(field, |target_slot| {
404            self.session
405                .execute_load_query_with(self.query(), move |load, plan| {
406                    load.values_by_slot(plan, target_slot)
407                })
408        })
409    }
410
411    /// Execute and return the first `k` rows from the effective response window.
412    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
413    where
414        E: EntityValue,
415    {
416        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
417    }
418
419    /// Execute and return the top `k` rows by `field` under deterministic
420    /// ordering `(field desc, primary_key asc)` over the effective response
421    /// window.
422    ///
423    /// This terminal applies its own ordering and does not preserve query
424    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
425    /// matches `max_by(field)` selection semantics.
426    pub fn top_k_by(
427        &self,
428        field: impl AsRef<str>,
429        take_count: u32,
430    ) -> Result<EntityResponse<E>, QueryError>
431    where
432        E: EntityValue,
433    {
434        self.ensure_non_paged_mode_ready()?;
435
436        Self::with_slot(field, |target_slot| {
437            self.session
438                .execute_load_query_with(self.query(), move |load, plan| {
439                    load.top_k_by_slot(plan, target_slot, take_count)
440                })
441        })
442    }
443
444    /// Execute and return the bottom `k` rows by `field` under deterministic
445    /// ordering `(field asc, primary_key asc)` over the effective response
446    /// window.
447    ///
448    /// This terminal applies its own ordering and does not preserve query
449    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
450    /// matches `min_by(field)` selection semantics.
451    pub fn bottom_k_by(
452        &self,
453        field: impl AsRef<str>,
454        take_count: u32,
455    ) -> Result<EntityResponse<E>, QueryError>
456    where
457        E: EntityValue,
458    {
459        self.ensure_non_paged_mode_ready()?;
460
461        Self::with_slot(field, |target_slot| {
462            self.session
463                .execute_load_query_with(self.query(), move |load, plan| {
464                    load.bottom_k_by_slot(plan, target_slot, take_count)
465                })
466        })
467    }
468
469    /// Execute and return projected values for the top `k` rows by `field`
470    /// under deterministic ordering `(field desc, primary_key asc)` over the
471    /// effective response window.
472    ///
473    /// Ranking is applied before projection and does not preserve query
474    /// `order_by(...)` row order in the returned values. For `k = 1`, this
475    /// matches `max_by(field)` projected to one value.
476    pub fn top_k_by_values(
477        &self,
478        field: impl AsRef<str>,
479        take_count: u32,
480    ) -> Result<Vec<Value>, QueryError>
481    where
482        E: EntityValue,
483    {
484        self.ensure_non_paged_mode_ready()?;
485
486        Self::with_slot(field, |target_slot| {
487            self.session
488                .execute_load_query_with(self.query(), move |load, plan| {
489                    load.top_k_by_values_slot(plan, target_slot, take_count)
490                })
491        })
492    }
493
494    /// Execute and return projected values for the bottom `k` rows by `field`
495    /// under deterministic ordering `(field asc, primary_key asc)` over the
496    /// effective response window.
497    ///
498    /// Ranking is applied before projection and does not preserve query
499    /// `order_by(...)` row order in the returned values. For `k = 1`, this
500    /// matches `min_by(field)` projected to one value.
501    pub fn bottom_k_by_values(
502        &self,
503        field: impl AsRef<str>,
504        take_count: u32,
505    ) -> Result<Vec<Value>, QueryError>
506    where
507        E: EntityValue,
508    {
509        self.ensure_non_paged_mode_ready()?;
510
511        Self::with_slot(field, |target_slot| {
512            self.session
513                .execute_load_query_with(self.query(), move |load, plan| {
514                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
515                })
516        })
517    }
518
519    /// Execute and return projected id/value pairs for the top `k` rows by
520    /// `field` under deterministic ordering `(field desc, primary_key asc)`
521    /// over the effective response window.
522    ///
523    /// Ranking is applied before projection and does not preserve query
524    /// `order_by(...)` row order in the returned values. For `k = 1`, this
525    /// matches `max_by(field)` projected to one `(id, value)` pair.
526    pub fn top_k_by_with_ids(
527        &self,
528        field: impl AsRef<str>,
529        take_count: u32,
530    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
531    where
532        E: EntityValue,
533    {
534        self.ensure_non_paged_mode_ready()?;
535
536        Self::with_slot(field, |target_slot| {
537            self.session
538                .execute_load_query_with(self.query(), move |load, plan| {
539                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
540                })
541        })
542    }
543
544    /// Execute and return projected id/value pairs for the bottom `k` rows by
545    /// `field` under deterministic ordering `(field asc, primary_key asc)`
546    /// over the effective response window.
547    ///
548    /// Ranking is applied before projection and does not preserve query
549    /// `order_by(...)` row order in the returned values. For `k = 1`, this
550    /// matches `min_by(field)` projected to one `(id, value)` pair.
551    pub fn bottom_k_by_with_ids(
552        &self,
553        field: impl AsRef<str>,
554        take_count: u32,
555    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
556    where
557        E: EntityValue,
558    {
559        self.ensure_non_paged_mode_ready()?;
560
561        Self::with_slot(field, |target_slot| {
562            self.session
563                .execute_load_query_with(self.query(), move |load, plan| {
564                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
565                })
566        })
567    }
568
569    /// Execute and return distinct projected field values for the effective
570    /// result window, preserving first-observed value order.
571    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
572    where
573        E: EntityValue,
574    {
575        self.ensure_non_paged_mode_ready()?;
576
577        Self::with_slot(field, |target_slot| {
578            self.session
579                .execute_load_query_with(self.query(), move |load, plan| {
580                    load.distinct_values_by_slot(plan, target_slot)
581                })
582        })
583    }
584
585    /// Execute and return projected field values paired with row ids for the
586    /// effective result window.
587    pub fn values_by_with_ids(
588        &self,
589        field: impl AsRef<str>,
590    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
591    where
592        E: EntityValue,
593    {
594        self.ensure_non_paged_mode_ready()?;
595
596        Self::with_slot(field, |target_slot| {
597            self.session
598                .execute_load_query_with(self.query(), move |load, plan| {
599                    load.values_by_with_ids_slot(plan, target_slot)
600                })
601        })
602    }
603
604    /// Execute and return the first projected field value in effective response
605    /// order, if any.
606    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
607    where
608        E: EntityValue,
609    {
610        self.ensure_non_paged_mode_ready()?;
611
612        Self::with_slot(field, |target_slot| {
613            self.session
614                .execute_load_query_with(self.query(), move |load, plan| {
615                    load.first_value_by_slot(plan, target_slot)
616                })
617        })
618    }
619
620    /// Execute and return the last projected field value in effective response
621    /// order, if any.
622    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
623    where
624        E: EntityValue,
625    {
626        self.ensure_non_paged_mode_ready()?;
627
628        Self::with_slot(field, |target_slot| {
629            self.session
630                .execute_load_query_with(self.query(), move |load, plan| {
631                    load.last_value_by_slot(plan, target_slot)
632                })
633        })
634    }
635
636    /// Execute and return the first matching identifier in response order, if any.
637    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
638    where
639        E: EntityValue,
640    {
641        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_first(plan))
642    }
643
644    /// Explain scalar `first()` routing without executing the terminal.
645    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
646    where
647        E: EntityValue,
648    {
649        self.explain_scalar_non_paged_terminal(first())
650    }
651
652    /// Execute and return the last matching identifier in response order, if any.
653    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
654    where
655        E: EntityValue,
656    {
657        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_last(plan))
658    }
659
660    /// Explain scalar `last()` routing without executing the terminal.
661    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
662    where
663        E: EntityValue,
664    {
665        self.explain_scalar_non_paged_terminal(last())
666    }
667
668    /// Execute and require exactly one matching row.
669    pub fn require_one(&self) -> Result<(), QueryError>
670    where
671        E: EntityValue,
672    {
673        self.execute()?.require_one()?;
674        Ok(())
675    }
676
677    /// Execute and require at least one matching row.
678    pub fn require_some(&self) -> Result<(), QueryError>
679    where
680        E: EntityValue,
681    {
682        self.execute()?.require_some()?;
683        Ok(())
684    }
685}