Skip to main content

icydb_core/db/query/fluent/
load.rs

1use crate::{
2    db::{
3        DbSession, PagedLoadExecution, PagedLoadExecutionWithTrace,
4        contracts::Predicate,
5        policy,
6        query::{
7            explain::ExplainPlan,
8            expr::{FilterExpr, SortExpr},
9            intent::{IntentError, PlannedQuery, Query, QueryError},
10        },
11        response::Response,
12    },
13    traits::{EntityKind, EntityValue, SingletonEntity},
14    types::{Decimal, Id},
15    value::Value,
16};
17
18type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
20///
21/// FluentLoadQuery
22///
23/// Session-bound load query wrapper.
24/// Owns intent construction and execution routing only.
25/// All result inspection and projection is performed on `Response<E>`.
26///
27
28pub struct FluentLoadQuery<'a, E>
29where
30    E: EntityKind,
31{
32    session: &'a DbSession<E::Canister>,
33    query: Query<E>,
34    cursor_token: Option<String>,
35}
36
37///
38/// PagedLoadQuery
39///
40/// Session-bound cursor pagination wrapper.
41/// This wrapper only exposes cursor continuation and paged execution.
42///
43
44pub struct PagedLoadQuery<'a, E>
45where
46    E: EntityKind,
47{
48    inner: FluentLoadQuery<'a, E>,
49}
50
51impl<'a, E> FluentLoadQuery<'a, E>
52where
53    E: EntityKind,
54{
55    pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
56        Self {
57            session,
58            query,
59            cursor_token: None,
60        }
61    }
62
63    // ------------------------------------------------------------------
64    // Intent inspection
65    // ------------------------------------------------------------------
66
67    #[must_use]
68    pub const fn query(&self) -> &Query<E> {
69        &self.query
70    }
71
72    fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
73        self.query = map(self.query);
74        self
75    }
76
77    fn try_map_query(
78        mut self,
79        map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
80    ) -> Result<Self, QueryError> {
81        self.query = map(self.query)?;
82        Ok(self)
83    }
84
85    // ------------------------------------------------------------------
86    // Intent builders (pure)
87    // ------------------------------------------------------------------
88
89    /// Set the access path to a single typed primary-key value.
90    ///
91    /// `Id<E>` is treated as a plain query input value here. It does not grant access.
92    #[must_use]
93    pub fn by_id(mut self, id: Id<E>) -> Self {
94        self.query = self.query.by_id(id.key());
95        self
96    }
97
98    /// Set the access path to multiple typed primary-key values.
99    ///
100    /// IDs are public and may come from untrusted input sources.
101    #[must_use]
102    pub fn by_ids<I>(mut self, ids: I) -> Self
103    where
104        I: IntoIterator<Item = Id<E>>,
105    {
106        self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
107        self
108    }
109
110    // ------------------------------------------------------------------
111    // Query Refinement
112    // ------------------------------------------------------------------
113
114    #[must_use]
115    pub fn filter(self, predicate: Predicate) -> Self {
116        self.map_query(|query| query.filter(predicate))
117    }
118
119    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
120        self.try_map_query(|query| query.filter_expr(expr))
121    }
122
123    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
124        self.try_map_query(|query| query.sort_expr(expr))
125    }
126
127    #[must_use]
128    pub fn order_by(self, field: impl AsRef<str>) -> Self {
129        self.map_query(|query| query.order_by(field))
130    }
131
132    #[must_use]
133    pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
134        self.map_query(|query| query.order_by_desc(field))
135    }
136
137    /// Bound the number of returned rows.
138    ///
139    /// Pagination is only valid with explicit ordering; combine `limit` and/or
140    /// `offset` with `order_by(...)` or planning fails.
141    #[must_use]
142    pub fn limit(self, limit: u32) -> Self {
143        self.map_query(|query| query.limit(limit))
144    }
145
146    /// Skip a number of rows in the ordered result stream.
147    ///
148    /// Pagination is only valid with explicit ordering; combine `offset` and/or
149    /// `limit` with `order_by(...)` or planning fails.
150    #[must_use]
151    pub fn offset(self, offset: u32) -> Self {
152        self.map_query(|query| query.offset(offset))
153    }
154
155    /// Attach an opaque cursor token for continuation pagination.
156    ///
157    /// Cursor-mode invariants are checked before planning/execution:
158    /// - explicit `order_by(...)` is required
159    /// - explicit `limit(...)` is required
160    #[must_use]
161    pub fn cursor(mut self, token: impl Into<String>) -> Self {
162        self.cursor_token = Some(token.into());
163        self
164    }
165
166    // ------------------------------------------------------------------
167    // Planning / diagnostics
168    // ------------------------------------------------------------------
169
170    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
171        self.query.explain()
172    }
173
174    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
175        if let Some(err) = self.cursor_intent_error() {
176            return Err(QueryError::Intent(err));
177        }
178
179        self.query.planned()
180    }
181
182    // ------------------------------------------------------------------
183    // Execution (single semantic boundary)
184    // ------------------------------------------------------------------
185
186    /// Execute this query using the session's policy settings.
187    pub fn execute(&self) -> Result<Response<E>, QueryError>
188    where
189        E: EntityValue,
190    {
191        self.ensure_non_paged_mode_ready()?;
192
193        self.session.execute_query(self.query())
194    }
195
196    /// Enter typed cursor-pagination mode for this query.
197    ///
198    /// Cursor pagination requires:
199    /// - explicit `order_by(...)`
200    /// - explicit `limit(...)`
201    ///
202    /// Requests are deterministic under canonical ordering, but continuation is
203    /// best-effort and forward-only over live state.
204    /// No snapshot/version is pinned across requests, so concurrent writes may
205    /// shift page boundaries.
206    pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
207        self.ensure_paged_mode_ready()?;
208
209        Ok(PagedLoadQuery { inner: self })
210    }
211
212    /// Execute this query as cursor pagination and return items + next cursor.
213    ///
214    /// The returned cursor token is opaque and must be passed back via `.cursor(...)`.
215    pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
216    where
217        E: EntityValue,
218    {
219        self.page()?.execute()
220    }
221
222    // ------------------------------------------------------------------
223    // Execution terminals — semantic only
224    // ------------------------------------------------------------------
225
226    /// Execute and return whether the result set is empty.
227    pub fn is_empty(&self) -> Result<bool, QueryError>
228    where
229        E: EntityValue,
230    {
231        Ok(!self.exists()?)
232    }
233
234    /// Execute and return whether at least one matching row exists.
235    pub fn exists(&self) -> Result<bool, QueryError>
236    where
237        E: EntityValue,
238    {
239        self.ensure_non_paged_mode_ready()?;
240
241        self.session
242            .execute_load_query_with(self.query(), |load, plan| load.aggregate_exists(plan))
243    }
244
245    /// Execute and return the number of matching rows.
246    pub fn count(&self) -> Result<u32, QueryError>
247    where
248        E: EntityValue,
249    {
250        self.ensure_non_paged_mode_ready()?;
251
252        self.session
253            .execute_load_query_with(self.query(), |load, plan| load.aggregate_count(plan))
254    }
255
256    /// Execute and return the smallest matching identifier, if any.
257    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
258    where
259        E: EntityValue,
260    {
261        self.ensure_non_paged_mode_ready()?;
262
263        self.session
264            .execute_load_query_with(self.query(), |load, plan| load.aggregate_min(plan))
265    }
266
267    /// Execute and return the id of the row with the smallest value for `field`.
268    ///
269    /// Ties are deterministic: equal field values resolve by primary key ascending.
270    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
271    where
272        E: EntityValue,
273    {
274        self.ensure_non_paged_mode_ready()?;
275
276        self.session
277            .execute_load_query_with(self.query(), |load, plan| {
278                load.aggregate_min_by(plan, field.as_ref())
279            })
280    }
281
282    /// Execute and return the largest matching identifier, if any.
283    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
284    where
285        E: EntityValue,
286    {
287        self.ensure_non_paged_mode_ready()?;
288
289        self.session
290            .execute_load_query_with(self.query(), |load, plan| load.aggregate_max(plan))
291    }
292
293    /// Execute and return the id of the row with the largest value for `field`.
294    ///
295    /// Ties are deterministic: equal field values resolve by primary key ascending.
296    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
297    where
298        E: EntityValue,
299    {
300        self.ensure_non_paged_mode_ready()?;
301
302        self.session
303            .execute_load_query_with(self.query(), |load, plan| {
304                load.aggregate_max_by(plan, field.as_ref())
305            })
306    }
307
308    /// Execute and return the id at zero-based ordinal `nth` when rows are
309    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
310    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
311    where
312        E: EntityValue,
313    {
314        self.ensure_non_paged_mode_ready()?;
315
316        self.session
317            .execute_load_query_with(self.query(), |load, plan| {
318                load.aggregate_nth_by(plan, field.as_ref(), nth)
319            })
320    }
321
322    /// Execute and return the sum of `field` over matching rows.
323    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
324    where
325        E: EntityValue,
326    {
327        self.ensure_non_paged_mode_ready()?;
328
329        self.session
330            .execute_load_query_with(self.query(), |load, plan| {
331                load.aggregate_sum_by(plan, field.as_ref())
332            })
333    }
334
335    /// Execute and return the average of `field` over matching rows.
336    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
337    where
338        E: EntityValue,
339    {
340        self.ensure_non_paged_mode_ready()?;
341
342        self.session
343            .execute_load_query_with(self.query(), |load, plan| {
344                load.aggregate_avg_by(plan, field.as_ref())
345            })
346    }
347
348    /// Execute and return the median id by `field` using deterministic ordering
349    /// `(field asc, primary key asc)`.
350    ///
351    /// Even-length windows select the lower median.
352    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
353    where
354        E: EntityValue,
355    {
356        self.ensure_non_paged_mode_ready()?;
357
358        self.session
359            .execute_load_query_with(self.query(), |load, plan| {
360                load.aggregate_median_by(plan, field.as_ref())
361            })
362    }
363
364    /// Execute and return the number of distinct values for `field` over the
365    /// effective result window.
366    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
367    where
368        E: EntityValue,
369    {
370        self.ensure_non_paged_mode_ready()?;
371
372        self.session
373            .execute_load_query_with(self.query(), |load, plan| {
374                load.aggregate_count_distinct_by(plan, field.as_ref())
375            })
376    }
377
378    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
379    ///
380    /// Tie handling is deterministic for both extrema: primary key ascending.
381    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
382    where
383        E: EntityValue,
384    {
385        self.ensure_non_paged_mode_ready()?;
386
387        self.session
388            .execute_load_query_with(self.query(), |load, plan| {
389                load.aggregate_min_max_by(plan, field.as_ref())
390            })
391    }
392
393    /// Execute and return projected field values for the effective result window.
394    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
395    where
396        E: EntityValue,
397    {
398        self.ensure_non_paged_mode_ready()?;
399
400        self.session
401            .execute_load_query_with(self.query(), |load, plan| {
402                load.values_by(plan, field.as_ref())
403            })
404    }
405
406    /// Execute and return the first `k` rows from the effective response window.
407    pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
408    where
409        E: EntityValue,
410    {
411        self.ensure_non_paged_mode_ready()?;
412
413        self.session
414            .execute_load_query_with(self.query(), |load, plan| load.take(plan, take_count))
415    }
416
417    /// Execute and return the top `k` rows by `field` under deterministic
418    /// ordering `(field desc, primary_key asc)` over the effective response
419    /// window.
420    ///
421    /// This terminal applies its own ordering and does not preserve query
422    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
423    /// matches `max_by(field)` selection semantics.
424    pub fn top_k_by(
425        &self,
426        field: impl AsRef<str>,
427        take_count: u32,
428    ) -> Result<Response<E>, QueryError>
429    where
430        E: EntityValue,
431    {
432        self.ensure_non_paged_mode_ready()?;
433
434        self.session
435            .execute_load_query_with(self.query(), |load, plan| {
436                load.top_k_by(plan, field.as_ref(), take_count)
437            })
438    }
439
440    /// Execute and return the bottom `k` rows by `field` under deterministic
441    /// ordering `(field asc, primary_key asc)` over the effective response
442    /// window.
443    ///
444    /// This terminal applies its own ordering and does not preserve query
445    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
446    /// matches `min_by(field)` selection semantics.
447    pub fn bottom_k_by(
448        &self,
449        field: impl AsRef<str>,
450        take_count: u32,
451    ) -> Result<Response<E>, QueryError>
452    where
453        E: EntityValue,
454    {
455        self.ensure_non_paged_mode_ready()?;
456
457        self.session
458            .execute_load_query_with(self.query(), |load, plan| {
459                load.bottom_k_by(plan, field.as_ref(), take_count)
460            })
461    }
462
463    /// Execute and return projected values for the top `k` rows by `field`
464    /// under deterministic ordering `(field desc, primary_key asc)` over the
465    /// effective response window.
466    ///
467    /// Ranking is applied before projection and does not preserve query
468    /// `order_by(...)` row order in the returned values. For `k = 1`, this
469    /// matches `max_by(field)` projected to one value.
470    pub fn top_k_by_values(
471        &self,
472        field: impl AsRef<str>,
473        take_count: u32,
474    ) -> Result<Vec<Value>, QueryError>
475    where
476        E: EntityValue,
477    {
478        self.ensure_non_paged_mode_ready()?;
479
480        self.session
481            .execute_load_query_with(self.query(), |load, plan| {
482                load.top_k_by_values(plan, field.as_ref(), take_count)
483            })
484    }
485
486    /// Execute and return projected values for the bottom `k` rows by `field`
487    /// under deterministic ordering `(field asc, primary_key asc)` over the
488    /// effective response window.
489    ///
490    /// Ranking is applied before projection and does not preserve query
491    /// `order_by(...)` row order in the returned values. For `k = 1`, this
492    /// matches `min_by(field)` projected to one value.
493    pub fn bottom_k_by_values(
494        &self,
495        field: impl AsRef<str>,
496        take_count: u32,
497    ) -> Result<Vec<Value>, QueryError>
498    where
499        E: EntityValue,
500    {
501        self.ensure_non_paged_mode_ready()?;
502
503        self.session
504            .execute_load_query_with(self.query(), |load, plan| {
505                load.bottom_k_by_values(plan, field.as_ref(), take_count)
506            })
507    }
508
509    /// Execute and return projected id/value pairs for the top `k` rows by
510    /// `field` under deterministic ordering `(field desc, primary_key asc)`
511    /// over the effective response window.
512    ///
513    /// Ranking is applied before projection and does not preserve query
514    /// `order_by(...)` row order in the returned values. For `k = 1`, this
515    /// matches `max_by(field)` projected to one `(id, value)` pair.
516    pub fn top_k_by_with_ids(
517        &self,
518        field: impl AsRef<str>,
519        take_count: u32,
520    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
521    where
522        E: EntityValue,
523    {
524        self.ensure_non_paged_mode_ready()?;
525
526        self.session
527            .execute_load_query_with(self.query(), |load, plan| {
528                load.top_k_by_with_ids(plan, field.as_ref(), take_count)
529            })
530    }
531
532    /// Execute and return projected id/value pairs for the bottom `k` rows by
533    /// `field` under deterministic ordering `(field asc, primary_key asc)`
534    /// over the effective response window.
535    ///
536    /// Ranking is applied before projection and does not preserve query
537    /// `order_by(...)` row order in the returned values. For `k = 1`, this
538    /// matches `min_by(field)` projected to one `(id, value)` pair.
539    pub fn bottom_k_by_with_ids(
540        &self,
541        field: impl AsRef<str>,
542        take_count: u32,
543    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
544    where
545        E: EntityValue,
546    {
547        self.ensure_non_paged_mode_ready()?;
548
549        self.session
550            .execute_load_query_with(self.query(), |load, plan| {
551                load.bottom_k_by_with_ids(plan, field.as_ref(), take_count)
552            })
553    }
554
555    /// Execute and return distinct projected field values for the effective
556    /// result window, preserving first-observed value order.
557    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
558    where
559        E: EntityValue,
560    {
561        self.ensure_non_paged_mode_ready()?;
562
563        self.session
564            .execute_load_query_with(self.query(), |load, plan| {
565                load.distinct_values_by(plan, field.as_ref())
566            })
567    }
568
569    /// Execute and return projected field values paired with row ids for the
570    /// effective result window.
571    pub fn values_by_with_ids(
572        &self,
573        field: impl AsRef<str>,
574    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
575    where
576        E: EntityValue,
577    {
578        self.ensure_non_paged_mode_ready()?;
579
580        self.session
581            .execute_load_query_with(self.query(), |load, plan| {
582                load.values_by_with_ids(plan, field.as_ref())
583            })
584    }
585
586    /// Execute and return the first projected field value in effective response
587    /// order, if any.
588    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
589    where
590        E: EntityValue,
591    {
592        self.ensure_non_paged_mode_ready()?;
593
594        self.session
595            .execute_load_query_with(self.query(), |load, plan| {
596                load.first_value_by(plan, field.as_ref())
597            })
598    }
599
600    /// Execute and return the last projected field value in effective response
601    /// order, if any.
602    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
603    where
604        E: EntityValue,
605    {
606        self.ensure_non_paged_mode_ready()?;
607
608        self.session
609            .execute_load_query_with(self.query(), |load, plan| {
610                load.last_value_by(plan, field.as_ref())
611            })
612    }
613
614    /// Execute and return the first matching identifier in response order, if any.
615    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
616    where
617        E: EntityValue,
618    {
619        self.ensure_non_paged_mode_ready()?;
620
621        self.session
622            .execute_load_query_with(self.query(), |load, plan| load.aggregate_first(plan))
623    }
624
625    /// Execute and return the last matching identifier in response order, if any.
626    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
627    where
628        E: EntityValue,
629    {
630        self.ensure_non_paged_mode_ready()?;
631
632        self.session
633            .execute_load_query_with(self.query(), |load, plan| load.aggregate_last(plan))
634    }
635
636    /// Execute and require exactly one matching row.
637    pub fn require_one(&self) -> Result<(), QueryError>
638    where
639        E: EntityValue,
640    {
641        self.execute()?.require_one()?;
642        Ok(())
643    }
644
645    /// Execute and require at least one matching row.
646    pub fn require_some(&self) -> Result<(), QueryError>
647    where
648        E: EntityValue,
649    {
650        self.execute()?.require_some()?;
651        Ok(())
652    }
653}
654
655impl<E> FluentLoadQuery<'_, E>
656where
657    E: EntityKind,
658{
659    fn non_paged_intent_error(&self) -> Option<IntentError> {
660        self.cursor_token
661            .as_ref()
662            .map(|_| IntentError::CursorRequiresPagedExecution)
663    }
664
665    fn cursor_intent_error(&self) -> Option<IntentError> {
666        self.cursor_token
667            .as_ref()
668            .and_then(|_| self.paged_intent_error())
669    }
670
671    fn paged_intent_error(&self) -> Option<IntentError> {
672        let spec = self.query.load_spec()?;
673
674        policy::validate_cursor_paging_requirements(self.query.has_explicit_order(), spec)
675            .err()
676            .map(IntentError::from)
677    }
678
679    fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
680        if let Some(err) = self.paged_intent_error() {
681            return Err(QueryError::Intent(err));
682        }
683
684        Ok(())
685    }
686
687    fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
688        if let Some(err) = self.non_paged_intent_error() {
689            return Err(QueryError::Intent(err));
690        }
691
692        Ok(())
693    }
694}
695
696impl<E> FluentLoadQuery<'_, E>
697where
698    E: EntityKind + SingletonEntity,
699    E::Key: Default,
700{
701    #[must_use]
702    pub fn only(self) -> Self {
703        self.map_query(Query::only)
704    }
705}
706
707impl<E> PagedLoadQuery<'_, E>
708where
709    E: EntityKind,
710{
711    // ------------------------------------------------------------------
712    // Intent inspection
713    // ------------------------------------------------------------------
714
715    #[must_use]
716    pub const fn query(&self) -> &Query<E> {
717        self.inner.query()
718    }
719
720    // ------------------------------------------------------------------
721    // Cursor continuation
722    // ------------------------------------------------------------------
723
724    /// Attach an opaque continuation token for the next page.
725    #[must_use]
726    pub fn cursor(mut self, token: impl Into<String>) -> Self {
727        self.inner = self.inner.cursor(token);
728        self
729    }
730
731    // ------------------------------------------------------------------
732    // Execution
733    // ------------------------------------------------------------------
734
735    /// Execute in cursor-pagination mode and return items + next cursor.
736    ///
737    /// Continuation is best-effort and forward-only over live state:
738    /// deterministic per request under canonical ordering, with no
739    /// snapshot/version pinned across requests.
740    pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
741    where
742        E: EntityValue,
743    {
744        self.execute_with_trace()
745            .map(PagedLoadExecutionWithTrace::into_execution)
746    }
747
748    /// Execute in cursor-pagination mode and return items, next cursor,
749    /// and optional execution trace details when session debug mode is enabled.
750    ///
751    /// Trace collection is opt-in via `DbSession::debug()` and does not
752    /// change query planning or result semantics.
753    pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
754    where
755        E: EntityValue,
756    {
757        self.inner.ensure_paged_mode_ready()?;
758
759        self.inner.session.execute_load_query_paged_with_trace(
760            self.inner.query(),
761            self.inner.cursor_token.as_deref(),
762        )
763    }
764}