Skip to main content

icydb_core/db/query/fluent/
load.rs

1use crate::{
2    db::{
3        DbSession, PagedLoadExecution, PagedLoadExecutionWithTrace,
4        executor::ExecutablePlan,
5        policy,
6        query::{
7            explain::ExplainPlan,
8            expr::{FilterExpr, SortExpr},
9            intent::{IntentError, PlannedQuery, Query, QueryError},
10            predicate::Predicate,
11        },
12        response::Response,
13    },
14    traits::{EntityKind, EntityValue, SingletonEntity},
15    types::{Decimal, Id},
16    value::Value,
17};
18
19type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
20
21///
22/// FluentLoadQuery
23///
24/// Session-bound load query wrapper.
25/// Owns intent construction and execution routing only.
26/// All result inspection and projection is performed on `Response<E>`.
27///
28
29pub struct FluentLoadQuery<'a, E>
30where
31    E: EntityKind,
32{
33    session: &'a DbSession<E::Canister>,
34    query: Query<E>,
35    cursor_token: Option<String>,
36}
37
38///
39/// PagedLoadQuery
40///
41/// Session-bound cursor pagination wrapper.
42/// This wrapper only exposes cursor continuation and paged execution.
43///
44
45pub struct PagedLoadQuery<'a, E>
46where
47    E: EntityKind,
48{
49    inner: FluentLoadQuery<'a, E>,
50}
51
52impl<'a, E> FluentLoadQuery<'a, E>
53where
54    E: EntityKind,
55{
56    pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
57        Self {
58            session,
59            query,
60            cursor_token: None,
61        }
62    }
63
64    // ------------------------------------------------------------------
65    // Intent inspection
66    // ------------------------------------------------------------------
67
68    #[must_use]
69    pub const fn query(&self) -> &Query<E> {
70        &self.query
71    }
72
73    fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
74        self.query = map(self.query);
75        self
76    }
77
78    fn try_map_query(
79        mut self,
80        map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
81    ) -> Result<Self, QueryError> {
82        self.query = map(self.query)?;
83        Ok(self)
84    }
85
86    // ------------------------------------------------------------------
87    // Intent builders (pure)
88    // ------------------------------------------------------------------
89
90    /// Set the access path to a single typed primary-key value.
91    ///
92    /// `Id<E>` is treated as a plain query input value here. It does not grant access.
93    #[must_use]
94    pub fn by_id(mut self, id: Id<E>) -> Self {
95        self.query = self.query.by_id(id.key());
96        self
97    }
98
99    /// Set the access path to multiple typed primary-key values.
100    ///
101    /// IDs are public and may come from untrusted input sources.
102    #[must_use]
103    pub fn by_ids<I>(mut self, ids: I) -> Self
104    where
105        I: IntoIterator<Item = Id<E>>,
106    {
107        self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
108        self
109    }
110
111    // ------------------------------------------------------------------
112    // Query Refinement
113    // ------------------------------------------------------------------
114
115    #[must_use]
116    pub fn filter(self, predicate: Predicate) -> Self {
117        self.map_query(|query| query.filter(predicate))
118    }
119
120    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
121        self.try_map_query(|query| query.filter_expr(expr))
122    }
123
124    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
125        self.try_map_query(|query| query.sort_expr(expr))
126    }
127
128    #[must_use]
129    pub fn order_by(self, field: impl AsRef<str>) -> Self {
130        self.map_query(|query| query.order_by(field))
131    }
132
133    #[must_use]
134    pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
135        self.map_query(|query| query.order_by_desc(field))
136    }
137
138    /// Bound the number of returned rows.
139    ///
140    /// Pagination is only valid with explicit ordering; combine `limit` and/or
141    /// `offset` with `order_by(...)` or planning fails.
142    #[must_use]
143    pub fn limit(self, limit: u32) -> Self {
144        self.map_query(|query| query.limit(limit))
145    }
146
147    /// Skip a number of rows in the ordered result stream.
148    ///
149    /// Pagination is only valid with explicit ordering; combine `offset` and/or
150    /// `limit` with `order_by(...)` or planning fails.
151    #[must_use]
152    pub fn offset(self, offset: u32) -> Self {
153        self.map_query(|query| query.offset(offset))
154    }
155
156    /// Attach an opaque cursor token for continuation pagination.
157    ///
158    /// Cursor-mode invariants are checked before planning/execution:
159    /// - explicit `order_by(...)` is required
160    /// - explicit `limit(...)` is required
161    #[must_use]
162    pub fn cursor(mut self, token: impl Into<String>) -> Self {
163        self.cursor_token = Some(token.into());
164        self
165    }
166
167    // ------------------------------------------------------------------
168    // Planning / diagnostics
169    // ------------------------------------------------------------------
170
171    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
172        self.query.explain()
173    }
174
175    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
176        if let Some(err) = self.cursor_intent_error() {
177            return Err(QueryError::Intent(err));
178        }
179
180        self.query.planned()
181    }
182
183    /// Compile this fluent load intent into executor runtime state.
184    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
185        self.planned().map(ExecutablePlan::from)
186    }
187
188    // ------------------------------------------------------------------
189    // Execution (single semantic boundary)
190    // ------------------------------------------------------------------
191
192    /// Execute this query using the session's policy settings.
193    pub fn execute(&self) -> Result<Response<E>, QueryError>
194    where
195        E: EntityValue,
196    {
197        self.ensure_non_paged_mode_ready()?;
198
199        self.session.execute_query(self.query())
200    }
201
202    /// Enter typed cursor-pagination mode for this query.
203    ///
204    /// Cursor pagination requires:
205    /// - explicit `order_by(...)`
206    /// - explicit `limit(...)`
207    ///
208    /// Requests are deterministic under canonical ordering, but continuation is
209    /// best-effort and forward-only over live state.
210    /// No snapshot/version is pinned across requests, so concurrent writes may
211    /// shift page boundaries.
212    pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
213        self.ensure_paged_mode_ready()?;
214
215        Ok(PagedLoadQuery { inner: self })
216    }
217
218    /// Execute this query as cursor pagination and return items + next cursor.
219    ///
220    /// The returned cursor token is opaque and must be passed back via `.cursor(...)`.
221    pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
222    where
223        E: EntityValue,
224    {
225        self.page()?.execute()
226    }
227
228    // ------------------------------------------------------------------
229    // Execution terminals — semantic only
230    // ------------------------------------------------------------------
231
232    /// Execute and return whether the result set is empty.
233    pub fn is_empty(&self) -> Result<bool, QueryError>
234    where
235        E: EntityValue,
236    {
237        Ok(!self.exists()?)
238    }
239
240    /// Execute and return whether at least one matching row exists.
241    pub fn exists(&self) -> Result<bool, QueryError>
242    where
243        E: EntityValue,
244    {
245        self.ensure_non_paged_mode_ready()?;
246
247        self.session.execute_load_query_exists(self.query())
248    }
249
250    /// Execute and return the number of matching rows.
251    pub fn count(&self) -> Result<u32, QueryError>
252    where
253        E: EntityValue,
254    {
255        self.ensure_non_paged_mode_ready()?;
256
257        self.session.execute_load_query_count(self.query())
258    }
259
260    /// Execute and return the smallest matching identifier, if any.
261    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
262    where
263        E: EntityValue,
264    {
265        self.ensure_non_paged_mode_ready()?;
266
267        self.session.execute_load_query_min(self.query())
268    }
269
270    /// Execute and return the id of the row with the smallest value for `field`.
271    ///
272    /// Ties are deterministic: equal field values resolve by primary key ascending.
273    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
274    where
275        E: EntityValue,
276    {
277        self.ensure_non_paged_mode_ready()?;
278
279        self.session
280            .execute_load_query_min_by(self.query(), field.as_ref())
281    }
282
283    /// Execute and return the largest matching identifier, if any.
284    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
285    where
286        E: EntityValue,
287    {
288        self.ensure_non_paged_mode_ready()?;
289
290        self.session.execute_load_query_max(self.query())
291    }
292
293    /// Execute and return the id of the row with the largest value for `field`.
294    ///
295    /// Ties are deterministic: equal field values resolve by primary key ascending.
296    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
297    where
298        E: EntityValue,
299    {
300        self.ensure_non_paged_mode_ready()?;
301
302        self.session
303            .execute_load_query_max_by(self.query(), field.as_ref())
304    }
305
306    /// Execute and return the id at zero-based ordinal `nth` when rows are
307    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
308    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
309    where
310        E: EntityValue,
311    {
312        self.ensure_non_paged_mode_ready()?;
313
314        self.session
315            .execute_load_query_nth_by(self.query(), field.as_ref(), nth)
316    }
317
318    /// Execute and return the sum of `field` over matching rows.
319    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
320    where
321        E: EntityValue,
322    {
323        self.ensure_non_paged_mode_ready()?;
324
325        self.session
326            .execute_load_query_sum_by(self.query(), field.as_ref())
327    }
328
329    /// Execute and return the average of `field` over matching rows.
330    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
331    where
332        E: EntityValue,
333    {
334        self.ensure_non_paged_mode_ready()?;
335
336        self.session
337            .execute_load_query_avg_by(self.query(), field.as_ref())
338    }
339
340    /// Execute and return the median id by `field` using deterministic ordering
341    /// `(field asc, primary key asc)`.
342    ///
343    /// Even-length windows select the lower median.
344    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
345    where
346        E: EntityValue,
347    {
348        self.ensure_non_paged_mode_ready()?;
349
350        self.session
351            .execute_load_query_median_by(self.query(), field.as_ref())
352    }
353
354    /// Execute and return the number of distinct values for `field` over the
355    /// effective result window.
356    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
357    where
358        E: EntityValue,
359    {
360        self.ensure_non_paged_mode_ready()?;
361
362        self.session
363            .execute_load_query_count_distinct_by(self.query(), field.as_ref())
364    }
365
366    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
367    ///
368    /// Tie handling is deterministic for both extrema: primary key ascending.
369    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
370    where
371        E: EntityValue,
372    {
373        self.ensure_non_paged_mode_ready()?;
374
375        self.session
376            .execute_load_query_min_max_by(self.query(), field.as_ref())
377    }
378
379    /// Execute and return projected field values for the effective result window.
380    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
381    where
382        E: EntityValue,
383    {
384        self.ensure_non_paged_mode_ready()?;
385
386        self.session
387            .execute_load_query_values_by(self.query(), field.as_ref())
388    }
389
390    /// Execute and return the first `k` rows from the effective response window.
391    pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
392    where
393        E: EntityValue,
394    {
395        self.ensure_non_paged_mode_ready()?;
396
397        self.session
398            .execute_load_query_take(self.query(), take_count)
399    }
400
401    /// Execute and return the top `k` rows by `field` under deterministic
402    /// ordering `(field desc, primary_key asc)` over the effective response
403    /// window.
404    ///
405    /// This terminal applies its own ordering and does not preserve query
406    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
407    /// matches `max_by(field)` selection semantics.
408    pub fn top_k_by(
409        &self,
410        field: impl AsRef<str>,
411        take_count: u32,
412    ) -> Result<Response<E>, QueryError>
413    where
414        E: EntityValue,
415    {
416        self.ensure_non_paged_mode_ready()?;
417
418        self.session
419            .execute_load_query_top_k_by(self.query(), field.as_ref(), take_count)
420    }
421
422    /// Execute and return the bottom `k` rows by `field` under deterministic
423    /// ordering `(field asc, primary_key asc)` over the effective response
424    /// window.
425    ///
426    /// This terminal applies its own ordering and does not preserve query
427    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
428    /// matches `min_by(field)` selection semantics.
429    pub fn bottom_k_by(
430        &self,
431        field: impl AsRef<str>,
432        take_count: u32,
433    ) -> Result<Response<E>, QueryError>
434    where
435        E: EntityValue,
436    {
437        self.ensure_non_paged_mode_ready()?;
438
439        self.session
440            .execute_load_query_bottom_k_by(self.query(), field.as_ref(), take_count)
441    }
442
443    /// Execute and return projected values for the top `k` rows by `field`
444    /// under deterministic ordering `(field desc, primary_key asc)` over the
445    /// effective response window.
446    ///
447    /// Ranking is applied before projection and does not preserve query
448    /// `order_by(...)` row order in the returned values. For `k = 1`, this
449    /// matches `max_by(field)` projected to one value.
450    pub fn top_k_by_values(
451        &self,
452        field: impl AsRef<str>,
453        take_count: u32,
454    ) -> Result<Vec<Value>, QueryError>
455    where
456        E: EntityValue,
457    {
458        self.ensure_non_paged_mode_ready()?;
459
460        self.session
461            .execute_load_query_top_k_by_values(self.query(), field.as_ref(), take_count)
462    }
463
464    /// Execute and return projected values for the bottom `k` rows by `field`
465    /// under deterministic ordering `(field asc, primary_key asc)` over the
466    /// effective response window.
467    ///
468    /// Ranking is applied before projection and does not preserve query
469    /// `order_by(...)` row order in the returned values. For `k = 1`, this
470    /// matches `min_by(field)` projected to one value.
471    pub fn bottom_k_by_values(
472        &self,
473        field: impl AsRef<str>,
474        take_count: u32,
475    ) -> Result<Vec<Value>, QueryError>
476    where
477        E: EntityValue,
478    {
479        self.ensure_non_paged_mode_ready()?;
480
481        self.session
482            .execute_load_query_bottom_k_by_values(self.query(), field.as_ref(), take_count)
483    }
484
485    /// Execute and return projected id/value pairs for the top `k` rows by
486    /// `field` under deterministic ordering `(field desc, primary_key asc)`
487    /// over the effective response window.
488    ///
489    /// Ranking is applied before projection and does not preserve query
490    /// `order_by(...)` row order in the returned values. For `k = 1`, this
491    /// matches `max_by(field)` projected to one `(id, value)` pair.
492    pub fn top_k_by_with_ids(
493        &self,
494        field: impl AsRef<str>,
495        take_count: u32,
496    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
497    where
498        E: EntityValue,
499    {
500        self.ensure_non_paged_mode_ready()?;
501
502        self.session
503            .execute_load_query_top_k_by_with_ids(self.query(), field.as_ref(), take_count)
504    }
505
506    /// Execute and return projected id/value pairs for the bottom `k` rows by
507    /// `field` under deterministic ordering `(field asc, primary_key asc)`
508    /// over the effective response window.
509    ///
510    /// Ranking is applied before projection and does not preserve query
511    /// `order_by(...)` row order in the returned values. For `k = 1`, this
512    /// matches `min_by(field)` projected to one `(id, value)` pair.
513    pub fn bottom_k_by_with_ids(
514        &self,
515        field: impl AsRef<str>,
516        take_count: u32,
517    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
518    where
519        E: EntityValue,
520    {
521        self.ensure_non_paged_mode_ready()?;
522
523        self.session.execute_load_query_bottom_k_by_with_ids(
524            self.query(),
525            field.as_ref(),
526            take_count,
527        )
528    }
529
530    /// Execute and return distinct projected field values for the effective
531    /// result window, preserving first-observed value order.
532    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
533    where
534        E: EntityValue,
535    {
536        self.ensure_non_paged_mode_ready()?;
537
538        self.session
539            .execute_load_query_distinct_values_by(self.query(), field.as_ref())
540    }
541
542    /// Execute and return projected field values paired with row ids for the
543    /// effective result window.
544    pub fn values_by_with_ids(
545        &self,
546        field: impl AsRef<str>,
547    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
548    where
549        E: EntityValue,
550    {
551        self.ensure_non_paged_mode_ready()?;
552
553        self.session
554            .execute_load_query_values_by_with_ids(self.query(), field.as_ref())
555    }
556
557    /// Execute and return the first projected field value in effective response
558    /// order, if any.
559    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
560    where
561        E: EntityValue,
562    {
563        self.ensure_non_paged_mode_ready()?;
564
565        self.session
566            .execute_load_query_first_value_by(self.query(), field.as_ref())
567    }
568
569    /// Execute and return the last projected field value in effective response
570    /// order, if any.
571    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
572    where
573        E: EntityValue,
574    {
575        self.ensure_non_paged_mode_ready()?;
576
577        self.session
578            .execute_load_query_last_value_by(self.query(), field.as_ref())
579    }
580
581    /// Execute and return the first matching identifier in response order, if any.
582    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
583    where
584        E: EntityValue,
585    {
586        self.ensure_non_paged_mode_ready()?;
587
588        self.session.execute_load_query_first(self.query())
589    }
590
591    /// Execute and return the last matching identifier in response order, if any.
592    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
593    where
594        E: EntityValue,
595    {
596        self.ensure_non_paged_mode_ready()?;
597
598        self.session.execute_load_query_last(self.query())
599    }
600
601    /// Execute and require exactly one matching row.
602    pub fn require_one(&self) -> Result<(), QueryError>
603    where
604        E: EntityValue,
605    {
606        self.execute()?.require_one()?;
607        Ok(())
608    }
609
610    /// Execute and require at least one matching row.
611    pub fn require_some(&self) -> Result<(), QueryError>
612    where
613        E: EntityValue,
614    {
615        self.execute()?.require_some()?;
616        Ok(())
617    }
618}
619
620impl<E> FluentLoadQuery<'_, E>
621where
622    E: EntityKind,
623{
624    fn non_paged_intent_error(&self) -> Option<IntentError> {
625        self.cursor_token
626            .as_ref()
627            .map(|_| IntentError::CursorRequiresPagedExecution)
628    }
629
630    fn cursor_intent_error(&self) -> Option<IntentError> {
631        self.cursor_token
632            .as_ref()
633            .and_then(|_| self.paged_intent_error())
634    }
635
636    fn paged_intent_error(&self) -> Option<IntentError> {
637        let spec = self.query.load_spec()?;
638
639        policy::validate_cursor_paging_requirements(self.query.has_explicit_order(), spec)
640            .err()
641            .map(IntentError::from)
642    }
643
644    fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
645        if let Some(err) = self.paged_intent_error() {
646            return Err(QueryError::Intent(err));
647        }
648
649        Ok(())
650    }
651
652    fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
653        if let Some(err) = self.non_paged_intent_error() {
654            return Err(QueryError::Intent(err));
655        }
656
657        Ok(())
658    }
659}
660
661impl<E> FluentLoadQuery<'_, E>
662where
663    E: EntityKind + SingletonEntity,
664    E::Key: Default,
665{
666    #[must_use]
667    pub fn only(self) -> Self {
668        self.map_query(Query::only)
669    }
670}
671
672impl<E> PagedLoadQuery<'_, E>
673where
674    E: EntityKind,
675{
676    // ------------------------------------------------------------------
677    // Intent inspection
678    // ------------------------------------------------------------------
679
680    #[must_use]
681    pub const fn query(&self) -> &Query<E> {
682        self.inner.query()
683    }
684
685    // ------------------------------------------------------------------
686    // Cursor continuation
687    // ------------------------------------------------------------------
688
689    /// Attach an opaque continuation token for the next page.
690    #[must_use]
691    pub fn cursor(mut self, token: impl Into<String>) -> Self {
692        self.inner = self.inner.cursor(token);
693        self
694    }
695
696    // ------------------------------------------------------------------
697    // Execution
698    // ------------------------------------------------------------------
699
700    /// Execute in cursor-pagination mode and return items + next cursor.
701    ///
702    /// Continuation is best-effort and forward-only over live state:
703    /// deterministic per request under canonical ordering, with no
704    /// snapshot/version pinned across requests.
705    pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
706    where
707        E: EntityValue,
708    {
709        self.execute_with_trace()
710            .map(PagedLoadExecutionWithTrace::into_execution)
711    }
712
713    /// Execute in cursor-pagination mode and return items, next cursor,
714    /// and optional execution trace details when session debug mode is enabled.
715    ///
716    /// Trace collection is opt-in via `DbSession::debug()` and does not
717    /// change query planning or result semantics.
718    pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
719    where
720        E: EntityValue,
721    {
722        self.inner.ensure_paged_mode_ready()?;
723
724        self.inner.session.execute_load_query_paged_with_trace(
725            self.inner.query(),
726            self.inner.cursor_token.as_deref(),
727        )
728    }
729}