Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1use crate::{
2    db::{
3        DbSession,
4        executor::{ExecutablePlan, LoadExecutor},
5        query::fluent::load::FluentLoadQuery,
6        query::{
7            api::ResponseCardinalityExt,
8            builder::{
9                AggregateExpr,
10                aggregate::{exists, first, last, max, min},
11            },
12            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
13            intent::QueryError,
14        },
15        response::EntityResponse,
16    },
17    error::InternalError,
18    traits::{EntityKind, EntityValue},
19    types::{Decimal, Id},
20    value::Value,
21};
22
// Shorthand for the `min_max_by` terminal's payload: an optional pair of row
// ids, minimum first and maximum second.
// NOTE(review): presumably `None` when no row matches — confirm against the
// executor.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
24
25impl<E> FluentLoadQuery<'_, E>
26where
27    E: EntityKind,
28{
29    // ------------------------------------------------------------------
30    // Execution (single semantic boundary)
31    // ------------------------------------------------------------------
32
33    /// Execute this query using the session's policy settings.
34    pub fn execute(&self) -> Result<EntityResponse<E>, QueryError>
35    where
36        E: EntityValue,
37    {
38        self.ensure_non_paged_mode_ready()?;
39
40        self.session.execute_query(self.query())
41    }
42
43    // Run one scalar terminal through the canonical non-paged fluent policy
44    // gate before handing execution to the session load-query adapter.
45    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
46    where
47        E: EntityValue,
48        F: FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
49    {
50        self.ensure_non_paged_mode_ready()?;
51
52        self.session.execute_load_query_with(self.query(), execute)
53    }
54
55    // Run one scalar aggregate EXPLAIN terminal through the canonical
56    // non-paged fluent policy gate.
57    fn explain_scalar_non_paged_terminal(
58        &self,
59        aggregate: AggregateExpr,
60    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
61    where
62        E: EntityValue,
63    {
64        self.ensure_non_paged_mode_ready()?;
65
66        DbSession::<E::Canister>::explain_load_query_terminal_with(self.query(), aggregate)
67    }
68
69    // ------------------------------------------------------------------
70    // Execution terminals — semantic only
71    // ------------------------------------------------------------------
72
73    /// Execute and return whether the result set is empty.
74    pub fn is_empty(&self) -> Result<bool, QueryError>
75    where
76        E: EntityValue,
77    {
78        Ok(!self.exists()?)
79    }
80
81    /// Execute and return whether at least one matching row exists.
82    pub fn exists(&self) -> Result<bool, QueryError>
83    where
84        E: EntityValue,
85    {
86        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_exists(plan))
87    }
88
89    /// Explain scalar `exists()` routing without executing the terminal.
90    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
91    where
92        E: EntityValue,
93    {
94        self.explain_scalar_non_paged_terminal(exists())
95    }
96
97    /// Explain scalar load execution shape without executing the query.
98    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
99    where
100        E: EntityValue,
101    {
102        self.query().explain_execution()
103    }
104
105    /// Explain scalar load execution shape as deterministic text.
106    pub fn explain_execution_text(&self) -> Result<String, QueryError>
107    where
108        E: EntityValue,
109    {
110        self.query().explain_execution_text()
111    }
112
113    /// Explain scalar load execution shape as canonical JSON.
114    pub fn explain_execution_json(&self) -> Result<String, QueryError>
115    where
116        E: EntityValue,
117    {
118        self.query().explain_execution_json()
119    }
120
121    /// Explain scalar load execution shape as verbose text with diagnostics.
122    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
123    where
124        E: EntityValue,
125    {
126        self.query().explain_execution_verbose()
127    }
128
129    /// Execute and return the number of matching rows.
130    pub fn count(&self) -> Result<u32, QueryError>
131    where
132        E: EntityValue,
133    {
134        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_count(plan))
135    }
136
137    /// Execute and return the total persisted payload bytes for the effective
138    /// result window.
139    pub fn bytes(&self) -> Result<u64, QueryError>
140    where
141        E: EntityValue,
142    {
143        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
144    }
145
146    /// Execute and return the total serialized bytes for `field` over the
147    /// effective result window.
148    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
149    where
150        E: EntityValue,
151    {
152        self.ensure_non_paged_mode_ready()?;
153
154        Self::with_slot(field, |target_slot| {
155            self.session
156                .execute_load_query_with(self.query(), move |load, plan| {
157                    load.bytes_by_slot(plan, target_slot)
158                })
159        })
160    }
161
162    /// Execute and return the smallest matching identifier, if any.
163    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
164    where
165        E: EntityValue,
166    {
167        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_min(plan))
168    }
169
170    /// Explain scalar `min()` routing without executing the terminal.
171    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
172    where
173        E: EntityValue,
174    {
175        self.explain_scalar_non_paged_terminal(min())
176    }
177
178    /// Execute and return the id of the row with the smallest value for `field`.
179    ///
180    /// Ties are deterministic: equal field values resolve by primary key ascending.
181    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
182    where
183        E: EntityValue,
184    {
185        self.ensure_non_paged_mode_ready()?;
186
187        Self::with_slot(field, |target_slot| {
188            self.session
189                .execute_load_query_with(self.query(), move |load, plan| {
190                    load.aggregate_min_by_slot(plan, target_slot)
191                })
192        })
193    }
194
195    /// Execute and return the largest matching identifier, if any.
196    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
197    where
198        E: EntityValue,
199    {
200        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_max(plan))
201    }
202
203    /// Explain scalar `max()` routing without executing the terminal.
204    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
205    where
206        E: EntityValue,
207    {
208        self.explain_scalar_non_paged_terminal(max())
209    }
210
211    /// Execute and return the id of the row with the largest value for `field`.
212    ///
213    /// Ties are deterministic: equal field values resolve by primary key ascending.
214    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
215    where
216        E: EntityValue,
217    {
218        self.ensure_non_paged_mode_ready()?;
219
220        Self::with_slot(field, |target_slot| {
221            self.session
222                .execute_load_query_with(self.query(), move |load, plan| {
223                    load.aggregate_max_by_slot(plan, target_slot)
224                })
225        })
226    }
227
228    /// Execute and return the id at zero-based ordinal `nth` when rows are
229    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
230    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
231    where
232        E: EntityValue,
233    {
234        self.ensure_non_paged_mode_ready()?;
235
236        Self::with_slot(field, |target_slot| {
237            self.session
238                .execute_load_query_with(self.query(), move |load, plan| {
239                    load.aggregate_nth_by_slot(plan, target_slot, nth)
240                })
241        })
242    }
243
244    /// Execute and return the sum of `field` over matching rows.
245    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
246    where
247        E: EntityValue,
248    {
249        self.ensure_non_paged_mode_ready()?;
250
251        Self::with_slot(field, |target_slot| {
252            self.session
253                .execute_load_query_with(self.query(), move |load, plan| {
254                    load.aggregate_sum_by_slot(plan, target_slot)
255                })
256        })
257    }
258
259    /// Execute and return the sum of distinct `field` values.
260    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
261    where
262        E: EntityValue,
263    {
264        self.ensure_non_paged_mode_ready()?;
265
266        Self::with_slot(field, |target_slot| {
267            self.session
268                .execute_load_query_with(self.query(), move |load, plan| {
269                    load.aggregate_sum_distinct_by_slot(plan, target_slot)
270                })
271        })
272    }
273
274    /// Execute and return the average of `field` over matching rows.
275    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
276    where
277        E: EntityValue,
278    {
279        self.ensure_non_paged_mode_ready()?;
280
281        Self::with_slot(field, |target_slot| {
282            self.session
283                .execute_load_query_with(self.query(), move |load, plan| {
284                    load.aggregate_avg_by_slot(plan, target_slot)
285                })
286        })
287    }
288
289    /// Execute and return the median id by `field` using deterministic ordering
290    /// `(field asc, primary key asc)`.
291    ///
292    /// Even-length windows select the lower median.
293    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
294    where
295        E: EntityValue,
296    {
297        self.ensure_non_paged_mode_ready()?;
298
299        Self::with_slot(field, |target_slot| {
300            self.session
301                .execute_load_query_with(self.query(), move |load, plan| {
302                    load.aggregate_median_by_slot(plan, target_slot)
303                })
304        })
305    }
306
307    /// Execute and return the number of distinct values for `field` over the
308    /// effective result window.
309    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
310    where
311        E: EntityValue,
312    {
313        self.ensure_non_paged_mode_ready()?;
314
315        Self::with_slot(field, |target_slot| {
316            self.session
317                .execute_load_query_with(self.query(), move |load, plan| {
318                    load.aggregate_count_distinct_by_slot(plan, target_slot)
319                })
320        })
321    }
322
323    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
324    ///
325    /// Tie handling is deterministic for both extrema: primary key ascending.
326    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
327    where
328        E: EntityValue,
329    {
330        self.ensure_non_paged_mode_ready()?;
331
332        Self::with_slot(field, |target_slot| {
333            self.session
334                .execute_load_query_with(self.query(), move |load, plan| {
335                    load.aggregate_min_max_by_slot(plan, target_slot)
336                })
337        })
338    }
339
340    /// Execute and return projected field values for the effective result window.
341    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
342    where
343        E: EntityValue,
344    {
345        self.ensure_non_paged_mode_ready()?;
346
347        Self::with_slot(field, |target_slot| {
348            self.session
349                .execute_load_query_with(self.query(), move |load, plan| {
350                    load.values_by_slot(plan, target_slot)
351                })
352        })
353    }
354
355    /// Execute and return the first `k` rows from the effective response window.
356    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
357    where
358        E: EntityValue,
359    {
360        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
361    }
362
363    /// Execute and return the top `k` rows by `field` under deterministic
364    /// ordering `(field desc, primary_key asc)` over the effective response
365    /// window.
366    ///
367    /// This terminal applies its own ordering and does not preserve query
368    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
369    /// matches `max_by(field)` selection semantics.
370    pub fn top_k_by(
371        &self,
372        field: impl AsRef<str>,
373        take_count: u32,
374    ) -> Result<EntityResponse<E>, QueryError>
375    where
376        E: EntityValue,
377    {
378        self.ensure_non_paged_mode_ready()?;
379
380        Self::with_slot(field, |target_slot| {
381            self.session
382                .execute_load_query_with(self.query(), move |load, plan| {
383                    load.top_k_by_slot(plan, target_slot, take_count)
384                })
385        })
386    }
387
388    /// Execute and return the bottom `k` rows by `field` under deterministic
389    /// ordering `(field asc, primary_key asc)` over the effective response
390    /// window.
391    ///
392    /// This terminal applies its own ordering and does not preserve query
393    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
394    /// matches `min_by(field)` selection semantics.
395    pub fn bottom_k_by(
396        &self,
397        field: impl AsRef<str>,
398        take_count: u32,
399    ) -> Result<EntityResponse<E>, QueryError>
400    where
401        E: EntityValue,
402    {
403        self.ensure_non_paged_mode_ready()?;
404
405        Self::with_slot(field, |target_slot| {
406            self.session
407                .execute_load_query_with(self.query(), move |load, plan| {
408                    load.bottom_k_by_slot(plan, target_slot, take_count)
409                })
410        })
411    }
412
413    /// Execute and return projected values for the top `k` rows by `field`
414    /// under deterministic ordering `(field desc, primary_key asc)` over the
415    /// effective response window.
416    ///
417    /// Ranking is applied before projection and does not preserve query
418    /// `order_by(...)` row order in the returned values. For `k = 1`, this
419    /// matches `max_by(field)` projected to one value.
420    pub fn top_k_by_values(
421        &self,
422        field: impl AsRef<str>,
423        take_count: u32,
424    ) -> Result<Vec<Value>, QueryError>
425    where
426        E: EntityValue,
427    {
428        self.ensure_non_paged_mode_ready()?;
429
430        Self::with_slot(field, |target_slot| {
431            self.session
432                .execute_load_query_with(self.query(), move |load, plan| {
433                    load.top_k_by_values_slot(plan, target_slot, take_count)
434                })
435        })
436    }
437
438    /// Execute and return projected values for the bottom `k` rows by `field`
439    /// under deterministic ordering `(field asc, primary_key asc)` over the
440    /// effective response window.
441    ///
442    /// Ranking is applied before projection and does not preserve query
443    /// `order_by(...)` row order in the returned values. For `k = 1`, this
444    /// matches `min_by(field)` projected to one value.
445    pub fn bottom_k_by_values(
446        &self,
447        field: impl AsRef<str>,
448        take_count: u32,
449    ) -> Result<Vec<Value>, QueryError>
450    where
451        E: EntityValue,
452    {
453        self.ensure_non_paged_mode_ready()?;
454
455        Self::with_slot(field, |target_slot| {
456            self.session
457                .execute_load_query_with(self.query(), move |load, plan| {
458                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
459                })
460        })
461    }
462
463    /// Execute and return projected id/value pairs for the top `k` rows by
464    /// `field` under deterministic ordering `(field desc, primary_key asc)`
465    /// over the effective response window.
466    ///
467    /// Ranking is applied before projection and does not preserve query
468    /// `order_by(...)` row order in the returned values. For `k = 1`, this
469    /// matches `max_by(field)` projected to one `(id, value)` pair.
470    pub fn top_k_by_with_ids(
471        &self,
472        field: impl AsRef<str>,
473        take_count: u32,
474    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
475    where
476        E: EntityValue,
477    {
478        self.ensure_non_paged_mode_ready()?;
479
480        Self::with_slot(field, |target_slot| {
481            self.session
482                .execute_load_query_with(self.query(), move |load, plan| {
483                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
484                })
485        })
486    }
487
488    /// Execute and return projected id/value pairs for the bottom `k` rows by
489    /// `field` under deterministic ordering `(field asc, primary_key asc)`
490    /// over the effective response window.
491    ///
492    /// Ranking is applied before projection and does not preserve query
493    /// `order_by(...)` row order in the returned values. For `k = 1`, this
494    /// matches `min_by(field)` projected to one `(id, value)` pair.
495    pub fn bottom_k_by_with_ids(
496        &self,
497        field: impl AsRef<str>,
498        take_count: u32,
499    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
500    where
501        E: EntityValue,
502    {
503        self.ensure_non_paged_mode_ready()?;
504
505        Self::with_slot(field, |target_slot| {
506            self.session
507                .execute_load_query_with(self.query(), move |load, plan| {
508                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
509                })
510        })
511    }
512
513    /// Execute and return distinct projected field values for the effective
514    /// result window, preserving first-observed value order.
515    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
516    where
517        E: EntityValue,
518    {
519        self.ensure_non_paged_mode_ready()?;
520
521        Self::with_slot(field, |target_slot| {
522            self.session
523                .execute_load_query_with(self.query(), move |load, plan| {
524                    load.distinct_values_by_slot(plan, target_slot)
525                })
526        })
527    }
528
529    /// Execute and return projected field values paired with row ids for the
530    /// effective result window.
531    pub fn values_by_with_ids(
532        &self,
533        field: impl AsRef<str>,
534    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
535    where
536        E: EntityValue,
537    {
538        self.ensure_non_paged_mode_ready()?;
539
540        Self::with_slot(field, |target_slot| {
541            self.session
542                .execute_load_query_with(self.query(), move |load, plan| {
543                    load.values_by_with_ids_slot(plan, target_slot)
544                })
545        })
546    }
547
548    /// Execute and return the first projected field value in effective response
549    /// order, if any.
550    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
551    where
552        E: EntityValue,
553    {
554        self.ensure_non_paged_mode_ready()?;
555
556        Self::with_slot(field, |target_slot| {
557            self.session
558                .execute_load_query_with(self.query(), move |load, plan| {
559                    load.first_value_by_slot(plan, target_slot)
560                })
561        })
562    }
563
564    /// Execute and return the last projected field value in effective response
565    /// order, if any.
566    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
567    where
568        E: EntityValue,
569    {
570        self.ensure_non_paged_mode_ready()?;
571
572        Self::with_slot(field, |target_slot| {
573            self.session
574                .execute_load_query_with(self.query(), move |load, plan| {
575                    load.last_value_by_slot(plan, target_slot)
576                })
577        })
578    }
579
580    /// Execute and return the first matching identifier in response order, if any.
581    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
582    where
583        E: EntityValue,
584    {
585        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_first(plan))
586    }
587
588    /// Explain scalar `first()` routing without executing the terminal.
589    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
590    where
591        E: EntityValue,
592    {
593        self.explain_scalar_non_paged_terminal(first())
594    }
595
596    /// Execute and return the last matching identifier in response order, if any.
597    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
598    where
599        E: EntityValue,
600    {
601        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_last(plan))
602    }
603
604    /// Explain scalar `last()` routing without executing the terminal.
605    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
606    where
607        E: EntityValue,
608    {
609        self.explain_scalar_non_paged_terminal(last())
610    }
611
612    /// Execute and require exactly one matching row.
613    pub fn require_one(&self) -> Result<(), QueryError>
614    where
615        E: EntityValue,
616    {
617        self.execute()?.require_one()?;
618        Ok(())
619    }
620
621    /// Execute and require at least one matching row.
622    pub fn require_some(&self) -> Result<(), QueryError>
623    where
624        E: EntityValue,
625    {
626        self.execute()?.require_some()?;
627        Ok(())
628    }
629}