Skip to main content

icydb_core/db/query/fluent/
load.rs

1use crate::{
2    db::{
3        DbSession, PagedLoadExecution, PagedLoadExecutionWithTrace,
4        predicate::Predicate,
5        query::{
6            explain::ExplainPlan,
7            expr::{FilterExpr, SortExpr},
8            intent::{CompiledQuery, IntentError, PlannedQuery, Query, QueryError},
9            policy,
10        },
11        response::Response,
12    },
13    traits::{EntityKind, EntityValue, SingletonEntity},
14    types::{Decimal, Id},
15    value::Value,
16};
17
/// Optional pair of ids returned by `FluentLoadQuery::min_max_by`, documented
/// there as `(min_by(field), max_by(field))`.
///
/// NOTE(review): `None` presumably means no row matched — confirm against the
/// load executor's `aggregate_min_max_by`.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
20///
21/// FluentLoadQuery
22///
23/// Session-bound load query wrapper.
24/// Owns intent construction and execution routing only.
25/// All result inspection and projection is performed on `Response<E>`.
26///
27
28pub struct FluentLoadQuery<'a, E>
29where
30    E: EntityKind,
31{
32    session: &'a DbSession<E::Canister>,
33    query: Query<E>,
34    cursor_token: Option<String>,
35}
36
37///
38/// PagedLoadQuery
39///
40/// Session-bound cursor pagination wrapper.
41/// This wrapper only exposes cursor continuation and paged execution.
42///
43
44pub struct PagedLoadQuery<'a, E>
45where
46    E: EntityKind,
47{
48    inner: FluentLoadQuery<'a, E>,
49}
50
51impl<'a, E> FluentLoadQuery<'a, E>
52where
53    E: EntityKind,
54{
55    pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
56        Self {
57            session,
58            query,
59            cursor_token: None,
60        }
61    }
62
63    // ------------------------------------------------------------------
64    // Intent inspection
65    // ------------------------------------------------------------------
66
67    #[must_use]
68    pub const fn query(&self) -> &Query<E> {
69        &self.query
70    }
71
72    fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
73        self.query = map(self.query);
74        self
75    }
76
77    fn try_map_query(
78        mut self,
79        map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
80    ) -> Result<Self, QueryError> {
81        self.query = map(self.query)?;
82        Ok(self)
83    }
84
85    // ------------------------------------------------------------------
86    // Intent builders (pure)
87    // ------------------------------------------------------------------
88
89    /// Set the access path to a single typed primary-key value.
90    ///
91    /// `Id<E>` is treated as a plain query input value here. It does not grant access.
92    #[must_use]
93    pub fn by_id(mut self, id: Id<E>) -> Self {
94        self.query = self.query.by_id(id.key());
95        self
96    }
97
98    /// Set the access path to multiple typed primary-key values.
99    ///
100    /// IDs are public and may come from untrusted input sources.
101    #[must_use]
102    pub fn by_ids<I>(mut self, ids: I) -> Self
103    where
104        I: IntoIterator<Item = Id<E>>,
105    {
106        self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
107        self
108    }
109
110    // ------------------------------------------------------------------
111    // Query Refinement
112    // ------------------------------------------------------------------
113
114    #[must_use]
115    pub fn filter(self, predicate: Predicate) -> Self {
116        self.map_query(|query| query.filter(predicate))
117    }
118
119    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
120        self.try_map_query(|query| query.filter_expr(expr))
121    }
122
123    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
124        self.try_map_query(|query| query.sort_expr(expr))
125    }
126
127    #[must_use]
128    pub fn order_by(self, field: impl AsRef<str>) -> Self {
129        self.map_query(|query| query.order_by(field))
130    }
131
132    #[must_use]
133    pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
134        self.map_query(|query| query.order_by_desc(field))
135    }
136
137    /// Bound the number of returned rows.
138    ///
139    /// Pagination is only valid with explicit ordering; combine `limit` and/or
140    /// `offset` with `order_by(...)` or planning fails.
141    #[must_use]
142    pub fn limit(self, limit: u32) -> Self {
143        self.map_query(|query| query.limit(limit))
144    }
145
146    /// Skip a number of rows in the ordered result stream.
147    ///
148    /// Pagination is only valid with explicit ordering; combine `offset` and/or
149    /// `limit` with `order_by(...)` or planning fails.
150    #[must_use]
151    pub fn offset(self, offset: u32) -> Self {
152        self.map_query(|query| query.offset(offset))
153    }
154
155    /// Attach an opaque cursor token for continuation pagination.
156    ///
157    /// Cursor-mode invariants are checked before planning/execution:
158    /// - explicit `order_by(...)` is required
159    /// - explicit `limit(...)` is required
160    #[must_use]
161    pub fn cursor(mut self, token: impl Into<String>) -> Self {
162        self.cursor_token = Some(token.into());
163        self
164    }
165
166    // ------------------------------------------------------------------
167    // Planning / diagnostics
168    // ------------------------------------------------------------------
169
170    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
171        self.query.explain()
172    }
173
174    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
175        if let Some(err) = self.cursor_intent_error() {
176            return Err(QueryError::Intent(err));
177        }
178
179        self.query.planned()
180    }
181
182    pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
183        if let Some(err) = self.cursor_intent_error() {
184            return Err(QueryError::Intent(err));
185        }
186
187        self.query.plan()
188    }
189
190    // ------------------------------------------------------------------
191    // Execution (single semantic boundary)
192    // ------------------------------------------------------------------
193
194    /// Execute this query using the session's policy settings.
195    pub fn execute(&self) -> Result<Response<E>, QueryError>
196    where
197        E: EntityValue,
198    {
199        self.ensure_non_paged_mode_ready()?;
200
201        self.session.execute_query(self.query())
202    }
203
204    /// Enter typed cursor-pagination mode for this query.
205    ///
206    /// Cursor pagination requires:
207    /// - explicit `order_by(...)`
208    /// - explicit `limit(...)`
209    ///
210    /// Requests are deterministic under canonical ordering, but continuation is
211    /// best-effort and forward-only over live state.
212    /// No snapshot/version is pinned across requests, so concurrent writes may
213    /// shift page boundaries.
214    pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
215        self.ensure_paged_mode_ready()?;
216
217        Ok(PagedLoadQuery { inner: self })
218    }
219
220    /// Execute this query as cursor pagination and return items + next cursor.
221    ///
222    /// The returned cursor token is opaque and must be passed back via `.cursor(...)`.
223    pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
224    where
225        E: EntityValue,
226    {
227        self.page()?.execute()
228    }
229
230    // ------------------------------------------------------------------
231    // Execution terminals — semantic only
232    // ------------------------------------------------------------------
233
234    /// Execute and return whether the result set is empty.
235    pub fn is_empty(&self) -> Result<bool, QueryError>
236    where
237        E: EntityValue,
238    {
239        Ok(!self.exists()?)
240    }
241
242    /// Execute and return whether at least one matching row exists.
243    pub fn exists(&self) -> Result<bool, QueryError>
244    where
245        E: EntityValue,
246    {
247        self.ensure_non_paged_mode_ready()?;
248
249        self.session
250            .execute_load_query_with(self.query(), |load, plan| load.aggregate_exists(plan))
251    }
252
253    /// Execute and return the number of matching rows.
254    pub fn count(&self) -> Result<u32, QueryError>
255    where
256        E: EntityValue,
257    {
258        self.ensure_non_paged_mode_ready()?;
259
260        self.session
261            .execute_load_query_with(self.query(), |load, plan| load.aggregate_count(plan))
262    }
263
264    /// Execute and return the smallest matching identifier, if any.
265    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
266    where
267        E: EntityValue,
268    {
269        self.ensure_non_paged_mode_ready()?;
270
271        self.session
272            .execute_load_query_with(self.query(), |load, plan| load.aggregate_min(plan))
273    }
274
275    /// Execute and return the id of the row with the smallest value for `field`.
276    ///
277    /// Ties are deterministic: equal field values resolve by primary key ascending.
278    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
279    where
280        E: EntityValue,
281    {
282        self.ensure_non_paged_mode_ready()?;
283
284        self.session
285            .execute_load_query_with(self.query(), |load, plan| {
286                load.aggregate_min_by(plan, field.as_ref())
287            })
288    }
289
290    /// Execute and return the largest matching identifier, if any.
291    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
292    where
293        E: EntityValue,
294    {
295        self.ensure_non_paged_mode_ready()?;
296
297        self.session
298            .execute_load_query_with(self.query(), |load, plan| load.aggregate_max(plan))
299    }
300
301    /// Execute and return the id of the row with the largest value for `field`.
302    ///
303    /// Ties are deterministic: equal field values resolve by primary key ascending.
304    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
305    where
306        E: EntityValue,
307    {
308        self.ensure_non_paged_mode_ready()?;
309
310        self.session
311            .execute_load_query_with(self.query(), |load, plan| {
312                load.aggregate_max_by(plan, field.as_ref())
313            })
314    }
315
316    /// Execute and return the id at zero-based ordinal `nth` when rows are
317    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
318    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
319    where
320        E: EntityValue,
321    {
322        self.ensure_non_paged_mode_ready()?;
323
324        self.session
325            .execute_load_query_with(self.query(), |load, plan| {
326                load.aggregate_nth_by(plan, field.as_ref(), nth)
327            })
328    }
329
330    /// Execute and return the sum of `field` over matching rows.
331    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
332    where
333        E: EntityValue,
334    {
335        self.ensure_non_paged_mode_ready()?;
336
337        self.session
338            .execute_load_query_with(self.query(), |load, plan| {
339                load.aggregate_sum_by(plan, field.as_ref())
340            })
341    }
342
343    /// Execute and return the average of `field` over matching rows.
344    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
345    where
346        E: EntityValue,
347    {
348        self.ensure_non_paged_mode_ready()?;
349
350        self.session
351            .execute_load_query_with(self.query(), |load, plan| {
352                load.aggregate_avg_by(plan, field.as_ref())
353            })
354    }
355
356    /// Execute and return the median id by `field` using deterministic ordering
357    /// `(field asc, primary key asc)`.
358    ///
359    /// Even-length windows select the lower median.
360    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
361    where
362        E: EntityValue,
363    {
364        self.ensure_non_paged_mode_ready()?;
365
366        self.session
367            .execute_load_query_with(self.query(), |load, plan| {
368                load.aggregate_median_by(plan, field.as_ref())
369            })
370    }
371
372    /// Execute and return the number of distinct values for `field` over the
373    /// effective result window.
374    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
375    where
376        E: EntityValue,
377    {
378        self.ensure_non_paged_mode_ready()?;
379
380        self.session
381            .execute_load_query_with(self.query(), |load, plan| {
382                load.aggregate_count_distinct_by(plan, field.as_ref())
383            })
384    }
385
386    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
387    ///
388    /// Tie handling is deterministic for both extrema: primary key ascending.
389    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
390    where
391        E: EntityValue,
392    {
393        self.ensure_non_paged_mode_ready()?;
394
395        self.session
396            .execute_load_query_with(self.query(), |load, plan| {
397                load.aggregate_min_max_by(plan, field.as_ref())
398            })
399    }
400
401    /// Execute and return projected field values for the effective result window.
402    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
403    where
404        E: EntityValue,
405    {
406        self.ensure_non_paged_mode_ready()?;
407
408        self.session
409            .execute_load_query_with(self.query(), |load, plan| {
410                load.values_by(plan, field.as_ref())
411            })
412    }
413
414    /// Execute and return the first `k` rows from the effective response window.
415    pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
416    where
417        E: EntityValue,
418    {
419        self.ensure_non_paged_mode_ready()?;
420
421        self.session
422            .execute_load_query_with(self.query(), |load, plan| load.take(plan, take_count))
423    }
424
425    /// Execute and return the top `k` rows by `field` under deterministic
426    /// ordering `(field desc, primary_key asc)` over the effective response
427    /// window.
428    ///
429    /// This terminal applies its own ordering and does not preserve query
430    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
431    /// matches `max_by(field)` selection semantics.
432    pub fn top_k_by(
433        &self,
434        field: impl AsRef<str>,
435        take_count: u32,
436    ) -> Result<Response<E>, QueryError>
437    where
438        E: EntityValue,
439    {
440        self.ensure_non_paged_mode_ready()?;
441
442        self.session
443            .execute_load_query_with(self.query(), |load, plan| {
444                load.top_k_by(plan, field.as_ref(), take_count)
445            })
446    }
447
448    /// Execute and return the bottom `k` rows by `field` under deterministic
449    /// ordering `(field asc, primary_key asc)` over the effective response
450    /// window.
451    ///
452    /// This terminal applies its own ordering and does not preserve query
453    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
454    /// matches `min_by(field)` selection semantics.
455    pub fn bottom_k_by(
456        &self,
457        field: impl AsRef<str>,
458        take_count: u32,
459    ) -> Result<Response<E>, QueryError>
460    where
461        E: EntityValue,
462    {
463        self.ensure_non_paged_mode_ready()?;
464
465        self.session
466            .execute_load_query_with(self.query(), |load, plan| {
467                load.bottom_k_by(plan, field.as_ref(), take_count)
468            })
469    }
470
471    /// Execute and return projected values for the top `k` rows by `field`
472    /// under deterministic ordering `(field desc, primary_key asc)` over the
473    /// effective response window.
474    ///
475    /// Ranking is applied before projection and does not preserve query
476    /// `order_by(...)` row order in the returned values. For `k = 1`, this
477    /// matches `max_by(field)` projected to one value.
478    pub fn top_k_by_values(
479        &self,
480        field: impl AsRef<str>,
481        take_count: u32,
482    ) -> Result<Vec<Value>, QueryError>
483    where
484        E: EntityValue,
485    {
486        self.ensure_non_paged_mode_ready()?;
487
488        self.session
489            .execute_load_query_with(self.query(), |load, plan| {
490                load.top_k_by_values(plan, field.as_ref(), take_count)
491            })
492    }
493
494    /// Execute and return projected values for the bottom `k` rows by `field`
495    /// under deterministic ordering `(field asc, primary_key asc)` over the
496    /// effective response window.
497    ///
498    /// Ranking is applied before projection and does not preserve query
499    /// `order_by(...)` row order in the returned values. For `k = 1`, this
500    /// matches `min_by(field)` projected to one value.
501    pub fn bottom_k_by_values(
502        &self,
503        field: impl AsRef<str>,
504        take_count: u32,
505    ) -> Result<Vec<Value>, QueryError>
506    where
507        E: EntityValue,
508    {
509        self.ensure_non_paged_mode_ready()?;
510
511        self.session
512            .execute_load_query_with(self.query(), |load, plan| {
513                load.bottom_k_by_values(plan, field.as_ref(), take_count)
514            })
515    }
516
517    /// Execute and return projected id/value pairs for the top `k` rows by
518    /// `field` under deterministic ordering `(field desc, primary_key asc)`
519    /// over the effective response window.
520    ///
521    /// Ranking is applied before projection and does not preserve query
522    /// `order_by(...)` row order in the returned values. For `k = 1`, this
523    /// matches `max_by(field)` projected to one `(id, value)` pair.
524    pub fn top_k_by_with_ids(
525        &self,
526        field: impl AsRef<str>,
527        take_count: u32,
528    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
529    where
530        E: EntityValue,
531    {
532        self.ensure_non_paged_mode_ready()?;
533
534        self.session
535            .execute_load_query_with(self.query(), |load, plan| {
536                load.top_k_by_with_ids(plan, field.as_ref(), take_count)
537            })
538    }
539
540    /// Execute and return projected id/value pairs for the bottom `k` rows by
541    /// `field` under deterministic ordering `(field asc, primary_key asc)`
542    /// over the effective response window.
543    ///
544    /// Ranking is applied before projection and does not preserve query
545    /// `order_by(...)` row order in the returned values. For `k = 1`, this
546    /// matches `min_by(field)` projected to one `(id, value)` pair.
547    pub fn bottom_k_by_with_ids(
548        &self,
549        field: impl AsRef<str>,
550        take_count: u32,
551    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
552    where
553        E: EntityValue,
554    {
555        self.ensure_non_paged_mode_ready()?;
556
557        self.session
558            .execute_load_query_with(self.query(), |load, plan| {
559                load.bottom_k_by_with_ids(plan, field.as_ref(), take_count)
560            })
561    }
562
563    /// Execute and return distinct projected field values for the effective
564    /// result window, preserving first-observed value order.
565    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
566    where
567        E: EntityValue,
568    {
569        self.ensure_non_paged_mode_ready()?;
570
571        self.session
572            .execute_load_query_with(self.query(), |load, plan| {
573                load.distinct_values_by(plan, field.as_ref())
574            })
575    }
576
577    /// Execute and return projected field values paired with row ids for the
578    /// effective result window.
579    pub fn values_by_with_ids(
580        &self,
581        field: impl AsRef<str>,
582    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
583    where
584        E: EntityValue,
585    {
586        self.ensure_non_paged_mode_ready()?;
587
588        self.session
589            .execute_load_query_with(self.query(), |load, plan| {
590                load.values_by_with_ids(plan, field.as_ref())
591            })
592    }
593
594    /// Execute and return the first projected field value in effective response
595    /// order, if any.
596    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
597    where
598        E: EntityValue,
599    {
600        self.ensure_non_paged_mode_ready()?;
601
602        self.session
603            .execute_load_query_with(self.query(), |load, plan| {
604                load.first_value_by(plan, field.as_ref())
605            })
606    }
607
608    /// Execute and return the last projected field value in effective response
609    /// order, if any.
610    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
611    where
612        E: EntityValue,
613    {
614        self.ensure_non_paged_mode_ready()?;
615
616        self.session
617            .execute_load_query_with(self.query(), |load, plan| {
618                load.last_value_by(plan, field.as_ref())
619            })
620    }
621
622    /// Execute and return the first matching identifier in response order, if any.
623    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
624    where
625        E: EntityValue,
626    {
627        self.ensure_non_paged_mode_ready()?;
628
629        self.session
630            .execute_load_query_with(self.query(), |load, plan| load.aggregate_first(plan))
631    }
632
633    /// Execute and return the last matching identifier in response order, if any.
634    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
635    where
636        E: EntityValue,
637    {
638        self.ensure_non_paged_mode_ready()?;
639
640        self.session
641            .execute_load_query_with(self.query(), |load, plan| load.aggregate_last(plan))
642    }
643
644    /// Execute and require exactly one matching row.
645    pub fn require_one(&self) -> Result<(), QueryError>
646    where
647        E: EntityValue,
648    {
649        self.execute()?.require_one()?;
650        Ok(())
651    }
652
653    /// Execute and require at least one matching row.
654    pub fn require_some(&self) -> Result<(), QueryError>
655    where
656        E: EntityValue,
657    {
658        self.execute()?.require_some()?;
659        Ok(())
660    }
661}
662
663impl<E> FluentLoadQuery<'_, E>
664where
665    E: EntityKind,
666{
667    fn non_paged_intent_error(&self) -> Option<IntentError> {
668        self.cursor_token
669            .as_ref()
670            .map(|_| IntentError::CursorRequiresPagedExecution)
671    }
672
673    fn cursor_intent_error(&self) -> Option<IntentError> {
674        self.cursor_token
675            .as_ref()
676            .and_then(|_| self.paged_intent_error())
677    }
678
679    fn paged_intent_error(&self) -> Option<IntentError> {
680        let spec = self.query.load_spec()?;
681
682        policy::validate_cursor_paging_requirements(self.query.has_explicit_order(), spec)
683            .err()
684            .map(IntentError::from)
685    }
686
687    fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
688        if let Some(err) = self.paged_intent_error() {
689            return Err(QueryError::Intent(err));
690        }
691
692        Ok(())
693    }
694
695    fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
696        if let Some(err) = self.non_paged_intent_error() {
697            return Err(QueryError::Intent(err));
698        }
699
700        Ok(())
701    }
702}
703
704impl<E> FluentLoadQuery<'_, E>
705where
706    E: EntityKind + SingletonEntity,
707    E::Key: Default,
708{
709    #[must_use]
710    pub fn only(self) -> Self {
711        self.map_query(Query::only)
712    }
713}
714
715impl<E> PagedLoadQuery<'_, E>
716where
717    E: EntityKind,
718{
719    // ------------------------------------------------------------------
720    // Intent inspection
721    // ------------------------------------------------------------------
722
723    #[must_use]
724    pub const fn query(&self) -> &Query<E> {
725        self.inner.query()
726    }
727
728    // ------------------------------------------------------------------
729    // Cursor continuation
730    // ------------------------------------------------------------------
731
732    /// Attach an opaque continuation token for the next page.
733    #[must_use]
734    pub fn cursor(mut self, token: impl Into<String>) -> Self {
735        self.inner = self.inner.cursor(token);
736        self
737    }
738
739    // ------------------------------------------------------------------
740    // Execution
741    // ------------------------------------------------------------------
742
743    /// Execute in cursor-pagination mode and return items + next cursor.
744    ///
745    /// Continuation is best-effort and forward-only over live state:
746    /// deterministic per request under canonical ordering, with no
747    /// snapshot/version pinned across requests.
748    pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
749    where
750        E: EntityValue,
751    {
752        self.execute_with_trace()
753            .map(PagedLoadExecutionWithTrace::into_execution)
754    }
755
756    /// Execute in cursor-pagination mode and return items, next cursor,
757    /// and optional execution trace details when session debug mode is enabled.
758    ///
759    /// Trace collection is opt-in via `DbSession::debug()` and does not
760    /// change query planning or result semantics.
761    pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
762    where
763        E: EntityValue,
764    {
765        self.inner.ensure_paged_mode_ready()?;
766
767        self.inner.session.execute_load_query_paged_with_trace(
768            self.inner.query(),
769            self.inner.cursor_token.as_deref(),
770        )
771    }
772}