// icydb_core/db/query/fluent/load.rs

1use crate::{
2    db::{
3        DbSession, PagedLoadExecution, PagedLoadExecutionWithTrace,
4        executor::ExecutablePlan,
5        query::{
6            explain::ExplainPlan,
7            expr::{FilterExpr, SortExpr},
8            intent::{IntentError, Query, QueryError},
9            policy,
10            predicate::Predicate,
11        },
12        response::Response,
13    },
14    traits::{EntityKind, EntityValue, SingletonEntity},
15    types::{Decimal, Id},
16    value::Value,
17};
18
19type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
20
21///
22/// FluentLoadQuery
23///
24/// Session-bound load query wrapper.
25/// Owns intent construction and execution routing only.
26/// All result inspection and projection is performed on `Response<E>`.
27///
28
29pub struct FluentLoadQuery<'a, E>
30where
31    E: EntityKind,
32{
33    session: &'a DbSession<E::Canister>,
34    query: Query<E>,
35    cursor_token: Option<String>,
36}
37
38///
39/// PagedLoadQuery
40///
41/// Session-bound cursor pagination wrapper.
42/// This wrapper only exposes cursor continuation and paged execution.
43///
44
45pub struct PagedLoadQuery<'a, E>
46where
47    E: EntityKind,
48{
49    inner: FluentLoadQuery<'a, E>,
50}
51
52impl<'a, E> FluentLoadQuery<'a, E>
53where
54    E: EntityKind,
55{
56    pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
57        Self {
58            session,
59            query,
60            cursor_token: None,
61        }
62    }
63
64    // ------------------------------------------------------------------
65    // Intent inspection
66    // ------------------------------------------------------------------
67
68    #[must_use]
69    pub const fn query(&self) -> &Query<E> {
70        &self.query
71    }
72
73    fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
74        self.query = map(self.query);
75        self
76    }
77
78    fn try_map_query(
79        mut self,
80        map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
81    ) -> Result<Self, QueryError> {
82        self.query = map(self.query)?;
83        Ok(self)
84    }
85
86    // ------------------------------------------------------------------
87    // Intent builders (pure)
88    // ------------------------------------------------------------------
89
90    /// Set the access path to a single typed primary-key value.
91    ///
92    /// `Id<E>` is treated as a plain query input value here. It does not grant access.
93    #[must_use]
94    pub fn by_id(mut self, id: Id<E>) -> Self {
95        self.query = self.query.by_id(id.key());
96        self
97    }
98
99    /// Set the access path to multiple typed primary-key values.
100    ///
101    /// IDs are public and may come from untrusted input sources.
102    #[must_use]
103    pub fn by_ids<I>(mut self, ids: I) -> Self
104    where
105        I: IntoIterator<Item = Id<E>>,
106    {
107        self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
108        self
109    }
110
111    // ------------------------------------------------------------------
112    // Query Refinement
113    // ------------------------------------------------------------------
114
115    #[must_use]
116    pub fn filter(self, predicate: Predicate) -> Self {
117        self.map_query(|query| query.filter(predicate))
118    }
119
120    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
121        self.try_map_query(|query| query.filter_expr(expr))
122    }
123
124    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
125        self.try_map_query(|query| query.sort_expr(expr))
126    }
127
128    #[must_use]
129    pub fn order_by(self, field: impl AsRef<str>) -> Self {
130        self.map_query(|query| query.order_by(field))
131    }
132
133    #[must_use]
134    pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
135        self.map_query(|query| query.order_by_desc(field))
136    }
137
138    /// Bound the number of returned rows.
139    ///
140    /// Pagination is only valid with explicit ordering; combine `limit` and/or
141    /// `offset` with `order_by(...)` or planning fails.
142    #[must_use]
143    pub fn limit(self, limit: u32) -> Self {
144        self.map_query(|query| query.limit(limit))
145    }
146
147    /// Skip a number of rows in the ordered result stream.
148    ///
149    /// Pagination is only valid with explicit ordering; combine `offset` and/or
150    /// `limit` with `order_by(...)` or planning fails.
151    #[must_use]
152    pub fn offset(self, offset: u32) -> Self {
153        self.map_query(|query| query.offset(offset))
154    }
155
156    /// Attach an opaque cursor token for continuation pagination.
157    ///
158    /// Cursor-mode invariants are checked before planning/execution:
159    /// - explicit `order_by(...)` is required
160    /// - explicit `limit(...)` is required
161    #[must_use]
162    pub fn cursor(mut self, token: impl Into<String>) -> Self {
163        self.cursor_token = Some(token.into());
164        self
165    }
166
167    // ------------------------------------------------------------------
168    // Planning / diagnostics
169    // ------------------------------------------------------------------
170
171    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
172        self.query.explain()
173    }
174
175    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
176        if let Some(err) = self.cursor_intent_error() {
177            return Err(QueryError::Intent(err));
178        }
179
180        self.query.plan()
181    }
182
183    // ------------------------------------------------------------------
184    // Execution (single semantic boundary)
185    // ------------------------------------------------------------------
186
187    /// Execute this query using the session's policy settings.
188    pub fn execute(&self) -> Result<Response<E>, QueryError>
189    where
190        E: EntityValue,
191    {
192        self.ensure_non_paged_mode_ready()?;
193
194        self.session.execute_query(self.query())
195    }
196
197    /// Enter typed cursor-pagination mode for this query.
198    ///
199    /// Cursor pagination requires:
200    /// - explicit `order_by(...)`
201    /// - explicit `limit(...)`
202    ///
203    /// Requests are deterministic under canonical ordering, but continuation is
204    /// best-effort and forward-only over live state.
205    /// No snapshot/version is pinned across requests, so concurrent writes may
206    /// shift page boundaries.
207    pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
208        self.ensure_paged_mode_ready()?;
209
210        Ok(PagedLoadQuery { inner: self })
211    }
212
213    /// Execute this query as cursor pagination and return items + next cursor.
214    ///
215    /// The returned cursor token is opaque and must be passed back via `.cursor(...)`.
216    pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
217    where
218        E: EntityValue,
219    {
220        self.page()?.execute()
221    }
222
223    // ------------------------------------------------------------------
224    // Execution terminals — semantic only
225    // ------------------------------------------------------------------
226
227    /// Execute and return whether the result set is empty.
228    pub fn is_empty(&self) -> Result<bool, QueryError>
229    where
230        E: EntityValue,
231    {
232        Ok(!self.exists()?)
233    }
234
235    /// Execute and return whether at least one matching row exists.
236    pub fn exists(&self) -> Result<bool, QueryError>
237    where
238        E: EntityValue,
239    {
240        self.ensure_non_paged_mode_ready()?;
241
242        self.session.execute_load_query_exists(self.query())
243    }
244
245    /// Execute and return the number of matching rows.
246    pub fn count(&self) -> Result<u32, QueryError>
247    where
248        E: EntityValue,
249    {
250        self.ensure_non_paged_mode_ready()?;
251
252        self.session.execute_load_query_count(self.query())
253    }
254
255    /// Execute and return the smallest matching identifier, if any.
256    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
257    where
258        E: EntityValue,
259    {
260        self.ensure_non_paged_mode_ready()?;
261
262        self.session.execute_load_query_min(self.query())
263    }
264
265    /// Execute and return the id of the row with the smallest value for `field`.
266    ///
267    /// Ties are deterministic: equal field values resolve by primary key ascending.
268    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
269    where
270        E: EntityValue,
271    {
272        self.ensure_non_paged_mode_ready()?;
273
274        self.session
275            .execute_load_query_min_by(self.query(), field.as_ref())
276    }
277
278    /// Execute and return the largest matching identifier, if any.
279    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
280    where
281        E: EntityValue,
282    {
283        self.ensure_non_paged_mode_ready()?;
284
285        self.session.execute_load_query_max(self.query())
286    }
287
288    /// Execute and return the id of the row with the largest value for `field`.
289    ///
290    /// Ties are deterministic: equal field values resolve by primary key ascending.
291    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
292    where
293        E: EntityValue,
294    {
295        self.ensure_non_paged_mode_ready()?;
296
297        self.session
298            .execute_load_query_max_by(self.query(), field.as_ref())
299    }
300
301    /// Execute and return the id at zero-based ordinal `nth` when rows are
302    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
303    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
304    where
305        E: EntityValue,
306    {
307        self.ensure_non_paged_mode_ready()?;
308
309        self.session
310            .execute_load_query_nth_by(self.query(), field.as_ref(), nth)
311    }
312
313    /// Execute and return the sum of `field` over matching rows.
314    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
315    where
316        E: EntityValue,
317    {
318        self.ensure_non_paged_mode_ready()?;
319
320        self.session
321            .execute_load_query_sum_by(self.query(), field.as_ref())
322    }
323
324    /// Execute and return the average of `field` over matching rows.
325    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
326    where
327        E: EntityValue,
328    {
329        self.ensure_non_paged_mode_ready()?;
330
331        self.session
332            .execute_load_query_avg_by(self.query(), field.as_ref())
333    }
334
335    /// Execute and return the median id by `field` using deterministic ordering
336    /// `(field asc, primary key asc)`.
337    ///
338    /// Even-length windows select the lower median.
339    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
340    where
341        E: EntityValue,
342    {
343        self.ensure_non_paged_mode_ready()?;
344
345        self.session
346            .execute_load_query_median_by(self.query(), field.as_ref())
347    }
348
349    /// Execute and return the number of distinct values for `field` over the
350    /// effective result window.
351    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
352    where
353        E: EntityValue,
354    {
355        self.ensure_non_paged_mode_ready()?;
356
357        self.session
358            .execute_load_query_count_distinct_by(self.query(), field.as_ref())
359    }
360
361    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
362    ///
363    /// Tie handling is deterministic for both extrema: primary key ascending.
364    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
365    where
366        E: EntityValue,
367    {
368        self.ensure_non_paged_mode_ready()?;
369
370        self.session
371            .execute_load_query_min_max_by(self.query(), field.as_ref())
372    }
373
374    /// Execute and return projected field values for the effective result window.
375    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
376    where
377        E: EntityValue,
378    {
379        self.ensure_non_paged_mode_ready()?;
380
381        self.session
382            .execute_load_query_values_by(self.query(), field.as_ref())
383    }
384
385    /// Execute and return the first `k` rows from the effective response window.
386    pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
387    where
388        E: EntityValue,
389    {
390        self.ensure_non_paged_mode_ready()?;
391
392        self.session
393            .execute_load_query_take(self.query(), take_count)
394    }
395
396    /// Execute and return the top `k` rows by `field` under deterministic
397    /// ordering `(field desc, primary_key asc)` over the effective response
398    /// window.
399    ///
400    /// This terminal applies its own ordering and does not preserve query
401    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
402    /// matches `max_by(field)` selection semantics.
403    pub fn top_k_by(
404        &self,
405        field: impl AsRef<str>,
406        take_count: u32,
407    ) -> Result<Response<E>, QueryError>
408    where
409        E: EntityValue,
410    {
411        self.ensure_non_paged_mode_ready()?;
412
413        self.session
414            .execute_load_query_top_k_by(self.query(), field.as_ref(), take_count)
415    }
416
417    /// Execute and return the bottom `k` rows by `field` under deterministic
418    /// ordering `(field asc, primary_key asc)` over the effective response
419    /// window.
420    ///
421    /// This terminal applies its own ordering and does not preserve query
422    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
423    /// matches `min_by(field)` selection semantics.
424    pub fn bottom_k_by(
425        &self,
426        field: impl AsRef<str>,
427        take_count: u32,
428    ) -> Result<Response<E>, QueryError>
429    where
430        E: EntityValue,
431    {
432        self.ensure_non_paged_mode_ready()?;
433
434        self.session
435            .execute_load_query_bottom_k_by(self.query(), field.as_ref(), take_count)
436    }
437
438    /// Execute and return projected values for the top `k` rows by `field`
439    /// under deterministic ordering `(field desc, primary_key asc)` over the
440    /// effective response window.
441    ///
442    /// Ranking is applied before projection and does not preserve query
443    /// `order_by(...)` row order in the returned values. For `k = 1`, this
444    /// matches `max_by(field)` projected to one value.
445    pub fn top_k_by_values(
446        &self,
447        field: impl AsRef<str>,
448        take_count: u32,
449    ) -> Result<Vec<Value>, QueryError>
450    where
451        E: EntityValue,
452    {
453        self.ensure_non_paged_mode_ready()?;
454
455        self.session
456            .execute_load_query_top_k_by_values(self.query(), field.as_ref(), take_count)
457    }
458
459    /// Execute and return projected values for the bottom `k` rows by `field`
460    /// under deterministic ordering `(field asc, primary_key asc)` over the
461    /// effective response window.
462    ///
463    /// Ranking is applied before projection and does not preserve query
464    /// `order_by(...)` row order in the returned values. For `k = 1`, this
465    /// matches `min_by(field)` projected to one value.
466    pub fn bottom_k_by_values(
467        &self,
468        field: impl AsRef<str>,
469        take_count: u32,
470    ) -> Result<Vec<Value>, QueryError>
471    where
472        E: EntityValue,
473    {
474        self.ensure_non_paged_mode_ready()?;
475
476        self.session
477            .execute_load_query_bottom_k_by_values(self.query(), field.as_ref(), take_count)
478    }
479
480    /// Execute and return projected id/value pairs for the top `k` rows by
481    /// `field` under deterministic ordering `(field desc, primary_key asc)`
482    /// over the effective response window.
483    ///
484    /// Ranking is applied before projection and does not preserve query
485    /// `order_by(...)` row order in the returned values. For `k = 1`, this
486    /// matches `max_by(field)` projected to one `(id, value)` pair.
487    pub fn top_k_by_with_ids(
488        &self,
489        field: impl AsRef<str>,
490        take_count: u32,
491    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
492    where
493        E: EntityValue,
494    {
495        self.ensure_non_paged_mode_ready()?;
496
497        self.session
498            .execute_load_query_top_k_by_with_ids(self.query(), field.as_ref(), take_count)
499    }
500
501    /// Execute and return projected id/value pairs for the bottom `k` rows by
502    /// `field` under deterministic ordering `(field asc, primary_key asc)`
503    /// over the effective response window.
504    ///
505    /// Ranking is applied before projection and does not preserve query
506    /// `order_by(...)` row order in the returned values. For `k = 1`, this
507    /// matches `min_by(field)` projected to one `(id, value)` pair.
508    pub fn bottom_k_by_with_ids(
509        &self,
510        field: impl AsRef<str>,
511        take_count: u32,
512    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
513    where
514        E: EntityValue,
515    {
516        self.ensure_non_paged_mode_ready()?;
517
518        self.session.execute_load_query_bottom_k_by_with_ids(
519            self.query(),
520            field.as_ref(),
521            take_count,
522        )
523    }
524
525    /// Execute and return distinct projected field values for the effective
526    /// result window, preserving first-observed value order.
527    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
528    where
529        E: EntityValue,
530    {
531        self.ensure_non_paged_mode_ready()?;
532
533        self.session
534            .execute_load_query_distinct_values_by(self.query(), field.as_ref())
535    }
536
537    /// Execute and return projected field values paired with row ids for the
538    /// effective result window.
539    pub fn values_by_with_ids(
540        &self,
541        field: impl AsRef<str>,
542    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
543    where
544        E: EntityValue,
545    {
546        self.ensure_non_paged_mode_ready()?;
547
548        self.session
549            .execute_load_query_values_by_with_ids(self.query(), field.as_ref())
550    }
551
552    /// Execute and return the first projected field value in effective response
553    /// order, if any.
554    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
555    where
556        E: EntityValue,
557    {
558        self.ensure_non_paged_mode_ready()?;
559
560        self.session
561            .execute_load_query_first_value_by(self.query(), field.as_ref())
562    }
563
564    /// Execute and return the last projected field value in effective response
565    /// order, if any.
566    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
567    where
568        E: EntityValue,
569    {
570        self.ensure_non_paged_mode_ready()?;
571
572        self.session
573            .execute_load_query_last_value_by(self.query(), field.as_ref())
574    }
575
576    /// Execute and return the first matching identifier in response order, if any.
577    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
578    where
579        E: EntityValue,
580    {
581        self.ensure_non_paged_mode_ready()?;
582
583        self.session.execute_load_query_first(self.query())
584    }
585
586    /// Execute and return the last matching identifier in response order, if any.
587    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
588    where
589        E: EntityValue,
590    {
591        self.ensure_non_paged_mode_ready()?;
592
593        self.session.execute_load_query_last(self.query())
594    }
595
596    /// Execute and require exactly one matching row.
597    pub fn require_one(&self) -> Result<(), QueryError>
598    where
599        E: EntityValue,
600    {
601        self.execute()?.require_one()?;
602        Ok(())
603    }
604
605    /// Execute and require at least one matching row.
606    pub fn require_some(&self) -> Result<(), QueryError>
607    where
608        E: EntityValue,
609    {
610        self.execute()?.require_some()?;
611        Ok(())
612    }
613}
614
615impl<E> FluentLoadQuery<'_, E>
616where
617    E: EntityKind,
618{
619    fn non_paged_intent_error(&self) -> Option<IntentError> {
620        self.cursor_token
621            .as_ref()
622            .map(|_| IntentError::CursorRequiresPagedExecution)
623    }
624
625    fn cursor_intent_error(&self) -> Option<IntentError> {
626        self.cursor_token
627            .as_ref()
628            .and_then(|_| self.paged_intent_error())
629    }
630
631    fn paged_intent_error(&self) -> Option<IntentError> {
632        let spec = self.query.load_spec()?;
633
634        policy::validate_cursor_paging_requirements(self.query.has_explicit_order(), spec)
635            .err()
636            .map(IntentError::from)
637    }
638
639    fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
640        if let Some(err) = self.paged_intent_error() {
641            return Err(QueryError::Intent(err));
642        }
643
644        Ok(())
645    }
646
647    fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
648        if let Some(err) = self.non_paged_intent_error() {
649            return Err(QueryError::Intent(err));
650        }
651
652        Ok(())
653    }
654}
655
656impl<E> FluentLoadQuery<'_, E>
657where
658    E: EntityKind + SingletonEntity,
659    E::Key: Default,
660{
661    #[must_use]
662    pub fn only(self) -> Self {
663        self.map_query(Query::only)
664    }
665}
666
667impl<E> PagedLoadQuery<'_, E>
668where
669    E: EntityKind,
670{
671    // ------------------------------------------------------------------
672    // Intent inspection
673    // ------------------------------------------------------------------
674
675    #[must_use]
676    pub const fn query(&self) -> &Query<E> {
677        self.inner.query()
678    }
679
680    // ------------------------------------------------------------------
681    // Cursor continuation
682    // ------------------------------------------------------------------
683
684    /// Attach an opaque continuation token for the next page.
685    #[must_use]
686    pub fn cursor(mut self, token: impl Into<String>) -> Self {
687        self.inner = self.inner.cursor(token);
688        self
689    }
690
691    // ------------------------------------------------------------------
692    // Execution
693    // ------------------------------------------------------------------
694
695    /// Execute in cursor-pagination mode and return items + next cursor.
696    ///
697    /// Continuation is best-effort and forward-only over live state:
698    /// deterministic per request under canonical ordering, with no
699    /// snapshot/version pinned across requests.
700    pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
701    where
702        E: EntityValue,
703    {
704        self.execute_with_trace()
705            .map(PagedLoadExecutionWithTrace::into_execution)
706    }
707
708    /// Execute in cursor-pagination mode and return items, next cursor,
709    /// and optional execution trace details when session debug mode is enabled.
710    ///
711    /// Trace collection is opt-in via `DbSession::debug()` and does not
712    /// change query planning or result semantics.
713    pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
714    where
715        E: EntityValue,
716    {
717        self.inner.ensure_paged_mode_ready()?;
718
719        self.inner.session.execute_load_query_paged_with_trace(
720            self.inner.query(),
721            self.inner.cursor_token.as_deref(),
722        )
723    }
724}