Skip to main content

icydb_core/db/query/fluent/
load.rs

1use crate::{
2    db::{
3        DbSession, PagedLoadExecution, PagedLoadExecutionWithTrace,
4        query::{
5            expr::{FilterExpr, SortExpr},
6            intent::{IntentError, Query, QueryError},
7            plan::{ExecutablePlan, ExplainPlan},
8            policy,
9            predicate::Predicate,
10        },
11        response::Response,
12    },
13    traits::{EntityKind, EntityValue, SingletonEntity},
14    types::{Decimal, Id},
15    value::Value,
16};
17
18type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
20///
21/// FluentLoadQuery
22///
23/// Session-bound load query wrapper.
24/// Owns intent construction and execution routing only.
25/// All result inspection and projection is performed on `Response<E>`.
26///
27
28pub struct FluentLoadQuery<'a, E>
29where
30    E: EntityKind,
31{
32    session: &'a DbSession<E::Canister>,
33    query: Query<E>,
34    cursor_token: Option<String>,
35}
36
37///
38/// PagedLoadQuery
39///
40/// Session-bound cursor pagination wrapper.
41/// This wrapper only exposes cursor continuation and paged execution.
42///
43
44pub struct PagedLoadQuery<'a, E>
45where
46    E: EntityKind,
47{
48    inner: FluentLoadQuery<'a, E>,
49}
50
51impl<'a, E> FluentLoadQuery<'a, E>
52where
53    E: EntityKind,
54{
55    pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
56        Self {
57            session,
58            query,
59            cursor_token: None,
60        }
61    }
62
63    // ------------------------------------------------------------------
64    // Intent inspection
65    // ------------------------------------------------------------------
66
67    #[must_use]
68    pub const fn query(&self) -> &Query<E> {
69        &self.query
70    }
71
72    fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
73        self.query = map(self.query);
74        self
75    }
76
77    fn try_map_query(
78        mut self,
79        map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
80    ) -> Result<Self, QueryError> {
81        self.query = map(self.query)?;
82        Ok(self)
83    }
84
85    // ------------------------------------------------------------------
86    // Intent builders (pure)
87    // ------------------------------------------------------------------
88
89    /// Set the access path to a single typed primary-key value.
90    ///
91    /// `Id<E>` is treated as a plain query input value here. It does not grant access.
92    #[must_use]
93    pub fn by_id(mut self, id: Id<E>) -> Self {
94        self.query = self.query.by_id(id.key());
95        self
96    }
97
98    /// Set the access path to multiple typed primary-key values.
99    ///
100    /// IDs are public and may come from untrusted input sources.
101    #[must_use]
102    pub fn by_ids<I>(mut self, ids: I) -> Self
103    where
104        I: IntoIterator<Item = Id<E>>,
105    {
106        self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
107        self
108    }
109
110    // ------------------------------------------------------------------
111    // Query Refinement
112    // ------------------------------------------------------------------
113
114    #[must_use]
115    pub fn filter(self, predicate: Predicate) -> Self {
116        self.map_query(|query| query.filter(predicate))
117    }
118
119    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
120        self.try_map_query(|query| query.filter_expr(expr))
121    }
122
123    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
124        self.try_map_query(|query| query.sort_expr(expr))
125    }
126
127    #[must_use]
128    pub fn order_by(self, field: impl AsRef<str>) -> Self {
129        self.map_query(|query| query.order_by(field))
130    }
131
132    #[must_use]
133    pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
134        self.map_query(|query| query.order_by_desc(field))
135    }
136
137    /// Bound the number of returned rows.
138    ///
139    /// Pagination is only valid with explicit ordering; combine `limit` and/or
140    /// `offset` with `order_by(...)` or planning fails.
141    #[must_use]
142    pub fn limit(self, limit: u32) -> Self {
143        self.map_query(|query| query.limit(limit))
144    }
145
146    /// Skip a number of rows in the ordered result stream.
147    ///
148    /// Pagination is only valid with explicit ordering; combine `offset` and/or
149    /// `limit` with `order_by(...)` or planning fails.
150    #[must_use]
151    pub fn offset(self, offset: u32) -> Self {
152        self.map_query(|query| query.offset(offset))
153    }
154
155    /// Attach an opaque cursor token for continuation pagination.
156    ///
157    /// Cursor-mode invariants are checked before planning/execution:
158    /// - explicit `order_by(...)` is required
159    /// - explicit `limit(...)` is required
160    #[must_use]
161    pub fn cursor(mut self, token: impl Into<String>) -> Self {
162        self.cursor_token = Some(token.into());
163        self
164    }
165
166    // ------------------------------------------------------------------
167    // Planning / diagnostics
168    // ------------------------------------------------------------------
169
170    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
171        self.query.explain()
172    }
173
174    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
175        if let Some(err) = self.cursor_intent_error() {
176            return Err(QueryError::Intent(err));
177        }
178
179        self.query.plan()
180    }
181
182    // ------------------------------------------------------------------
183    // Execution (single semantic boundary)
184    // ------------------------------------------------------------------
185
186    /// Execute this query using the session's policy settings.
187    pub fn execute(&self) -> Result<Response<E>, QueryError>
188    where
189        E: EntityValue,
190    {
191        self.ensure_non_paged_mode_ready()?;
192
193        self.session.execute_query(self.query())
194    }
195
196    /// Enter typed cursor-pagination mode for this query.
197    ///
198    /// Cursor pagination requires:
199    /// - explicit `order_by(...)`
200    /// - explicit `limit(...)`
201    ///
202    /// Requests are deterministic under canonical ordering, but continuation is
203    /// best-effort and forward-only over live state.
204    /// No snapshot/version is pinned across requests, so concurrent writes may
205    /// shift page boundaries.
206    pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
207        self.ensure_paged_mode_ready()?;
208
209        Ok(PagedLoadQuery { inner: self })
210    }
211
212    /// Execute this query as cursor pagination and return items + next cursor.
213    ///
214    /// The returned cursor token is opaque and must be passed back via `.cursor(...)`.
215    pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
216    where
217        E: EntityValue,
218    {
219        self.page()?.execute()
220    }
221
222    // ------------------------------------------------------------------
223    // Execution terminals — semantic only
224    // ------------------------------------------------------------------
225
226    /// Execute and return whether the result set is empty.
227    pub fn is_empty(&self) -> Result<bool, QueryError>
228    where
229        E: EntityValue,
230    {
231        Ok(!self.exists()?)
232    }
233
234    /// Execute and return whether at least one matching row exists.
235    pub fn exists(&self) -> Result<bool, QueryError>
236    where
237        E: EntityValue,
238    {
239        self.ensure_non_paged_mode_ready()?;
240
241        self.session.execute_load_query_exists(self.query())
242    }
243
244    /// Execute and return the number of matching rows.
245    pub fn count(&self) -> Result<u32, QueryError>
246    where
247        E: EntityValue,
248    {
249        self.ensure_non_paged_mode_ready()?;
250
251        self.session.execute_load_query_count(self.query())
252    }
253
254    /// Execute and return the smallest matching identifier, if any.
255    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
256    where
257        E: EntityValue,
258    {
259        self.ensure_non_paged_mode_ready()?;
260
261        self.session.execute_load_query_min(self.query())
262    }
263
264    /// Execute and return the id of the row with the smallest value for `field`.
265    ///
266    /// Ties are deterministic: equal field values resolve by primary key ascending.
267    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
268    where
269        E: EntityValue,
270    {
271        self.ensure_non_paged_mode_ready()?;
272
273        self.session
274            .execute_load_query_min_by(self.query(), field.as_ref())
275    }
276
277    /// Execute and return the largest matching identifier, if any.
278    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
279    where
280        E: EntityValue,
281    {
282        self.ensure_non_paged_mode_ready()?;
283
284        self.session.execute_load_query_max(self.query())
285    }
286
287    /// Execute and return the id of the row with the largest value for `field`.
288    ///
289    /// Ties are deterministic: equal field values resolve by primary key ascending.
290    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
291    where
292        E: EntityValue,
293    {
294        self.ensure_non_paged_mode_ready()?;
295
296        self.session
297            .execute_load_query_max_by(self.query(), field.as_ref())
298    }
299
300    /// Execute and return the id at zero-based ordinal `nth` when rows are
301    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
302    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
303    where
304        E: EntityValue,
305    {
306        self.ensure_non_paged_mode_ready()?;
307
308        self.session
309            .execute_load_query_nth_by(self.query(), field.as_ref(), nth)
310    }
311
312    /// Execute and return the sum of `field` over matching rows.
313    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
314    where
315        E: EntityValue,
316    {
317        self.ensure_non_paged_mode_ready()?;
318
319        self.session
320            .execute_load_query_sum_by(self.query(), field.as_ref())
321    }
322
323    /// Execute and return the average of `field` over matching rows.
324    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
325    where
326        E: EntityValue,
327    {
328        self.ensure_non_paged_mode_ready()?;
329
330        self.session
331            .execute_load_query_avg_by(self.query(), field.as_ref())
332    }
333
334    /// Execute and return the median id by `field` using deterministic ordering
335    /// `(field asc, primary key asc)`.
336    ///
337    /// Even-length windows select the lower median.
338    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
339    where
340        E: EntityValue,
341    {
342        self.ensure_non_paged_mode_ready()?;
343
344        self.session
345            .execute_load_query_median_by(self.query(), field.as_ref())
346    }
347
348    /// Execute and return the number of distinct values for `field` over the
349    /// effective result window.
350    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
351    where
352        E: EntityValue,
353    {
354        self.ensure_non_paged_mode_ready()?;
355
356        self.session
357            .execute_load_query_count_distinct_by(self.query(), field.as_ref())
358    }
359
360    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
361    ///
362    /// Tie handling is deterministic for both extrema: primary key ascending.
363    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
364    where
365        E: EntityValue,
366    {
367        self.ensure_non_paged_mode_ready()?;
368
369        self.session
370            .execute_load_query_min_max_by(self.query(), field.as_ref())
371    }
372
373    /// Execute and return projected field values for the effective result window.
374    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
375    where
376        E: EntityValue,
377    {
378        self.ensure_non_paged_mode_ready()?;
379
380        self.session
381            .execute_load_query_values_by(self.query(), field.as_ref())
382    }
383
384    /// Execute and return the first matching identifier in response order, if any.
385    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
386    where
387        E: EntityValue,
388    {
389        self.ensure_non_paged_mode_ready()?;
390
391        self.session.execute_load_query_first(self.query())
392    }
393
394    /// Execute and return the last matching identifier in response order, if any.
395    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
396    where
397        E: EntityValue,
398    {
399        self.ensure_non_paged_mode_ready()?;
400
401        self.session.execute_load_query_last(self.query())
402    }
403
404    /// Execute and require exactly one matching row.
405    pub fn require_one(&self) -> Result<(), QueryError>
406    where
407        E: EntityValue,
408    {
409        self.execute()?.require_one()?;
410        Ok(())
411    }
412
413    /// Execute and require at least one matching row.
414    pub fn require_some(&self) -> Result<(), QueryError>
415    where
416        E: EntityValue,
417    {
418        self.execute()?.require_some()?;
419        Ok(())
420    }
421}
422
423impl<E> FluentLoadQuery<'_, E>
424where
425    E: EntityKind,
426{
427    fn non_paged_intent_error(&self) -> Option<IntentError> {
428        self.cursor_token
429            .as_ref()
430            .map(|_| IntentError::CursorRequiresPagedExecution)
431    }
432
433    fn cursor_intent_error(&self) -> Option<IntentError> {
434        self.cursor_token
435            .as_ref()
436            .and_then(|_| self.paged_intent_error())
437    }
438
439    fn paged_intent_error(&self) -> Option<IntentError> {
440        let spec = self.query.load_spec()?;
441
442        policy::validate_cursor_paging_requirements(self.query.has_explicit_order(), spec)
443            .err()
444            .map(IntentError::from)
445    }
446
447    fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
448        if let Some(err) = self.paged_intent_error() {
449            return Err(QueryError::Intent(err));
450        }
451
452        Ok(())
453    }
454
455    fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
456        if let Some(err) = self.non_paged_intent_error() {
457            return Err(QueryError::Intent(err));
458        }
459
460        Ok(())
461    }
462}
463
464impl<E> FluentLoadQuery<'_, E>
465where
466    E: EntityKind + SingletonEntity,
467    E::Key: Default,
468{
469    #[must_use]
470    pub fn only(self) -> Self {
471        self.map_query(Query::only)
472    }
473}
474
475impl<E> PagedLoadQuery<'_, E>
476where
477    E: EntityKind,
478{
479    // ------------------------------------------------------------------
480    // Intent inspection
481    // ------------------------------------------------------------------
482
483    #[must_use]
484    pub const fn query(&self) -> &Query<E> {
485        self.inner.query()
486    }
487
488    // ------------------------------------------------------------------
489    // Cursor continuation
490    // ------------------------------------------------------------------
491
492    /// Attach an opaque continuation token for the next page.
493    #[must_use]
494    pub fn cursor(mut self, token: impl Into<String>) -> Self {
495        self.inner = self.inner.cursor(token);
496        self
497    }
498
499    // ------------------------------------------------------------------
500    // Execution
501    // ------------------------------------------------------------------
502
503    /// Execute in cursor-pagination mode and return items + next cursor.
504    ///
505    /// Continuation is best-effort and forward-only over live state:
506    /// deterministic per request under canonical ordering, with no
507    /// snapshot/version pinned across requests.
508    pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
509    where
510        E: EntityValue,
511    {
512        self.execute_with_trace()
513            .map(PagedLoadExecutionWithTrace::into_execution)
514    }
515
516    /// Execute in cursor-pagination mode and return items, next cursor,
517    /// and optional execution trace details when session debug mode is enabled.
518    ///
519    /// Trace collection is opt-in via `DbSession::debug()` and does not
520    /// change query planning or result semantics.
521    pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
522    where
523        E: EntityValue,
524    {
525        self.inner.ensure_paged_mode_ready()?;
526
527        self.inner.session.execute_load_query_paged_with_trace(
528            self.inner.query(),
529            self.inner.cursor_token.as_deref(),
530        )
531    }
532}