// icydb_core/db/query/fluent/load.rs

1use crate::{
2    db::{
3        DbSession, PagedLoadExecution, PagedLoadExecutionWithTrace,
4        query::{
5            expr::{FilterExpr, SortExpr},
6            intent::{IntentError, Query, QueryError},
7            plan::{ExecutablePlan, ExplainPlan},
8            policy,
9            predicate::Predicate,
10        },
11        response::Response,
12    },
13    traits::{EntityKind, EntityValue, SingletonEntity},
14    types::{Decimal, Id},
15};
16
17type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
18
19///
20/// FluentLoadQuery
21///
22/// Session-bound load query wrapper.
23/// Owns intent construction and execution routing only.
24/// All result inspection and projection is performed on `Response<E>`.
25///
26
27pub struct FluentLoadQuery<'a, E>
28where
29    E: EntityKind,
30{
31    session: &'a DbSession<E::Canister>,
32    query: Query<E>,
33    cursor_token: Option<String>,
34}
35
36///
37/// PagedLoadQuery
38///
39/// Session-bound cursor pagination wrapper.
40/// This wrapper only exposes cursor continuation and paged execution.
41///
42
43pub struct PagedLoadQuery<'a, E>
44where
45    E: EntityKind,
46{
47    inner: FluentLoadQuery<'a, E>,
48}
49
50impl<'a, E> FluentLoadQuery<'a, E>
51where
52    E: EntityKind,
53{
54    pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
55        Self {
56            session,
57            query,
58            cursor_token: None,
59        }
60    }
61
62    // ------------------------------------------------------------------
63    // Intent inspection
64    // ------------------------------------------------------------------
65
66    #[must_use]
67    pub const fn query(&self) -> &Query<E> {
68        &self.query
69    }
70
71    fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
72        self.query = map(self.query);
73        self
74    }
75
76    fn try_map_query(
77        mut self,
78        map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
79    ) -> Result<Self, QueryError> {
80        self.query = map(self.query)?;
81        Ok(self)
82    }
83
84    // ------------------------------------------------------------------
85    // Intent builders (pure)
86    // ------------------------------------------------------------------
87
88    /// Set the access path to a single typed primary-key value.
89    ///
90    /// `Id<E>` is treated as a plain query input value here. It does not grant access.
91    #[must_use]
92    pub fn by_id(mut self, id: Id<E>) -> Self {
93        self.query = self.query.by_id(id.key());
94        self
95    }
96
97    /// Set the access path to multiple typed primary-key values.
98    ///
99    /// IDs are public and may come from untrusted input sources.
100    #[must_use]
101    pub fn by_ids<I>(mut self, ids: I) -> Self
102    where
103        I: IntoIterator<Item = Id<E>>,
104    {
105        self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
106        self
107    }
108
109    // ------------------------------------------------------------------
110    // Query Refinement
111    // ------------------------------------------------------------------
112
113    #[must_use]
114    pub fn filter(self, predicate: Predicate) -> Self {
115        self.map_query(|query| query.filter(predicate))
116    }
117
118    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
119        self.try_map_query(|query| query.filter_expr(expr))
120    }
121
122    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
123        self.try_map_query(|query| query.sort_expr(expr))
124    }
125
126    #[must_use]
127    pub fn order_by(self, field: impl AsRef<str>) -> Self {
128        self.map_query(|query| query.order_by(field))
129    }
130
131    #[must_use]
132    pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
133        self.map_query(|query| query.order_by_desc(field))
134    }
135
136    /// Bound the number of returned rows.
137    ///
138    /// Pagination is only valid with explicit ordering; combine `limit` and/or
139    /// `offset` with `order_by(...)` or planning fails.
140    #[must_use]
141    pub fn limit(self, limit: u32) -> Self {
142        self.map_query(|query| query.limit(limit))
143    }
144
145    /// Skip a number of rows in the ordered result stream.
146    ///
147    /// Pagination is only valid with explicit ordering; combine `offset` and/or
148    /// `limit` with `order_by(...)` or planning fails.
149    #[must_use]
150    pub fn offset(self, offset: u32) -> Self {
151        self.map_query(|query| query.offset(offset))
152    }
153
154    /// Attach an opaque cursor token for continuation pagination.
155    ///
156    /// Cursor-mode invariants are checked before planning/execution:
157    /// - explicit `order_by(...)` is required
158    /// - explicit `limit(...)` is required
159    #[must_use]
160    pub fn cursor(mut self, token: impl Into<String>) -> Self {
161        self.cursor_token = Some(token.into());
162        self
163    }
164
165    // ------------------------------------------------------------------
166    // Planning / diagnostics
167    // ------------------------------------------------------------------
168
169    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
170        self.query.explain()
171    }
172
173    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
174        if let Some(err) = self.cursor_intent_error() {
175            return Err(QueryError::Intent(err));
176        }
177
178        self.query.plan()
179    }
180
181    // ------------------------------------------------------------------
182    // Execution (single semantic boundary)
183    // ------------------------------------------------------------------
184
185    /// Execute this query using the session's policy settings.
186    pub fn execute(&self) -> Result<Response<E>, QueryError>
187    where
188        E: EntityValue,
189    {
190        self.ensure_non_paged_mode_ready()?;
191
192        self.session.execute_query(self.query())
193    }
194
195    /// Enter typed cursor-pagination mode for this query.
196    ///
197    /// Cursor pagination requires:
198    /// - explicit `order_by(...)`
199    /// - explicit `limit(...)`
200    ///
201    /// Requests are deterministic under canonical ordering, but continuation is
202    /// best-effort and forward-only over live state.
203    /// No snapshot/version is pinned across requests, so concurrent writes may
204    /// shift page boundaries.
205    pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
206        self.ensure_paged_mode_ready()?;
207
208        Ok(PagedLoadQuery { inner: self })
209    }
210
211    /// Execute this query as cursor pagination and return items + next cursor.
212    ///
213    /// The returned cursor token is opaque and must be passed back via `.cursor(...)`.
214    pub fn execute_paged(self) -> Result<(Response<E>, Option<Vec<u8>>), QueryError>
215    where
216        E: EntityValue,
217    {
218        self.page()?.execute()
219    }
220
221    // ------------------------------------------------------------------
222    // Execution terminals — semantic only
223    // ------------------------------------------------------------------
224
225    /// Execute and return whether the result set is empty.
226    pub fn is_empty(&self) -> Result<bool, QueryError>
227    where
228        E: EntityValue,
229    {
230        Ok(!self.exists()?)
231    }
232
233    /// Execute and return whether at least one matching row exists.
234    pub fn exists(&self) -> Result<bool, QueryError>
235    where
236        E: EntityValue,
237    {
238        self.ensure_non_paged_mode_ready()?;
239
240        self.session.execute_load_query_exists(self.query())
241    }
242
243    /// Execute and return the number of matching rows.
244    pub fn count(&self) -> Result<u32, QueryError>
245    where
246        E: EntityValue,
247    {
248        self.ensure_non_paged_mode_ready()?;
249
250        self.session.execute_load_query_count(self.query())
251    }
252
253    /// Execute and return the smallest matching identifier, if any.
254    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
255    where
256        E: EntityValue,
257    {
258        self.ensure_non_paged_mode_ready()?;
259
260        self.session.execute_load_query_min(self.query())
261    }
262
263    /// Execute and return the id of the row with the smallest value for `field`.
264    ///
265    /// Ties are deterministic: equal field values resolve by primary key ascending.
266    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
267    where
268        E: EntityValue,
269    {
270        self.ensure_non_paged_mode_ready()?;
271
272        self.session
273            .execute_load_query_min_by(self.query(), field.as_ref())
274    }
275
276    /// Execute and return the largest matching identifier, if any.
277    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
278    where
279        E: EntityValue,
280    {
281        self.ensure_non_paged_mode_ready()?;
282
283        self.session.execute_load_query_max(self.query())
284    }
285
286    /// Execute and return the id of the row with the largest value for `field`.
287    ///
288    /// Ties are deterministic: equal field values resolve by primary key ascending.
289    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
290    where
291        E: EntityValue,
292    {
293        self.ensure_non_paged_mode_ready()?;
294
295        self.session
296            .execute_load_query_max_by(self.query(), field.as_ref())
297    }
298
299    /// Execute and return the id at zero-based ordinal `nth` when rows are
300    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
301    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
302    where
303        E: EntityValue,
304    {
305        self.ensure_non_paged_mode_ready()?;
306
307        self.session
308            .execute_load_query_nth_by(self.query(), field.as_ref(), nth)
309    }
310
311    /// Execute and return the sum of `field` over matching rows.
312    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
313    where
314        E: EntityValue,
315    {
316        self.ensure_non_paged_mode_ready()?;
317
318        self.session
319            .execute_load_query_sum_by(self.query(), field.as_ref())
320    }
321
322    /// Execute and return the average of `field` over matching rows.
323    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
324    where
325        E: EntityValue,
326    {
327        self.ensure_non_paged_mode_ready()?;
328
329        self.session
330            .execute_load_query_avg_by(self.query(), field.as_ref())
331    }
332
333    /// Execute and return the median id by `field` using deterministic ordering
334    /// `(field asc, primary key asc)`.
335    ///
336    /// Even-length windows select the lower median.
337    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
338    where
339        E: EntityValue,
340    {
341        self.ensure_non_paged_mode_ready()?;
342
343        self.session
344            .execute_load_query_median_by(self.query(), field.as_ref())
345    }
346
347    /// Execute and return the number of distinct values for `field` over the
348    /// effective result window.
349    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
350    where
351        E: EntityValue,
352    {
353        self.ensure_non_paged_mode_ready()?;
354
355        self.session
356            .execute_load_query_count_distinct_by(self.query(), field.as_ref())
357    }
358
359    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
360    ///
361    /// Tie handling is deterministic for both extrema: primary key ascending.
362    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
363    where
364        E: EntityValue,
365    {
366        self.ensure_non_paged_mode_ready()?;
367
368        self.session
369            .execute_load_query_min_max_by(self.query(), field.as_ref())
370    }
371
372    /// Execute and return the first matching identifier in response order, if any.
373    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
374    where
375        E: EntityValue,
376    {
377        self.ensure_non_paged_mode_ready()?;
378
379        self.session.execute_load_query_first(self.query())
380    }
381
382    /// Execute and return the last matching identifier in response order, if any.
383    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
384    where
385        E: EntityValue,
386    {
387        self.ensure_non_paged_mode_ready()?;
388
389        self.session.execute_load_query_last(self.query())
390    }
391
392    /// Execute and require exactly one matching row.
393    pub fn require_one(&self) -> Result<(), QueryError>
394    where
395        E: EntityValue,
396    {
397        self.execute()?.require_one()?;
398        Ok(())
399    }
400
401    /// Execute and require at least one matching row.
402    pub fn require_some(&self) -> Result<(), QueryError>
403    where
404        E: EntityValue,
405    {
406        self.execute()?.require_some()?;
407        Ok(())
408    }
409}
410
411impl<E> FluentLoadQuery<'_, E>
412where
413    E: EntityKind,
414{
415    fn non_paged_intent_error(&self) -> Option<IntentError> {
416        self.cursor_token
417            .as_ref()
418            .map(|_| IntentError::CursorRequiresPagedExecution)
419    }
420
421    fn cursor_intent_error(&self) -> Option<IntentError> {
422        self.cursor_token
423            .as_ref()
424            .and_then(|_| self.paged_intent_error())
425    }
426
427    fn paged_intent_error(&self) -> Option<IntentError> {
428        let spec = self.query.load_spec()?;
429
430        policy::validate_cursor_paging_requirements(self.query.has_explicit_order(), spec)
431            .err()
432            .map(IntentError::from)
433    }
434
435    fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
436        if let Some(err) = self.paged_intent_error() {
437            return Err(QueryError::Intent(err));
438        }
439
440        Ok(())
441    }
442
443    fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
444        if let Some(err) = self.non_paged_intent_error() {
445            return Err(QueryError::Intent(err));
446        }
447
448        Ok(())
449    }
450}
451
452impl<E> FluentLoadQuery<'_, E>
453where
454    E: EntityKind + SingletonEntity,
455    E::Key: Default,
456{
457    #[must_use]
458    pub fn only(self) -> Self {
459        self.map_query(Query::only)
460    }
461}
462
463impl<E> PagedLoadQuery<'_, E>
464where
465    E: EntityKind,
466{
467    // ------------------------------------------------------------------
468    // Intent inspection
469    // ------------------------------------------------------------------
470
471    #[must_use]
472    pub const fn query(&self) -> &Query<E> {
473        self.inner.query()
474    }
475
476    // ------------------------------------------------------------------
477    // Cursor continuation
478    // ------------------------------------------------------------------
479
480    /// Attach an opaque continuation token for the next page.
481    #[must_use]
482    pub fn cursor(mut self, token: impl Into<String>) -> Self {
483        self.inner = self.inner.cursor(token);
484        self
485    }
486
487    // ------------------------------------------------------------------
488    // Execution
489    // ------------------------------------------------------------------
490
491    /// Execute in cursor-pagination mode and return items + next cursor.
492    ///
493    /// Continuation is best-effort and forward-only over live state:
494    /// deterministic per request under canonical ordering, with no
495    /// snapshot/version pinned across requests.
496    pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
497    where
498        E: EntityValue,
499    {
500        self.execute_with_trace()
501            .map(|(items, next_cursor, _)| (items, next_cursor))
502    }
503
504    /// Execute in cursor-pagination mode and return items, next cursor,
505    /// and optional execution trace details when session debug mode is enabled.
506    ///
507    /// Trace collection is opt-in via `DbSession::debug()` and does not
508    /// change query planning or result semantics.
509    pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
510    where
511        E: EntityValue,
512    {
513        self.inner.ensure_paged_mode_ready()?;
514
515        self.inner.session.execute_load_query_paged_with_trace(
516            self.inner.query(),
517            self.inner.cursor_token.as_deref(),
518        )
519    }
520}