// icydb_core/db/query/fluent/load/terminals.rs

//! Module: query::fluent::load::terminals
//! Responsibility: fluent load terminal APIs and terminal-plan explanation entrypoints.
//! Does not own: planner semantic validation or executor runtime routing decisions.
//! Boundary: delegates to session planning/execution and returns typed query results.

use crate::{
    db::{
        DbSession,
        executor::{ExecutablePlan, LoadExecutor},
        query::fluent::load::FluentLoadQuery,
        query::{
            api::ResponseCardinalityExt,
            builder::{
                AggregateExpr,
                aggregate::{exists, first, last, max, min},
            },
            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
            intent::QueryError,
        },
        response::EntityResponse,
    },
    error::InternalError,
    traits::{EntityKind, EntityValue},
    types::{Decimal, Id},
    value::Value,
};

28type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
29
30impl<E> FluentLoadQuery<'_, E>
31where
32    E: EntityKind,
33{
34    // ------------------------------------------------------------------
35    // Execution (single semantic boundary)
36    // ------------------------------------------------------------------
37
38    /// Execute this query using the session's policy settings.
39    pub fn execute(&self) -> Result<EntityResponse<E>, QueryError>
40    where
41        E: EntityValue,
42    {
43        self.ensure_non_paged_mode_ready()?;
44
45        self.session.execute_query(self.query())
46    }
47
48    // Run one scalar terminal through the canonical non-paged fluent policy
49    // gate before handing execution to the session load-query adapter.
50    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
51    where
52        E: EntityValue,
53        F: FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
54    {
55        self.ensure_non_paged_mode_ready()?;
56
57        self.session.execute_load_query_with(self.query(), execute)
58    }
59
60    // Run one scalar aggregate EXPLAIN terminal through the canonical
61    // non-paged fluent policy gate.
62    fn explain_scalar_non_paged_terminal(
63        &self,
64        aggregate: AggregateExpr,
65    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
66    where
67        E: EntityValue,
68    {
69        self.ensure_non_paged_mode_ready()?;
70
71        DbSession::<E::Canister>::explain_load_query_terminal_with(self.query(), aggregate)
72    }
73
74    // ------------------------------------------------------------------
75    // Execution terminals — semantic only
76    // ------------------------------------------------------------------
77
78    /// Execute and return whether the result set is empty.
79    pub fn is_empty(&self) -> Result<bool, QueryError>
80    where
81        E: EntityValue,
82    {
83        self.not_exists()
84    }
85
86    /// Execute and return whether no matching row exists.
87    pub fn not_exists(&self) -> Result<bool, QueryError>
88    where
89        E: EntityValue,
90    {
91        Ok(!self.exists()?)
92    }
93
94    /// Execute and return whether at least one matching row exists.
95    pub fn exists(&self) -> Result<bool, QueryError>
96    where
97        E: EntityValue,
98    {
99        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_exists(plan))
100    }
101
102    /// Explain scalar `exists()` routing without executing the terminal.
103    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
104    where
105        E: EntityValue,
106    {
107        self.explain_scalar_non_paged_terminal(exists())
108    }
109
110    /// Explain scalar `not_exists()` routing without executing the terminal.
111    ///
112    /// This remains an `exists()` execution plan with negated boolean semantics.
113    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
114    where
115        E: EntityValue,
116    {
117        self.explain_exists()
118    }
119
120    /// Explain scalar load execution shape without executing the query.
121    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
122    where
123        E: EntityValue,
124    {
125        self.query().explain_execution()
126    }
127
128    /// Explain scalar load execution shape as deterministic text.
129    pub fn explain_execution_text(&self) -> Result<String, QueryError>
130    where
131        E: EntityValue,
132    {
133        self.query().explain_execution_text()
134    }
135
136    /// Explain scalar load execution shape as canonical JSON.
137    pub fn explain_execution_json(&self) -> Result<String, QueryError>
138    where
139        E: EntityValue,
140    {
141        self.query().explain_execution_json()
142    }
143
144    /// Explain scalar load execution shape as verbose text with diagnostics.
145    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
146    where
147        E: EntityValue,
148    {
149        self.query().explain_execution_verbose()
150    }
151
152    /// Execute and return the number of matching rows.
153    pub fn count(&self) -> Result<u32, QueryError>
154    where
155        E: EntityValue,
156    {
157        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_count(plan))
158    }
159
160    /// Execute and return the total persisted payload bytes for the effective
161    /// result window.
162    pub fn bytes(&self) -> Result<u64, QueryError>
163    where
164        E: EntityValue,
165    {
166        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
167    }
168
169    /// Execute and return the total serialized bytes for `field` over the
170    /// effective result window.
171    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
172    where
173        E: EntityValue,
174    {
175        self.ensure_non_paged_mode_ready()?;
176
177        Self::with_slot(field, |target_slot| {
178            self.session
179                .execute_load_query_with(self.query(), move |load, plan| {
180                    load.bytes_by_slot(plan, target_slot)
181                })
182        })
183    }
184
185    /// Execute and return the smallest matching identifier, if any.
186    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
187    where
188        E: EntityValue,
189    {
190        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_min(plan))
191    }
192
193    /// Explain scalar `min()` routing without executing the terminal.
194    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
195    where
196        E: EntityValue,
197    {
198        self.explain_scalar_non_paged_terminal(min())
199    }
200
201    /// Execute and return the id of the row with the smallest value for `field`.
202    ///
203    /// Ties are deterministic: equal field values resolve by primary key ascending.
204    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
205    where
206        E: EntityValue,
207    {
208        self.ensure_non_paged_mode_ready()?;
209
210        Self::with_slot(field, |target_slot| {
211            self.session
212                .execute_load_query_with(self.query(), move |load, plan| {
213                    load.aggregate_min_by_slot(plan, target_slot)
214                })
215        })
216    }
217
218    /// Execute and return the largest matching identifier, if any.
219    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
220    where
221        E: EntityValue,
222    {
223        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_max(plan))
224    }
225
226    /// Explain scalar `max()` routing without executing the terminal.
227    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
228    where
229        E: EntityValue,
230    {
231        self.explain_scalar_non_paged_terminal(max())
232    }
233
234    /// Execute and return the id of the row with the largest value for `field`.
235    ///
236    /// Ties are deterministic: equal field values resolve by primary key ascending.
237    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
238    where
239        E: EntityValue,
240    {
241        self.ensure_non_paged_mode_ready()?;
242
243        Self::with_slot(field, |target_slot| {
244            self.session
245                .execute_load_query_with(self.query(), move |load, plan| {
246                    load.aggregate_max_by_slot(plan, target_slot)
247                })
248        })
249    }
250
251    /// Execute and return the id at zero-based ordinal `nth` when rows are
252    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
253    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
254    where
255        E: EntityValue,
256    {
257        self.ensure_non_paged_mode_ready()?;
258
259        Self::with_slot(field, |target_slot| {
260            self.session
261                .execute_load_query_with(self.query(), move |load, plan| {
262                    load.aggregate_nth_by_slot(plan, target_slot, nth)
263                })
264        })
265    }
266
267    /// Execute and return the sum of `field` over matching rows.
268    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
269    where
270        E: EntityValue,
271    {
272        self.ensure_non_paged_mode_ready()?;
273
274        Self::with_slot(field, |target_slot| {
275            self.session
276                .execute_load_query_with(self.query(), move |load, plan| {
277                    load.aggregate_sum_by_slot(plan, target_slot)
278                })
279        })
280    }
281
282    /// Execute and return the sum of distinct `field` values.
283    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
284    where
285        E: EntityValue,
286    {
287        self.ensure_non_paged_mode_ready()?;
288
289        Self::with_slot(field, |target_slot| {
290            self.session
291                .execute_load_query_with(self.query(), move |load, plan| {
292                    load.aggregate_sum_distinct_by_slot(plan, target_slot)
293                })
294        })
295    }
296
297    /// Execute and return the average of `field` over matching rows.
298    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
299    where
300        E: EntityValue,
301    {
302        self.ensure_non_paged_mode_ready()?;
303
304        Self::with_slot(field, |target_slot| {
305            self.session
306                .execute_load_query_with(self.query(), move |load, plan| {
307                    load.aggregate_avg_by_slot(plan, target_slot)
308                })
309        })
310    }
311
312    /// Execute and return the median id by `field` using deterministic ordering
313    /// `(field asc, primary key asc)`.
314    ///
315    /// Even-length windows select the lower median.
316    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
317    where
318        E: EntityValue,
319    {
320        self.ensure_non_paged_mode_ready()?;
321
322        Self::with_slot(field, |target_slot| {
323            self.session
324                .execute_load_query_with(self.query(), move |load, plan| {
325                    load.aggregate_median_by_slot(plan, target_slot)
326                })
327        })
328    }
329
330    /// Execute and return the number of distinct values for `field` over the
331    /// effective result window.
332    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
333    where
334        E: EntityValue,
335    {
336        self.ensure_non_paged_mode_ready()?;
337
338        Self::with_slot(field, |target_slot| {
339            self.session
340                .execute_load_query_with(self.query(), move |load, plan| {
341                    load.aggregate_count_distinct_by_slot(plan, target_slot)
342                })
343        })
344    }
345
346    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
347    ///
348    /// Tie handling is deterministic for both extrema: primary key ascending.
349    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
350    where
351        E: EntityValue,
352    {
353        self.ensure_non_paged_mode_ready()?;
354
355        Self::with_slot(field, |target_slot| {
356            self.session
357                .execute_load_query_with(self.query(), move |load, plan| {
358                    load.aggregate_min_max_by_slot(plan, target_slot)
359                })
360        })
361    }
362
363    /// Execute and return projected field values for the effective result window.
364    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
365    where
366        E: EntityValue,
367    {
368        self.ensure_non_paged_mode_ready()?;
369
370        Self::with_slot(field, |target_slot| {
371            self.session
372                .execute_load_query_with(self.query(), move |load, plan| {
373                    load.values_by_slot(plan, target_slot)
374                })
375        })
376    }
377
378    /// Execute and return the first `k` rows from the effective response window.
379    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
380    where
381        E: EntityValue,
382    {
383        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
384    }
385
386    /// Execute and return the top `k` rows by `field` under deterministic
387    /// ordering `(field desc, primary_key asc)` over the effective response
388    /// window.
389    ///
390    /// This terminal applies its own ordering and does not preserve query
391    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
392    /// matches `max_by(field)` selection semantics.
393    pub fn top_k_by(
394        &self,
395        field: impl AsRef<str>,
396        take_count: u32,
397    ) -> Result<EntityResponse<E>, QueryError>
398    where
399        E: EntityValue,
400    {
401        self.ensure_non_paged_mode_ready()?;
402
403        Self::with_slot(field, |target_slot| {
404            self.session
405                .execute_load_query_with(self.query(), move |load, plan| {
406                    load.top_k_by_slot(plan, target_slot, take_count)
407                })
408        })
409    }
410
411    /// Execute and return the bottom `k` rows by `field` under deterministic
412    /// ordering `(field asc, primary_key asc)` over the effective response
413    /// window.
414    ///
415    /// This terminal applies its own ordering and does not preserve query
416    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
417    /// matches `min_by(field)` selection semantics.
418    pub fn bottom_k_by(
419        &self,
420        field: impl AsRef<str>,
421        take_count: u32,
422    ) -> Result<EntityResponse<E>, QueryError>
423    where
424        E: EntityValue,
425    {
426        self.ensure_non_paged_mode_ready()?;
427
428        Self::with_slot(field, |target_slot| {
429            self.session
430                .execute_load_query_with(self.query(), move |load, plan| {
431                    load.bottom_k_by_slot(plan, target_slot, take_count)
432                })
433        })
434    }
435
436    /// Execute and return projected values for the top `k` rows by `field`
437    /// under deterministic ordering `(field desc, primary_key asc)` over the
438    /// effective response window.
439    ///
440    /// Ranking is applied before projection and does not preserve query
441    /// `order_by(...)` row order in the returned values. For `k = 1`, this
442    /// matches `max_by(field)` projected to one value.
443    pub fn top_k_by_values(
444        &self,
445        field: impl AsRef<str>,
446        take_count: u32,
447    ) -> Result<Vec<Value>, QueryError>
448    where
449        E: EntityValue,
450    {
451        self.ensure_non_paged_mode_ready()?;
452
453        Self::with_slot(field, |target_slot| {
454            self.session
455                .execute_load_query_with(self.query(), move |load, plan| {
456                    load.top_k_by_values_slot(plan, target_slot, take_count)
457                })
458        })
459    }
460
461    /// Execute and return projected values for the bottom `k` rows by `field`
462    /// under deterministic ordering `(field asc, primary_key asc)` over the
463    /// effective response window.
464    ///
465    /// Ranking is applied before projection and does not preserve query
466    /// `order_by(...)` row order in the returned values. For `k = 1`, this
467    /// matches `min_by(field)` projected to one value.
468    pub fn bottom_k_by_values(
469        &self,
470        field: impl AsRef<str>,
471        take_count: u32,
472    ) -> Result<Vec<Value>, QueryError>
473    where
474        E: EntityValue,
475    {
476        self.ensure_non_paged_mode_ready()?;
477
478        Self::with_slot(field, |target_slot| {
479            self.session
480                .execute_load_query_with(self.query(), move |load, plan| {
481                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
482                })
483        })
484    }
485
486    /// Execute and return projected id/value pairs for the top `k` rows by
487    /// `field` under deterministic ordering `(field desc, primary_key asc)`
488    /// over the effective response window.
489    ///
490    /// Ranking is applied before projection and does not preserve query
491    /// `order_by(...)` row order in the returned values. For `k = 1`, this
492    /// matches `max_by(field)` projected to one `(id, value)` pair.
493    pub fn top_k_by_with_ids(
494        &self,
495        field: impl AsRef<str>,
496        take_count: u32,
497    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
498    where
499        E: EntityValue,
500    {
501        self.ensure_non_paged_mode_ready()?;
502
503        Self::with_slot(field, |target_slot| {
504            self.session
505                .execute_load_query_with(self.query(), move |load, plan| {
506                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
507                })
508        })
509    }
510
511    /// Execute and return projected id/value pairs for the bottom `k` rows by
512    /// `field` under deterministic ordering `(field asc, primary_key asc)`
513    /// over the effective response window.
514    ///
515    /// Ranking is applied before projection and does not preserve query
516    /// `order_by(...)` row order in the returned values. For `k = 1`, this
517    /// matches `min_by(field)` projected to one `(id, value)` pair.
518    pub fn bottom_k_by_with_ids(
519        &self,
520        field: impl AsRef<str>,
521        take_count: u32,
522    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
523    where
524        E: EntityValue,
525    {
526        self.ensure_non_paged_mode_ready()?;
527
528        Self::with_slot(field, |target_slot| {
529            self.session
530                .execute_load_query_with(self.query(), move |load, plan| {
531                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
532                })
533        })
534    }
535
536    /// Execute and return distinct projected field values for the effective
537    /// result window, preserving first-observed value order.
538    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
539    where
540        E: EntityValue,
541    {
542        self.ensure_non_paged_mode_ready()?;
543
544        Self::with_slot(field, |target_slot| {
545            self.session
546                .execute_load_query_with(self.query(), move |load, plan| {
547                    load.distinct_values_by_slot(plan, target_slot)
548                })
549        })
550    }
551
552    /// Execute and return projected field values paired with row ids for the
553    /// effective result window.
554    pub fn values_by_with_ids(
555        &self,
556        field: impl AsRef<str>,
557    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
558    where
559        E: EntityValue,
560    {
561        self.ensure_non_paged_mode_ready()?;
562
563        Self::with_slot(field, |target_slot| {
564            self.session
565                .execute_load_query_with(self.query(), move |load, plan| {
566                    load.values_by_with_ids_slot(plan, target_slot)
567                })
568        })
569    }
570
571    /// Execute and return the first projected field value in effective response
572    /// order, if any.
573    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
574    where
575        E: EntityValue,
576    {
577        self.ensure_non_paged_mode_ready()?;
578
579        Self::with_slot(field, |target_slot| {
580            self.session
581                .execute_load_query_with(self.query(), move |load, plan| {
582                    load.first_value_by_slot(plan, target_slot)
583                })
584        })
585    }
586
587    /// Execute and return the last projected field value in effective response
588    /// order, if any.
589    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
590    where
591        E: EntityValue,
592    {
593        self.ensure_non_paged_mode_ready()?;
594
595        Self::with_slot(field, |target_slot| {
596            self.session
597                .execute_load_query_with(self.query(), move |load, plan| {
598                    load.last_value_by_slot(plan, target_slot)
599                })
600        })
601    }
602
603    /// Execute and return the first matching identifier in response order, if any.
604    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
605    where
606        E: EntityValue,
607    {
608        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_first(plan))
609    }
610
611    /// Explain scalar `first()` routing without executing the terminal.
612    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
613    where
614        E: EntityValue,
615    {
616        self.explain_scalar_non_paged_terminal(first())
617    }
618
619    /// Execute and return the last matching identifier in response order, if any.
620    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
621    where
622        E: EntityValue,
623    {
624        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_last(plan))
625    }
626
627    /// Explain scalar `last()` routing without executing the terminal.
628    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
629    where
630        E: EntityValue,
631    {
632        self.explain_scalar_non_paged_terminal(last())
633    }
634
635    /// Execute and require exactly one matching row.
636    pub fn require_one(&self) -> Result<(), QueryError>
637    where
638        E: EntityValue,
639    {
640        self.execute()?.require_one()?;
641        Ok(())
642    }
643
644    /// Execute and require at least one matching row.
645    pub fn require_some(&self) -> Result<(), QueryError>
646    where
647        E: EntityValue,
648    {
649        self.execute()?.require_some()?;
650        Ok(())
651    }
652}