Skip to main content

icydb_core/db/query/fluent/
load.rs

1use crate::{
2    db::{
3        DbSession, PagedLoadExecution, PagedLoadExecutionWithTrace,
4        query::{
5            explain::ExplainPlan,
6            expr::{FilterExpr, SortExpr},
7            intent::{IntentError, PlannedQuery, Query, QueryError},
8            policy,
9            predicate::Predicate,
10        },
11        response::Response,
12    },
13    traits::{EntityKind, EntityValue, SingletonEntity},
14    types::{Decimal, Id},
15    value::Value,
16};
17
18type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
20///
21/// FluentLoadQuery
22///
23/// Session-bound load query wrapper.
24/// Owns intent construction and execution routing only.
25/// All result inspection and projection is performed on `Response<E>`.
26///
27
28pub struct FluentLoadQuery<'a, E>
29where
30    E: EntityKind,
31{
32    session: &'a DbSession<E::Canister>,
33    query: Query<E>,
34    cursor_token: Option<String>,
35}
36
37///
38/// PagedLoadQuery
39///
40/// Session-bound cursor pagination wrapper.
41/// This wrapper only exposes cursor continuation and paged execution.
42///
43
44pub struct PagedLoadQuery<'a, E>
45where
46    E: EntityKind,
47{
48    inner: FluentLoadQuery<'a, E>,
49}
50
51impl<'a, E> FluentLoadQuery<'a, E>
52where
53    E: EntityKind,
54{
55    pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
56        Self {
57            session,
58            query,
59            cursor_token: None,
60        }
61    }
62
63    // ------------------------------------------------------------------
64    // Intent inspection
65    // ------------------------------------------------------------------
66
67    #[must_use]
68    pub const fn query(&self) -> &Query<E> {
69        &self.query
70    }
71
72    fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
73        self.query = map(self.query);
74        self
75    }
76
77    fn try_map_query(
78        mut self,
79        map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
80    ) -> Result<Self, QueryError> {
81        self.query = map(self.query)?;
82        Ok(self)
83    }
84
85    // ------------------------------------------------------------------
86    // Intent builders (pure)
87    // ------------------------------------------------------------------
88
89    /// Set the access path to a single typed primary-key value.
90    ///
91    /// `Id<E>` is treated as a plain query input value here. It does not grant access.
92    #[must_use]
93    pub fn by_id(mut self, id: Id<E>) -> Self {
94        self.query = self.query.by_id(id.key());
95        self
96    }
97
98    /// Set the access path to multiple typed primary-key values.
99    ///
100    /// IDs are public and may come from untrusted input sources.
101    #[must_use]
102    pub fn by_ids<I>(mut self, ids: I) -> Self
103    where
104        I: IntoIterator<Item = Id<E>>,
105    {
106        self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
107        self
108    }
109
110    // ------------------------------------------------------------------
111    // Query Refinement
112    // ------------------------------------------------------------------
113
114    #[must_use]
115    pub fn filter(self, predicate: Predicate) -> Self {
116        self.map_query(|query| query.filter(predicate))
117    }
118
119    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
120        self.try_map_query(|query| query.filter_expr(expr))
121    }
122
123    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
124        self.try_map_query(|query| query.sort_expr(expr))
125    }
126
127    #[must_use]
128    pub fn order_by(self, field: impl AsRef<str>) -> Self {
129        self.map_query(|query| query.order_by(field))
130    }
131
132    #[must_use]
133    pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
134        self.map_query(|query| query.order_by_desc(field))
135    }
136
137    /// Bound the number of returned rows.
138    ///
139    /// Pagination is only valid with explicit ordering; combine `limit` and/or
140    /// `offset` with `order_by(...)` or planning fails.
141    #[must_use]
142    pub fn limit(self, limit: u32) -> Self {
143        self.map_query(|query| query.limit(limit))
144    }
145
146    /// Skip a number of rows in the ordered result stream.
147    ///
148    /// Pagination is only valid with explicit ordering; combine `offset` and/or
149    /// `limit` with `order_by(...)` or planning fails.
150    #[must_use]
151    pub fn offset(self, offset: u32) -> Self {
152        self.map_query(|query| query.offset(offset))
153    }
154
155    /// Attach an opaque cursor token for continuation pagination.
156    ///
157    /// Cursor-mode invariants are checked before planning/execution:
158    /// - explicit `order_by(...)` is required
159    /// - explicit `limit(...)` is required
160    #[must_use]
161    pub fn cursor(mut self, token: impl Into<String>) -> Self {
162        self.cursor_token = Some(token.into());
163        self
164    }
165
166    // ------------------------------------------------------------------
167    // Planning / diagnostics
168    // ------------------------------------------------------------------
169
170    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
171        self.query.explain()
172    }
173
174    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
175        if let Some(err) = self.cursor_intent_error() {
176            return Err(QueryError::Intent(err));
177        }
178
179        self.query.planned()
180    }
181
182    // ------------------------------------------------------------------
183    // Execution (single semantic boundary)
184    // ------------------------------------------------------------------
185
186    /// Execute this query using the session's policy settings.
187    pub fn execute(&self) -> Result<Response<E>, QueryError>
188    where
189        E: EntityValue,
190    {
191        self.ensure_non_paged_mode_ready()?;
192
193        self.session.execute_query(self.query())
194    }
195
196    /// Enter typed cursor-pagination mode for this query.
197    ///
198    /// Cursor pagination requires:
199    /// - explicit `order_by(...)`
200    /// - explicit `limit(...)`
201    ///
202    /// Requests are deterministic under canonical ordering, but continuation is
203    /// best-effort and forward-only over live state.
204    /// No snapshot/version is pinned across requests, so concurrent writes may
205    /// shift page boundaries.
206    pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
207        self.ensure_paged_mode_ready()?;
208
209        Ok(PagedLoadQuery { inner: self })
210    }
211
212    /// Execute this query as cursor pagination and return items + next cursor.
213    ///
214    /// The returned cursor token is opaque and must be passed back via `.cursor(...)`.
215    pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
216    where
217        E: EntityValue,
218    {
219        self.page()?.execute()
220    }
221
222    // ------------------------------------------------------------------
223    // Execution terminals — semantic only
224    // ------------------------------------------------------------------
225
226    /// Execute and return whether the result set is empty.
227    pub fn is_empty(&self) -> Result<bool, QueryError>
228    where
229        E: EntityValue,
230    {
231        Ok(!self.exists()?)
232    }
233
234    /// Execute and return whether at least one matching row exists.
235    pub fn exists(&self) -> Result<bool, QueryError>
236    where
237        E: EntityValue,
238    {
239        self.ensure_non_paged_mode_ready()?;
240
241        self.session.execute_load_query_exists(self.query())
242    }
243
244    /// Execute and return the number of matching rows.
245    pub fn count(&self) -> Result<u32, QueryError>
246    where
247        E: EntityValue,
248    {
249        self.ensure_non_paged_mode_ready()?;
250
251        self.session.execute_load_query_count(self.query())
252    }
253
254    /// Execute and return the smallest matching identifier, if any.
255    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
256    where
257        E: EntityValue,
258    {
259        self.ensure_non_paged_mode_ready()?;
260
261        self.session.execute_load_query_min(self.query())
262    }
263
264    /// Execute and return the id of the row with the smallest value for `field`.
265    ///
266    /// Ties are deterministic: equal field values resolve by primary key ascending.
267    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
268    where
269        E: EntityValue,
270    {
271        self.ensure_non_paged_mode_ready()?;
272
273        self.session
274            .execute_load_query_min_by(self.query(), field.as_ref())
275    }
276
277    /// Execute and return the largest matching identifier, if any.
278    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
279    where
280        E: EntityValue,
281    {
282        self.ensure_non_paged_mode_ready()?;
283
284        self.session.execute_load_query_max(self.query())
285    }
286
287    /// Execute and return the id of the row with the largest value for `field`.
288    ///
289    /// Ties are deterministic: equal field values resolve by primary key ascending.
290    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
291    where
292        E: EntityValue,
293    {
294        self.ensure_non_paged_mode_ready()?;
295
296        self.session
297            .execute_load_query_max_by(self.query(), field.as_ref())
298    }
299
300    /// Execute and return the id at zero-based ordinal `nth` when rows are
301    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
302    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
303    where
304        E: EntityValue,
305    {
306        self.ensure_non_paged_mode_ready()?;
307
308        self.session
309            .execute_load_query_nth_by(self.query(), field.as_ref(), nth)
310    }
311
312    /// Execute and return the sum of `field` over matching rows.
313    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
314    where
315        E: EntityValue,
316    {
317        self.ensure_non_paged_mode_ready()?;
318
319        self.session
320            .execute_load_query_sum_by(self.query(), field.as_ref())
321    }
322
323    /// Execute and return the average of `field` over matching rows.
324    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
325    where
326        E: EntityValue,
327    {
328        self.ensure_non_paged_mode_ready()?;
329
330        self.session
331            .execute_load_query_avg_by(self.query(), field.as_ref())
332    }
333
334    /// Execute and return the median id by `field` using deterministic ordering
335    /// `(field asc, primary key asc)`.
336    ///
337    /// Even-length windows select the lower median.
338    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
339    where
340        E: EntityValue,
341    {
342        self.ensure_non_paged_mode_ready()?;
343
344        self.session
345            .execute_load_query_median_by(self.query(), field.as_ref())
346    }
347
348    /// Execute and return the number of distinct values for `field` over the
349    /// effective result window.
350    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
351    where
352        E: EntityValue,
353    {
354        self.ensure_non_paged_mode_ready()?;
355
356        self.session
357            .execute_load_query_count_distinct_by(self.query(), field.as_ref())
358    }
359
360    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
361    ///
362    /// Tie handling is deterministic for both extrema: primary key ascending.
363    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
364    where
365        E: EntityValue,
366    {
367        self.ensure_non_paged_mode_ready()?;
368
369        self.session
370            .execute_load_query_min_max_by(self.query(), field.as_ref())
371    }
372
373    /// Execute and return projected field values for the effective result window.
374    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
375    where
376        E: EntityValue,
377    {
378        self.ensure_non_paged_mode_ready()?;
379
380        self.session
381            .execute_load_query_values_by(self.query(), field.as_ref())
382    }
383
384    /// Execute and return the first `k` rows from the effective response window.
385    pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
386    where
387        E: EntityValue,
388    {
389        self.ensure_non_paged_mode_ready()?;
390
391        self.session
392            .execute_load_query_take(self.query(), take_count)
393    }
394
395    /// Execute and return the top `k` rows by `field` under deterministic
396    /// ordering `(field desc, primary_key asc)` over the effective response
397    /// window.
398    ///
399    /// This terminal applies its own ordering and does not preserve query
400    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
401    /// matches `max_by(field)` selection semantics.
402    pub fn top_k_by(
403        &self,
404        field: impl AsRef<str>,
405        take_count: u32,
406    ) -> Result<Response<E>, QueryError>
407    where
408        E: EntityValue,
409    {
410        self.ensure_non_paged_mode_ready()?;
411
412        self.session
413            .execute_load_query_top_k_by(self.query(), field.as_ref(), take_count)
414    }
415
416    /// Execute and return the bottom `k` rows by `field` under deterministic
417    /// ordering `(field asc, primary_key asc)` over the effective response
418    /// window.
419    ///
420    /// This terminal applies its own ordering and does not preserve query
421    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
422    /// matches `min_by(field)` selection semantics.
423    pub fn bottom_k_by(
424        &self,
425        field: impl AsRef<str>,
426        take_count: u32,
427    ) -> Result<Response<E>, QueryError>
428    where
429        E: EntityValue,
430    {
431        self.ensure_non_paged_mode_ready()?;
432
433        self.session
434            .execute_load_query_bottom_k_by(self.query(), field.as_ref(), take_count)
435    }
436
437    /// Execute and return projected values for the top `k` rows by `field`
438    /// under deterministic ordering `(field desc, primary_key asc)` over the
439    /// effective response window.
440    ///
441    /// Ranking is applied before projection and does not preserve query
442    /// `order_by(...)` row order in the returned values. For `k = 1`, this
443    /// matches `max_by(field)` projected to one value.
444    pub fn top_k_by_values(
445        &self,
446        field: impl AsRef<str>,
447        take_count: u32,
448    ) -> Result<Vec<Value>, QueryError>
449    where
450        E: EntityValue,
451    {
452        self.ensure_non_paged_mode_ready()?;
453
454        self.session
455            .execute_load_query_top_k_by_values(self.query(), field.as_ref(), take_count)
456    }
457
458    /// Execute and return projected values for the bottom `k` rows by `field`
459    /// under deterministic ordering `(field asc, primary_key asc)` over the
460    /// effective response window.
461    ///
462    /// Ranking is applied before projection and does not preserve query
463    /// `order_by(...)` row order in the returned values. For `k = 1`, this
464    /// matches `min_by(field)` projected to one value.
465    pub fn bottom_k_by_values(
466        &self,
467        field: impl AsRef<str>,
468        take_count: u32,
469    ) -> Result<Vec<Value>, QueryError>
470    where
471        E: EntityValue,
472    {
473        self.ensure_non_paged_mode_ready()?;
474
475        self.session
476            .execute_load_query_bottom_k_by_values(self.query(), field.as_ref(), take_count)
477    }
478
479    /// Execute and return projected id/value pairs for the top `k` rows by
480    /// `field` under deterministic ordering `(field desc, primary_key asc)`
481    /// over the effective response window.
482    ///
483    /// Ranking is applied before projection and does not preserve query
484    /// `order_by(...)` row order in the returned values. For `k = 1`, this
485    /// matches `max_by(field)` projected to one `(id, value)` pair.
486    pub fn top_k_by_with_ids(
487        &self,
488        field: impl AsRef<str>,
489        take_count: u32,
490    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
491    where
492        E: EntityValue,
493    {
494        self.ensure_non_paged_mode_ready()?;
495
496        self.session
497            .execute_load_query_top_k_by_with_ids(self.query(), field.as_ref(), take_count)
498    }
499
500    /// Execute and return projected id/value pairs for the bottom `k` rows by
501    /// `field` under deterministic ordering `(field asc, primary_key asc)`
502    /// over the effective response window.
503    ///
504    /// Ranking is applied before projection and does not preserve query
505    /// `order_by(...)` row order in the returned values. For `k = 1`, this
506    /// matches `min_by(field)` projected to one `(id, value)` pair.
507    pub fn bottom_k_by_with_ids(
508        &self,
509        field: impl AsRef<str>,
510        take_count: u32,
511    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
512    where
513        E: EntityValue,
514    {
515        self.ensure_non_paged_mode_ready()?;
516
517        self.session.execute_load_query_bottom_k_by_with_ids(
518            self.query(),
519            field.as_ref(),
520            take_count,
521        )
522    }
523
524    /// Execute and return distinct projected field values for the effective
525    /// result window, preserving first-observed value order.
526    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
527    where
528        E: EntityValue,
529    {
530        self.ensure_non_paged_mode_ready()?;
531
532        self.session
533            .execute_load_query_distinct_values_by(self.query(), field.as_ref())
534    }
535
536    /// Execute and return projected field values paired with row ids for the
537    /// effective result window.
538    pub fn values_by_with_ids(
539        &self,
540        field: impl AsRef<str>,
541    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
542    where
543        E: EntityValue,
544    {
545        self.ensure_non_paged_mode_ready()?;
546
547        self.session
548            .execute_load_query_values_by_with_ids(self.query(), field.as_ref())
549    }
550
551    /// Execute and return the first projected field value in effective response
552    /// order, if any.
553    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
554    where
555        E: EntityValue,
556    {
557        self.ensure_non_paged_mode_ready()?;
558
559        self.session
560            .execute_load_query_first_value_by(self.query(), field.as_ref())
561    }
562
563    /// Execute and return the last projected field value in effective response
564    /// order, if any.
565    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
566    where
567        E: EntityValue,
568    {
569        self.ensure_non_paged_mode_ready()?;
570
571        self.session
572            .execute_load_query_last_value_by(self.query(), field.as_ref())
573    }
574
575    /// Execute and return the first matching identifier in response order, if any.
576    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
577    where
578        E: EntityValue,
579    {
580        self.ensure_non_paged_mode_ready()?;
581
582        self.session.execute_load_query_first(self.query())
583    }
584
585    /// Execute and return the last matching identifier in response order, if any.
586    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
587    where
588        E: EntityValue,
589    {
590        self.ensure_non_paged_mode_ready()?;
591
592        self.session.execute_load_query_last(self.query())
593    }
594
595    /// Execute and require exactly one matching row.
596    pub fn require_one(&self) -> Result<(), QueryError>
597    where
598        E: EntityValue,
599    {
600        self.execute()?.require_one()?;
601        Ok(())
602    }
603
604    /// Execute and require at least one matching row.
605    pub fn require_some(&self) -> Result<(), QueryError>
606    where
607        E: EntityValue,
608    {
609        self.execute()?.require_some()?;
610        Ok(())
611    }
612}
613
614impl<E> FluentLoadQuery<'_, E>
615where
616    E: EntityKind,
617{
618    fn non_paged_intent_error(&self) -> Option<IntentError> {
619        self.cursor_token
620            .as_ref()
621            .map(|_| IntentError::CursorRequiresPagedExecution)
622    }
623
624    fn cursor_intent_error(&self) -> Option<IntentError> {
625        self.cursor_token
626            .as_ref()
627            .and_then(|_| self.paged_intent_error())
628    }
629
630    fn paged_intent_error(&self) -> Option<IntentError> {
631        let spec = self.query.load_spec()?;
632
633        policy::validate_cursor_paging_requirements(self.query.has_explicit_order(), spec)
634            .err()
635            .map(IntentError::from)
636    }
637
638    fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
639        if let Some(err) = self.paged_intent_error() {
640            return Err(QueryError::Intent(err));
641        }
642
643        Ok(())
644    }
645
646    fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
647        if let Some(err) = self.non_paged_intent_error() {
648            return Err(QueryError::Intent(err));
649        }
650
651        Ok(())
652    }
653}
654
655impl<E> FluentLoadQuery<'_, E>
656where
657    E: EntityKind + SingletonEntity,
658    E::Key: Default,
659{
660    #[must_use]
661    pub fn only(self) -> Self {
662        self.map_query(Query::only)
663    }
664}
665
666impl<E> PagedLoadQuery<'_, E>
667where
668    E: EntityKind,
669{
670    // ------------------------------------------------------------------
671    // Intent inspection
672    // ------------------------------------------------------------------
673
674    #[must_use]
675    pub const fn query(&self) -> &Query<E> {
676        self.inner.query()
677    }
678
679    // ------------------------------------------------------------------
680    // Cursor continuation
681    // ------------------------------------------------------------------
682
683    /// Attach an opaque continuation token for the next page.
684    #[must_use]
685    pub fn cursor(mut self, token: impl Into<String>) -> Self {
686        self.inner = self.inner.cursor(token);
687        self
688    }
689
690    // ------------------------------------------------------------------
691    // Execution
692    // ------------------------------------------------------------------
693
694    /// Execute in cursor-pagination mode and return items + next cursor.
695    ///
696    /// Continuation is best-effort and forward-only over live state:
697    /// deterministic per request under canonical ordering, with no
698    /// snapshot/version pinned across requests.
699    pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
700    where
701        E: EntityValue,
702    {
703        self.execute_with_trace()
704            .map(PagedLoadExecutionWithTrace::into_execution)
705    }
706
707    /// Execute in cursor-pagination mode and return items, next cursor,
708    /// and optional execution trace details when session debug mode is enabled.
709    ///
710    /// Trace collection is opt-in via `DbSession::debug()` and does not
711    /// change query planning or result semantics.
712    pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
713    where
714        E: EntityValue,
715    {
716        self.inner.ensure_paged_mode_ready()?;
717
718        self.inner.session.execute_load_query_paged_with_trace(
719            self.inner.query(),
720            self.inner.cursor_token.as_deref(),
721        )
722    }
723}