Skip to main content

icydb_core/db/query/fluent/load/
terminals.rs

1use crate::{
2    db::{
3        query::fluent::load::FluentLoadQuery,
4        query::{
5            api::ResponseCardinalityExt,
6            builder::aggregate::{exists, first, last, max, min},
7            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
8            intent::QueryError,
9        },
10        response::EntityResponse,
11    },
12    traits::{EntityKind, EntityValue},
13    types::{Decimal, Id},
14    value::Value,
15};
16
17type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
18
19impl<E> FluentLoadQuery<'_, E>
20where
21    E: EntityKind,
22{
23    // ------------------------------------------------------------------
24    // Execution (single semantic boundary)
25    // ------------------------------------------------------------------
26
27    /// Execute this query using the session's policy settings.
28    pub fn execute(&self) -> Result<EntityResponse<E>, QueryError>
29    where
30        E: EntityValue,
31    {
32        self.ensure_non_paged_mode_ready()?;
33
34        self.session.execute_query(self.query())
35    }
36
37    // ------------------------------------------------------------------
38    // Execution terminals — semantic only
39    // ------------------------------------------------------------------
40
41    /// Execute and return whether the result set is empty.
42    pub fn is_empty(&self) -> Result<bool, QueryError>
43    where
44        E: EntityValue,
45    {
46        Ok(!self.exists()?)
47    }
48
49    /// Execute and return whether at least one matching row exists.
50    pub fn exists(&self) -> Result<bool, QueryError>
51    where
52        E: EntityValue,
53    {
54        self.ensure_non_paged_mode_ready()?;
55
56        self.session
57            .execute_load_query_with(self.query(), |load, plan| load.aggregate_exists(plan))
58    }
59
60    /// Explain scalar `exists()` routing without executing the terminal.
61    pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
62    where
63        E: EntityValue,
64    {
65        self.ensure_non_paged_mode_ready()?;
66
67        crate::db::DbSession::<E::Canister>::explain_load_query_terminal_with(
68            self.query(),
69            exists(),
70        )
71    }
72
73    /// Explain scalar load execution shape without executing the query.
74    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
75    where
76        E: EntityValue,
77    {
78        self.query().explain_execution()
79    }
80
81    /// Explain scalar load execution shape as deterministic text.
82    pub fn explain_execution_text(&self) -> Result<String, QueryError>
83    where
84        E: EntityValue,
85    {
86        self.query().explain_execution_text()
87    }
88
89    /// Explain scalar load execution shape as canonical JSON.
90    pub fn explain_execution_json(&self) -> Result<String, QueryError>
91    where
92        E: EntityValue,
93    {
94        self.query().explain_execution_json()
95    }
96
97    /// Explain scalar load execution shape as verbose text with diagnostics.
98    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
99    where
100        E: EntityValue,
101    {
102        self.query().explain_execution_verbose()
103    }
104
105    /// Execute and return the number of matching rows.
106    pub fn count(&self) -> Result<u32, QueryError>
107    where
108        E: EntityValue,
109    {
110        self.ensure_non_paged_mode_ready()?;
111
112        self.session
113            .execute_load_query_with(self.query(), |load, plan| load.aggregate_count(plan))
114    }
115
116    /// Execute and return the total persisted payload bytes for the effective
117    /// result window.
118    pub fn bytes(&self) -> Result<u64, QueryError>
119    where
120        E: EntityValue,
121    {
122        self.ensure_non_paged_mode_ready()?;
123
124        self.session
125            .execute_load_query_with(self.query(), |load, plan| load.bytes(plan))
126    }
127
128    /// Execute and return the total serialized bytes for `field` over the
129    /// effective result window.
130    pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
131    where
132        E: EntityValue,
133    {
134        self.ensure_non_paged_mode_ready()?;
135
136        Self::with_slot(field, |target_slot| {
137            self.session
138                .execute_load_query_with(self.query(), move |load, plan| {
139                    load.bytes_by_slot(plan, target_slot)
140                })
141        })
142    }
143
144    /// Execute and return the smallest matching identifier, if any.
145    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
146    where
147        E: EntityValue,
148    {
149        self.ensure_non_paged_mode_ready()?;
150
151        self.session
152            .execute_load_query_with(self.query(), |load, plan| load.aggregate_min(plan))
153    }
154
155    /// Explain scalar `min()` routing without executing the terminal.
156    pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
157    where
158        E: EntityValue,
159    {
160        self.ensure_non_paged_mode_ready()?;
161
162        crate::db::DbSession::<E::Canister>::explain_load_query_terminal_with(self.query(), min())
163    }
164
165    /// Execute and return the id of the row with the smallest value for `field`.
166    ///
167    /// Ties are deterministic: equal field values resolve by primary key ascending.
168    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
169    where
170        E: EntityValue,
171    {
172        self.ensure_non_paged_mode_ready()?;
173
174        Self::with_slot(field, |target_slot| {
175            self.session
176                .execute_load_query_with(self.query(), move |load, plan| {
177                    load.aggregate_min_by_slot(plan, target_slot)
178                })
179        })
180    }
181
182    /// Execute and return the largest matching identifier, if any.
183    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
184    where
185        E: EntityValue,
186    {
187        self.ensure_non_paged_mode_ready()?;
188
189        self.session
190            .execute_load_query_with(self.query(), |load, plan| load.aggregate_max(plan))
191    }
192
193    /// Explain scalar `max()` routing without executing the terminal.
194    pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
195    where
196        E: EntityValue,
197    {
198        self.ensure_non_paged_mode_ready()?;
199
200        crate::db::DbSession::<E::Canister>::explain_load_query_terminal_with(self.query(), max())
201    }
202
203    /// Execute and return the id of the row with the largest value for `field`.
204    ///
205    /// Ties are deterministic: equal field values resolve by primary key ascending.
206    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
207    where
208        E: EntityValue,
209    {
210        self.ensure_non_paged_mode_ready()?;
211
212        Self::with_slot(field, |target_slot| {
213            self.session
214                .execute_load_query_with(self.query(), move |load, plan| {
215                    load.aggregate_max_by_slot(plan, target_slot)
216                })
217        })
218    }
219
220    /// Execute and return the id at zero-based ordinal `nth` when rows are
221    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
222    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
223    where
224        E: EntityValue,
225    {
226        self.ensure_non_paged_mode_ready()?;
227
228        Self::with_slot(field, |target_slot| {
229            self.session
230                .execute_load_query_with(self.query(), move |load, plan| {
231                    load.aggregate_nth_by_slot(plan, target_slot, nth)
232                })
233        })
234    }
235
236    /// Execute and return the sum of `field` over matching rows.
237    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
238    where
239        E: EntityValue,
240    {
241        self.ensure_non_paged_mode_ready()?;
242
243        Self::with_slot(field, |target_slot| {
244            self.session
245                .execute_load_query_with(self.query(), move |load, plan| {
246                    load.aggregate_sum_by_slot(plan, target_slot)
247                })
248        })
249    }
250
251    /// Execute and return the sum of distinct `field` values.
252    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
253    where
254        E: EntityValue,
255    {
256        self.ensure_non_paged_mode_ready()?;
257
258        Self::with_slot(field, |target_slot| {
259            self.session
260                .execute_load_query_with(self.query(), move |load, plan| {
261                    load.aggregate_sum_distinct_by_slot(plan, target_slot)
262                })
263        })
264    }
265
266    /// Execute and return the average of `field` over matching rows.
267    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
268    where
269        E: EntityValue,
270    {
271        self.ensure_non_paged_mode_ready()?;
272
273        Self::with_slot(field, |target_slot| {
274            self.session
275                .execute_load_query_with(self.query(), move |load, plan| {
276                    load.aggregate_avg_by_slot(plan, target_slot)
277                })
278        })
279    }
280
281    /// Execute and return the median id by `field` using deterministic ordering
282    /// `(field asc, primary key asc)`.
283    ///
284    /// Even-length windows select the lower median.
285    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
286    where
287        E: EntityValue,
288    {
289        self.ensure_non_paged_mode_ready()?;
290
291        Self::with_slot(field, |target_slot| {
292            self.session
293                .execute_load_query_with(self.query(), move |load, plan| {
294                    load.aggregate_median_by_slot(plan, target_slot)
295                })
296        })
297    }
298
299    /// Execute and return the number of distinct values for `field` over the
300    /// effective result window.
301    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
302    where
303        E: EntityValue,
304    {
305        self.ensure_non_paged_mode_ready()?;
306
307        Self::with_slot(field, |target_slot| {
308            self.session
309                .execute_load_query_with(self.query(), move |load, plan| {
310                    load.aggregate_count_distinct_by_slot(plan, target_slot)
311                })
312        })
313    }
314
315    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
316    ///
317    /// Tie handling is deterministic for both extrema: primary key ascending.
318    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
319    where
320        E: EntityValue,
321    {
322        self.ensure_non_paged_mode_ready()?;
323
324        Self::with_slot(field, |target_slot| {
325            self.session
326                .execute_load_query_with(self.query(), move |load, plan| {
327                    load.aggregate_min_max_by_slot(plan, target_slot)
328                })
329        })
330    }
331
332    /// Execute and return projected field values for the effective result window.
333    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
334    where
335        E: EntityValue,
336    {
337        self.ensure_non_paged_mode_ready()?;
338
339        Self::with_slot(field, |target_slot| {
340            self.session
341                .execute_load_query_with(self.query(), move |load, plan| {
342                    load.values_by_slot(plan, target_slot)
343                })
344        })
345    }
346
347    /// Execute and return the first `k` rows from the effective response window.
348    pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
349    where
350        E: EntityValue,
351    {
352        self.ensure_non_paged_mode_ready()?;
353
354        self.session
355            .execute_load_query_with(self.query(), |load, plan| load.take(plan, take_count))
356    }
357
358    /// Execute and return the top `k` rows by `field` under deterministic
359    /// ordering `(field desc, primary_key asc)` over the effective response
360    /// window.
361    ///
362    /// This terminal applies its own ordering and does not preserve query
363    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
364    /// matches `max_by(field)` selection semantics.
365    pub fn top_k_by(
366        &self,
367        field: impl AsRef<str>,
368        take_count: u32,
369    ) -> Result<EntityResponse<E>, QueryError>
370    where
371        E: EntityValue,
372    {
373        self.ensure_non_paged_mode_ready()?;
374
375        Self::with_slot(field, |target_slot| {
376            self.session
377                .execute_load_query_with(self.query(), move |load, plan| {
378                    load.top_k_by_slot(plan, target_slot, take_count)
379                })
380        })
381    }
382
383    /// Execute and return the bottom `k` rows by `field` under deterministic
384    /// ordering `(field asc, primary_key asc)` over the effective response
385    /// window.
386    ///
387    /// This terminal applies its own ordering and does not preserve query
388    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
389    /// matches `min_by(field)` selection semantics.
390    pub fn bottom_k_by(
391        &self,
392        field: impl AsRef<str>,
393        take_count: u32,
394    ) -> Result<EntityResponse<E>, QueryError>
395    where
396        E: EntityValue,
397    {
398        self.ensure_non_paged_mode_ready()?;
399
400        Self::with_slot(field, |target_slot| {
401            self.session
402                .execute_load_query_with(self.query(), move |load, plan| {
403                    load.bottom_k_by_slot(plan, target_slot, take_count)
404                })
405        })
406    }
407
408    /// Execute and return projected values for the top `k` rows by `field`
409    /// under deterministic ordering `(field desc, primary_key asc)` over the
410    /// effective response window.
411    ///
412    /// Ranking is applied before projection and does not preserve query
413    /// `order_by(...)` row order in the returned values. For `k = 1`, this
414    /// matches `max_by(field)` projected to one value.
415    pub fn top_k_by_values(
416        &self,
417        field: impl AsRef<str>,
418        take_count: u32,
419    ) -> Result<Vec<Value>, QueryError>
420    where
421        E: EntityValue,
422    {
423        self.ensure_non_paged_mode_ready()?;
424
425        Self::with_slot(field, |target_slot| {
426            self.session
427                .execute_load_query_with(self.query(), move |load, plan| {
428                    load.top_k_by_values_slot(plan, target_slot, take_count)
429                })
430        })
431    }
432
433    /// Execute and return projected values for the bottom `k` rows by `field`
434    /// under deterministic ordering `(field asc, primary_key asc)` over the
435    /// effective response window.
436    ///
437    /// Ranking is applied before projection and does not preserve query
438    /// `order_by(...)` row order in the returned values. For `k = 1`, this
439    /// matches `min_by(field)` projected to one value.
440    pub fn bottom_k_by_values(
441        &self,
442        field: impl AsRef<str>,
443        take_count: u32,
444    ) -> Result<Vec<Value>, QueryError>
445    where
446        E: EntityValue,
447    {
448        self.ensure_non_paged_mode_ready()?;
449
450        Self::with_slot(field, |target_slot| {
451            self.session
452                .execute_load_query_with(self.query(), move |load, plan| {
453                    load.bottom_k_by_values_slot(plan, target_slot, take_count)
454                })
455        })
456    }
457
458    /// Execute and return projected id/value pairs for the top `k` rows by
459    /// `field` under deterministic ordering `(field desc, primary_key asc)`
460    /// over the effective response window.
461    ///
462    /// Ranking is applied before projection and does not preserve query
463    /// `order_by(...)` row order in the returned values. For `k = 1`, this
464    /// matches `max_by(field)` projected to one `(id, value)` pair.
465    pub fn top_k_by_with_ids(
466        &self,
467        field: impl AsRef<str>,
468        take_count: u32,
469    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
470    where
471        E: EntityValue,
472    {
473        self.ensure_non_paged_mode_ready()?;
474
475        Self::with_slot(field, |target_slot| {
476            self.session
477                .execute_load_query_with(self.query(), move |load, plan| {
478                    load.top_k_by_with_ids_slot(plan, target_slot, take_count)
479                })
480        })
481    }
482
483    /// Execute and return projected id/value pairs for the bottom `k` rows by
484    /// `field` under deterministic ordering `(field asc, primary_key asc)`
485    /// over the effective response window.
486    ///
487    /// Ranking is applied before projection and does not preserve query
488    /// `order_by(...)` row order in the returned values. For `k = 1`, this
489    /// matches `min_by(field)` projected to one `(id, value)` pair.
490    pub fn bottom_k_by_with_ids(
491        &self,
492        field: impl AsRef<str>,
493        take_count: u32,
494    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
495    where
496        E: EntityValue,
497    {
498        self.ensure_non_paged_mode_ready()?;
499
500        Self::with_slot(field, |target_slot| {
501            self.session
502                .execute_load_query_with(self.query(), move |load, plan| {
503                    load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
504                })
505        })
506    }
507
508    /// Execute and return distinct projected field values for the effective
509    /// result window, preserving first-observed value order.
510    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
511    where
512        E: EntityValue,
513    {
514        self.ensure_non_paged_mode_ready()?;
515
516        Self::with_slot(field, |target_slot| {
517            self.session
518                .execute_load_query_with(self.query(), move |load, plan| {
519                    load.distinct_values_by_slot(plan, target_slot)
520                })
521        })
522    }
523
524    /// Execute and return projected field values paired with row ids for the
525    /// effective result window.
526    pub fn values_by_with_ids(
527        &self,
528        field: impl AsRef<str>,
529    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
530    where
531        E: EntityValue,
532    {
533        self.ensure_non_paged_mode_ready()?;
534
535        Self::with_slot(field, |target_slot| {
536            self.session
537                .execute_load_query_with(self.query(), move |load, plan| {
538                    load.values_by_with_ids_slot(plan, target_slot)
539                })
540        })
541    }
542
543    /// Execute and return the first projected field value in effective response
544    /// order, if any.
545    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
546    where
547        E: EntityValue,
548    {
549        self.ensure_non_paged_mode_ready()?;
550
551        Self::with_slot(field, |target_slot| {
552            self.session
553                .execute_load_query_with(self.query(), move |load, plan| {
554                    load.first_value_by_slot(plan, target_slot)
555                })
556        })
557    }
558
559    /// Execute and return the last projected field value in effective response
560    /// order, if any.
561    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
562    where
563        E: EntityValue,
564    {
565        self.ensure_non_paged_mode_ready()?;
566
567        Self::with_slot(field, |target_slot| {
568            self.session
569                .execute_load_query_with(self.query(), move |load, plan| {
570                    load.last_value_by_slot(plan, target_slot)
571                })
572        })
573    }
574
575    /// Execute and return the first matching identifier in response order, if any.
576    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
577    where
578        E: EntityValue,
579    {
580        self.ensure_non_paged_mode_ready()?;
581
582        self.session
583            .execute_load_query_with(self.query(), |load, plan| load.aggregate_first(plan))
584    }
585
586    /// Explain scalar `first()` routing without executing the terminal.
587    pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
588    where
589        E: EntityValue,
590    {
591        self.ensure_non_paged_mode_ready()?;
592
593        crate::db::DbSession::<E::Canister>::explain_load_query_terminal_with(self.query(), first())
594    }
595
596    /// Execute and return the last matching identifier in response order, if any.
597    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
598    where
599        E: EntityValue,
600    {
601        self.ensure_non_paged_mode_ready()?;
602
603        self.session
604            .execute_load_query_with(self.query(), |load, plan| load.aggregate_last(plan))
605    }
606
607    /// Explain scalar `last()` routing without executing the terminal.
608    pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
609    where
610        E: EntityValue,
611    {
612        self.ensure_non_paged_mode_ready()?;
613
614        crate::db::DbSession::<E::Canister>::explain_load_query_terminal_with(self.query(), last())
615    }
616
617    /// Execute and require exactly one matching row.
618    pub fn require_one(&self) -> Result<(), QueryError>
619    where
620        E: EntityValue,
621    {
622        self.execute()?.require_one()?;
623        Ok(())
624    }
625
626    /// Execute and require at least one matching row.
627    pub fn require_some(&self) -> Result<(), QueryError>
628    where
629        E: EntityValue,
630    {
631        self.execute()?.require_some()?;
632        Ok(())
633    }
634}