Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1use crate::{
2    db::{
3        DbSession,
4        executor::{ExecutablePlan, LoadExecutor},
5        query::fluent::load::FluentLoadQuery,
6        query::{
7            api::ResponseCardinalityExt,
8            builder::{
9                AggregateExpr,
10                aggregate::{exists, first, last, max, min},
11            },
12            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
13            intent::QueryError,
14        },
15        response::EntityResponse,
16    },
17    error::InternalError,
18    traits::{EntityKind, EntityValue},
19    types::{Decimal, Id},
20    value::Value,
21};
22
23type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
24
25impl<E> FluentLoadQuery<'_, E>
26where
27    E: EntityKind,
28{
29    // ------------------------------------------------------------------
30    // Execution (single semantic boundary)
31    // ------------------------------------------------------------------
32
33    /// Execute this query using the session's policy settings.
34    pub fn execute(&self) -> Result<EntityResponse<E>, QueryError>
35    where
36        E: EntityValue,
37    {
38        self.ensure_non_paged_mode_ready()?;
39
40        self.session.execute_query(self.query())
41    }
42
43    // Run one scalar terminal through the canonical non-paged fluent policy
44    // gate before handing execution to the session load-query adapter.
45    fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
46    where
47        E: EntityValue,
48        F: FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
49    {
50        self.ensure_non_paged_mode_ready()?;
51
52        self.session.execute_load_query_with(self.query(), execute)
53    }
54
55    // Run one scalar aggregate EXPLAIN terminal through the canonical
56    // non-paged fluent policy gate.
57    fn explain_scalar_non_paged_terminal(
58        &self,
59        aggregate: AggregateExpr,
60    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
61    where
62        E: EntityValue,
63    {
64        self.ensure_non_paged_mode_ready()?;
65
66        DbSession::<E::Canister>::explain_load_query_terminal_with(self.query(), aggregate)
67    }
68
69    // ------------------------------------------------------------------
70    // Execution terminals — semantic only
71    // ------------------------------------------------------------------
72
73    /// Execute and return whether the result set is empty.
74    pub fn is_empty(&self) -> Result<bool, QueryError>
75    where
76        E: EntityValue,
77    {
78        self.not_exists()
79    }
80
81    /// Execute and return whether no matching row exists.
82    pub fn not_exists(&self) -> Result<bool, QueryError>
83    where
84        E: EntityValue,
85    {
86        Ok(!self.exists()?)
87    }
88
89    /// Execute and return whether at least one matching row exists.
90    pub fn exists(&self) -> Result<bool, QueryError>
91    where
92        E: EntityValue,
93    {
94        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_exists(plan))
95    }
96
97    /// Explain scalar `exists()` routing without executing the terminal.
98    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
99    where
100        E: EntityValue,
101    {
102        self.explain_scalar_non_paged_terminal(exists())
103    }
104
105    /// Explain scalar `not_exists()` routing without executing the terminal.
106    ///
107    /// This remains an `exists()` execution plan with negated boolean semantics.
108    pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
109    where
110        E: EntityValue,
111    {
112        self.explain_exists()
113    }
114
115    /// Explain scalar load execution shape without executing the query.
116    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
117    where
118        E: EntityValue,
119    {
120        self.query().explain_execution()
121    }
122
123    /// Explain scalar load execution shape as deterministic text.
124    pub fn explain_execution_text(&self) -> Result<String, QueryError>
125    where
126        E: EntityValue,
127    {
128        self.query().explain_execution_text()
129    }
130
131    /// Explain scalar load execution shape as canonical JSON.
132    pub fn explain_execution_json(&self) -> Result<String, QueryError>
133    where
134        E: EntityValue,
135    {
136        self.query().explain_execution_json()
137    }
138
139    /// Explain scalar load execution shape as verbose text with diagnostics.
140    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
141    where
142        E: EntityValue,
143    {
144        self.query().explain_execution_verbose()
145    }
146
147    /// Execute and return the number of matching rows.
148    pub fn count(&self) -> Result<u32, QueryError>
149    where
150        E: EntityValue,
151    {
152        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_count(plan))
153    }
154
155    /// Execute and return the total persisted payload bytes for the effective
156    /// result window.
157    pub fn bytes(&self) -> Result<u64, QueryError>
158    where
159        E: EntityValue,
160    {
161        self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
162    }
163
164    /// Execute and return the total serialized bytes for `field` over the
165    /// effective result window.
166    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
167    where
168        E: EntityValue,
169    {
170        self.ensure_non_paged_mode_ready()?;
171
172        Self::with_slot(field, |target_slot| {
173            self.session
174                .execute_load_query_with(self.query(), move |load, plan| {
175                    load.bytes_by_slot(plan, target_slot)
176                })
177        })
178    }
179
180    /// Execute and return the smallest matching identifier, if any.
181    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
182    where
183        E: EntityValue,
184    {
185        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_min(plan))
186    }
187
188    /// Explain scalar `min()` routing without executing the terminal.
189    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
190    where
191        E: EntityValue,
192    {
193        self.explain_scalar_non_paged_terminal(min())
194    }
195
196    /// Execute and return the id of the row with the smallest value for `field`.
197    ///
198    /// Ties are deterministic: equal field values resolve by primary key ascending.
199    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
200    where
201        E: EntityValue,
202    {
203        self.ensure_non_paged_mode_ready()?;
204
205        Self::with_slot(field, |target_slot| {
206            self.session
207                .execute_load_query_with(self.query(), move |load, plan| {
208                    load.aggregate_min_by_slot(plan, target_slot)
209                })
210        })
211    }
212
213    /// Execute and return the largest matching identifier, if any.
214    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
215    where
216        E: EntityValue,
217    {
218        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_max(plan))
219    }
220
221    /// Explain scalar `max()` routing without executing the terminal.
222    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
223    where
224        E: EntityValue,
225    {
226        self.explain_scalar_non_paged_terminal(max())
227    }
228
229    /// Execute and return the id of the row with the largest value for `field`.
230    ///
231    /// Ties are deterministic: equal field values resolve by primary key ascending.
232    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
233    where
234        E: EntityValue,
235    {
236        self.ensure_non_paged_mode_ready()?;
237
238        Self::with_slot(field, |target_slot| {
239            self.session
240                .execute_load_query_with(self.query(), move |load, plan| {
241                    load.aggregate_max_by_slot(plan, target_slot)
242                })
243        })
244    }
245
246    /// Execute and return the id at zero-based ordinal `nth` when rows are
247    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
248    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
249    where
250        E: EntityValue,
251    {
252        self.ensure_non_paged_mode_ready()?;
253
254        Self::with_slot(field, |target_slot| {
255            self.session
256                .execute_load_query_with(self.query(), move |load, plan| {
257                    load.aggregate_nth_by_slot(plan, target_slot, nth)
258                })
259        })
260    }
261
262    /// Execute and return the sum of `field` over matching rows.
263    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
264    where
265        E: EntityValue,
266    {
267        self.ensure_non_paged_mode_ready()?;
268
269        Self::with_slot(field, |target_slot| {
270            self.session
271                .execute_load_query_with(self.query(), move |load, plan| {
272                    load.aggregate_sum_by_slot(plan, target_slot)
273                })
274        })
275    }
276
277    /// Execute and return the sum of distinct `field` values.
278    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
279    where
280        E: EntityValue,
281    {
282        self.ensure_non_paged_mode_ready()?;
283
284        Self::with_slot(field, |target_slot| {
285            self.session
286                .execute_load_query_with(self.query(), move |load, plan| {
287                    load.aggregate_sum_distinct_by_slot(plan, target_slot)
288                })
289        })
290    }
291
292    /// Execute and return the average of `field` over matching rows.
293    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
294    where
295        E: EntityValue,
296    {
297        self.ensure_non_paged_mode_ready()?;
298
299        Self::with_slot(field, |target_slot| {
300            self.session
301                .execute_load_query_with(self.query(), move |load, plan| {
302                    load.aggregate_avg_by_slot(plan, target_slot)
303                })
304        })
305    }
306
307    /// Execute and return the median id by `field` using deterministic ordering
308    /// `(field asc, primary key asc)`.
309    ///
310    /// Even-length windows select the lower median.
311    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
312    where
313        E: EntityValue,
314    {
315        self.ensure_non_paged_mode_ready()?;
316
317        Self::with_slot(field, |target_slot| {
318            self.session
319                .execute_load_query_with(self.query(), move |load, plan| {
320                    load.aggregate_median_by_slot(plan, target_slot)
321                })
322        })
323    }
324
325    /// Execute and return the number of distinct values for `field` over the
326    /// effective result window.
327    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
328    where
329        E: EntityValue,
330    {
331        self.ensure_non_paged_mode_ready()?;
332
333        Self::with_slot(field, |target_slot| {
334            self.session
335                .execute_load_query_with(self.query(), move |load, plan| {
336                    load.aggregate_count_distinct_by_slot(plan, target_slot)
337                })
338        })
339    }
340
341    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
342    ///
343    /// Tie handling is deterministic for both extrema: primary key ascending.
344    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
345    where
346        E: EntityValue,
347    {
348        self.ensure_non_paged_mode_ready()?;
349
350        Self::with_slot(field, |target_slot| {
351            self.session
352                .execute_load_query_with(self.query(), move |load, plan| {
353                    load.aggregate_min_max_by_slot(plan, target_slot)
354                })
355        })
356    }
357
358    /// Execute and return projected field values for the effective result window.
359    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
360    where
361        E: EntityValue,
362    {
363        self.ensure_non_paged_mode_ready()?;
364
365        Self::with_slot(field, |target_slot| {
366            self.session
367                .execute_load_query_with(self.query(), move |load, plan| {
368                    load.values_by_slot(plan, target_slot)
369                })
370        })
371    }
372
373    /// Execute and return the first `k` rows from the effective response window.
374    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
375    where
376        E: EntityValue,
377    {
378        self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
379    }
380
381    /// Execute and return the top `k` rows by `field` under deterministic
382    /// ordering `(field desc, primary_key asc)` over the effective response
383    /// window.
384    ///
385    /// This terminal applies its own ordering and does not preserve query
386    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
387    /// matches `max_by(field)` selection semantics.
388    pub fn top_k_by(
389        &self,
390        field: impl AsRef<str>,
391        take_count: u32,
392    ) -> Result<EntityResponse<E>, QueryError>
393    where
394        E: EntityValue,
395    {
396        self.ensure_non_paged_mode_ready()?;
397
398        Self::with_slot(field, |target_slot| {
399            self.session
400                .execute_load_query_with(self.query(), move |load, plan| {
401                    load.top_k_by_slot(plan, target_slot, take_count)
402                })
403        })
404    }
405
406    /// Execute and return the bottom `k` rows by `field` under deterministic
407    /// ordering `(field asc, primary_key asc)` over the effective response
408    /// window.
409    ///
410    /// This terminal applies its own ordering and does not preserve query
411    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
412    /// matches `min_by(field)` selection semantics.
413    pub fn bottom_k_by(
414        &self,
415        field: impl AsRef<str>,
416        take_count: u32,
417    ) -> Result<EntityResponse<E>, QueryError>
418    where
419        E: EntityValue,
420    {
421        self.ensure_non_paged_mode_ready()?;
422
423        Self::with_slot(field, |target_slot| {
424            self.session
425                .execute_load_query_with(self.query(), move |load, plan| {
426                    load.bottom_k_by_slot(plan, target_slot, take_count)
427                })
428        })
429    }
430
431    /// Execute and return projected values for the top `k` rows by `field`
432    /// under deterministic ordering `(field desc, primary_key asc)` over the
433    /// effective response window.
434    ///
435    /// Ranking is applied before projection and does not preserve query
436    /// `order_by(...)` row order in the returned values. For `k = 1`, this
437    /// matches `max_by(field)` projected to one value.
438    pub fn top_k_by_values(
439        &self,
440        field: impl AsRef<str>,
441        take_count: u32,
442    ) -> Result<Vec<Value>, QueryError>
443    where
444        E: EntityValue,
445    {
446        self.ensure_non_paged_mode_ready()?;
447
448        Self::with_slot(field, |target_slot| {
449            self.session
450                .execute_load_query_with(self.query(), move |load, plan| {
451                    load.top_k_by_values_slot(plan, target_slot, take_count)
452                })
453        })
454    }
455
456    /// Execute and return projected values for the bottom `k` rows by `field`
457    /// under deterministic ordering `(field asc, primary_key asc)` over the
458    /// effective response window.
459    ///
460    /// Ranking is applied before projection and does not preserve query
461    /// `order_by(...)` row order in the returned values. For `k = 1`, this
462    /// matches `min_by(field)` projected to one value.
463    pub fn bottom_k_by_values(
464        &self,
465        field: impl AsRef<str>,
466        take_count: u32,
467    ) -> Result<Vec<Value>, QueryError>
468    where
469        E: EntityValue,
470    {
471        self.ensure_non_paged_mode_ready()?;
472
473        Self::with_slot(field, |target_slot| {
474            self.session
475                .execute_load_query_with(self.query(), move |load, plan| {
476                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
477                })
478        })
479    }
480
481    /// Execute and return projected id/value pairs for the top `k` rows by
482    /// `field` under deterministic ordering `(field desc, primary_key asc)`
483    /// over the effective response window.
484    ///
485    /// Ranking is applied before projection and does not preserve query
486    /// `order_by(...)` row order in the returned values. For `k = 1`, this
487    /// matches `max_by(field)` projected to one `(id, value)` pair.
488    pub fn top_k_by_with_ids(
489        &self,
490        field: impl AsRef<str>,
491        take_count: u32,
492    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
493    where
494        E: EntityValue,
495    {
496        self.ensure_non_paged_mode_ready()?;
497
498        Self::with_slot(field, |target_slot| {
499            self.session
500                .execute_load_query_with(self.query(), move |load, plan| {
501                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
502                })
503        })
504    }
505
506    /// Execute and return projected id/value pairs for the bottom `k` rows by
507    /// `field` under deterministic ordering `(field asc, primary_key asc)`
508    /// over the effective response window.
509    ///
510    /// Ranking is applied before projection and does not preserve query
511    /// `order_by(...)` row order in the returned values. For `k = 1`, this
512    /// matches `min_by(field)` projected to one `(id, value)` pair.
513    pub fn bottom_k_by_with_ids(
514        &self,
515        field: impl AsRef<str>,
516        take_count: u32,
517    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
518    where
519        E: EntityValue,
520    {
521        self.ensure_non_paged_mode_ready()?;
522
523        Self::with_slot(field, |target_slot| {
524            self.session
525                .execute_load_query_with(self.query(), move |load, plan| {
526                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
527                })
528        })
529    }
530
531    /// Execute and return distinct projected field values for the effective
532    /// result window, preserving first-observed value order.
533    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
534    where
535        E: EntityValue,
536    {
537        self.ensure_non_paged_mode_ready()?;
538
539        Self::with_slot(field, |target_slot| {
540            self.session
541                .execute_load_query_with(self.query(), move |load, plan| {
542                    load.distinct_values_by_slot(plan, target_slot)
543                })
544        })
545    }
546
547    /// Execute and return projected field values paired with row ids for the
548    /// effective result window.
549    pub fn values_by_with_ids(
550        &self,
551        field: impl AsRef<str>,
552    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
553    where
554        E: EntityValue,
555    {
556        self.ensure_non_paged_mode_ready()?;
557
558        Self::with_slot(field, |target_slot| {
559            self.session
560                .execute_load_query_with(self.query(), move |load, plan| {
561                    load.values_by_with_ids_slot(plan, target_slot)
562                })
563        })
564    }
565
566    /// Execute and return the first projected field value in effective response
567    /// order, if any.
568    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
569    where
570        E: EntityValue,
571    {
572        self.ensure_non_paged_mode_ready()?;
573
574        Self::with_slot(field, |target_slot| {
575            self.session
576                .execute_load_query_with(self.query(), move |load, plan| {
577                    load.first_value_by_slot(plan, target_slot)
578                })
579        })
580    }
581
582    /// Execute and return the last projected field value in effective response
583    /// order, if any.
584    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
585    where
586        E: EntityValue,
587    {
588        self.ensure_non_paged_mode_ready()?;
589
590        Self::with_slot(field, |target_slot| {
591            self.session
592                .execute_load_query_with(self.query(), move |load, plan| {
593                    load.last_value_by_slot(plan, target_slot)
594                })
595        })
596    }
597
598    /// Execute and return the first matching identifier in response order, if any.
599    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
600    where
601        E: EntityValue,
602    {
603        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_first(plan))
604    }
605
606    /// Explain scalar `first()` routing without executing the terminal.
607    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
608    where
609        E: EntityValue,
610    {
611        self.explain_scalar_non_paged_terminal(first())
612    }
613
614    /// Execute and return the last matching identifier in response order, if any.
615    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
616    where
617        E: EntityValue,
618    {
619        self.execute_scalar_non_paged_terminal(|load, plan| load.aggregate_last(plan))
620    }
621
622    /// Explain scalar `last()` routing without executing the terminal.
623    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
624    where
625        E: EntityValue,
626    {
627        self.explain_scalar_non_paged_terminal(last())
628    }
629
630    /// Execute and require exactly one matching row.
631    pub fn require_one(&self) -> Result<(), QueryError>
632    where
633        E: EntityValue,
634    {
635        self.execute()?.require_one()?;
636        Ok(())
637    }
638
639    /// Execute and require at least one matching row.
640    pub fn require_some(&self) -> Result<(), QueryError>
641    where
642        E: EntityValue,
643    {
644        self.execute()?.require_some()?;
645        Ok(())
646    }
647}