Skip to main content

icydb_core/db/query/fluent/
load.rs

1use crate::{
2    db::{
3        DbSession, PagedLoadExecution, PagedLoadExecutionWithTrace,
4        query::{
5            expr::{FilterExpr, SortExpr},
6            intent::{IntentError, Query, QueryError},
7            plan::{ExecutablePlan, ExplainPlan},
8            policy,
9            predicate::Predicate,
10        },
11        response::Response,
12    },
13    traits::{EntityKind, EntityValue, SingletonEntity},
14    types::Id,
15};
16
17///
18/// FluentLoadQuery
19///
20/// Session-bound load query wrapper.
21/// Owns intent construction and execution routing only.
22/// All result inspection and projection is performed on `Response<E>`.
23///
24
25pub struct FluentLoadQuery<'a, E>
26where
27    E: EntityKind,
28{
29    session: &'a DbSession<E::Canister>,
30    query: Query<E>,
31    cursor_token: Option<String>,
32}
33
34///
35/// PagedLoadQuery
36///
37/// Session-bound cursor pagination wrapper.
38/// This wrapper only exposes cursor continuation and paged execution.
39///
40
41pub struct PagedLoadQuery<'a, E>
42where
43    E: EntityKind,
44{
45    inner: FluentLoadQuery<'a, E>,
46}
47
48impl<'a, E> FluentLoadQuery<'a, E>
49where
50    E: EntityKind,
51{
52    pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
53        Self {
54            session,
55            query,
56            cursor_token: None,
57        }
58    }
59
60    // ------------------------------------------------------------------
61    // Intent inspection
62    // ------------------------------------------------------------------
63
64    #[must_use]
65    pub const fn query(&self) -> &Query<E> {
66        &self.query
67    }
68
69    fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
70        self.query = map(self.query);
71        self
72    }
73
74    fn try_map_query(
75        mut self,
76        map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
77    ) -> Result<Self, QueryError> {
78        self.query = map(self.query)?;
79        Ok(self)
80    }
81
82    // ------------------------------------------------------------------
83    // Intent builders (pure)
84    // ------------------------------------------------------------------
85
86    /// Set the access path to a single typed primary-key value.
87    ///
88    /// `Id<E>` is treated as a plain query input value here. It does not grant access.
89    #[must_use]
90    pub fn by_id(mut self, id: Id<E>) -> Self {
91        self.query = self.query.by_id(id.key());
92        self
93    }
94
95    /// Set the access path to multiple typed primary-key values.
96    ///
97    /// IDs are public and may come from untrusted input sources.
98    #[must_use]
99    pub fn by_ids<I>(mut self, ids: I) -> Self
100    where
101        I: IntoIterator<Item = Id<E>>,
102    {
103        self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
104        self
105    }
106
107    // ------------------------------------------------------------------
108    // Query Refinement
109    // ------------------------------------------------------------------
110
111    #[must_use]
112    pub fn filter(self, predicate: Predicate) -> Self {
113        self.map_query(|query| query.filter(predicate))
114    }
115
116    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
117        self.try_map_query(|query| query.filter_expr(expr))
118    }
119
120    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
121        self.try_map_query(|query| query.sort_expr(expr))
122    }
123
124    #[must_use]
125    pub fn order_by(self, field: impl AsRef<str>) -> Self {
126        self.map_query(|query| query.order_by(field))
127    }
128
129    #[must_use]
130    pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
131        self.map_query(|query| query.order_by_desc(field))
132    }
133
134    /// Bound the number of returned rows.
135    ///
136    /// Pagination is only valid with explicit ordering; combine `limit` and/or
137    /// `offset` with `order_by(...)` or planning fails.
138    #[must_use]
139    pub fn limit(self, limit: u32) -> Self {
140        self.map_query(|query| query.limit(limit))
141    }
142
143    /// Skip a number of rows in the ordered result stream.
144    ///
145    /// Pagination is only valid with explicit ordering; combine `offset` and/or
146    /// `limit` with `order_by(...)` or planning fails.
147    #[must_use]
148    pub fn offset(self, offset: u32) -> Self {
149        self.map_query(|query| query.offset(offset))
150    }
151
152    /// Attach an opaque cursor token for continuation pagination.
153    ///
154    /// Cursor-mode invariants are checked before planning/execution:
155    /// - explicit `order_by(...)` is required
156    /// - explicit `limit(...)` is required
157    #[must_use]
158    pub fn cursor(mut self, token: impl Into<String>) -> Self {
159        self.cursor_token = Some(token.into());
160        self
161    }
162
163    // ------------------------------------------------------------------
164    // Planning / diagnostics
165    // ------------------------------------------------------------------
166
167    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
168        self.query.explain()
169    }
170
171    pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
172        if let Some(err) = self.cursor_intent_error() {
173            return Err(QueryError::Intent(err));
174        }
175
176        self.query.plan()
177    }
178
179    // ------------------------------------------------------------------
180    // Execution (single semantic boundary)
181    // ------------------------------------------------------------------
182
183    /// Execute this query using the session's policy settings.
184    pub fn execute(&self) -> Result<Response<E>, QueryError>
185    where
186        E: EntityValue,
187    {
188        self.ensure_non_paged_mode_ready()?;
189
190        self.session.execute_query(self.query())
191    }
192
193    /// Enter typed cursor-pagination mode for this query.
194    ///
195    /// Cursor pagination requires:
196    /// - explicit `order_by(...)`
197    /// - explicit `limit(...)`
198    ///
199    /// Requests are deterministic under canonical ordering, but continuation is
200    /// best-effort and forward-only over live state.
201    /// No snapshot/version is pinned across requests, so concurrent writes may
202    /// shift page boundaries.
203    pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
204        self.ensure_paged_mode_ready()?;
205
206        Ok(PagedLoadQuery { inner: self })
207    }
208
209    /// Execute this query as cursor pagination and return items + next cursor.
210    ///
211    /// The returned cursor token is opaque and must be passed back via `.cursor(...)`.
212    pub fn execute_paged(self) -> Result<(Response<E>, Option<Vec<u8>>), QueryError>
213    where
214        E: EntityValue,
215    {
216        self.page()?.execute()
217    }
218
219    // ------------------------------------------------------------------
220    // Execution terminals — semantic only
221    // ------------------------------------------------------------------
222
223    /// Execute and return whether the result set is empty.
224    pub fn is_empty(&self) -> Result<bool, QueryError>
225    where
226        E: EntityValue,
227    {
228        Ok(!self.exists()?)
229    }
230
231    /// Execute and return whether at least one matching row exists.
232    pub fn exists(&self) -> Result<bool, QueryError>
233    where
234        E: EntityValue,
235    {
236        self.ensure_non_paged_mode_ready()?;
237
238        self.session.execute_load_query_exists(self.query())
239    }
240
241    /// Execute and return the number of matching rows.
242    pub fn count(&self) -> Result<u32, QueryError>
243    where
244        E: EntityValue,
245    {
246        self.ensure_non_paged_mode_ready()?;
247
248        self.session.execute_load_query_count(self.query())
249    }
250
251    /// Execute and return the smallest matching identifier, if any.
252    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
253    where
254        E: EntityValue,
255    {
256        self.ensure_non_paged_mode_ready()?;
257
258        self.session.execute_load_query_min(self.query())
259    }
260
261    /// Execute and return the largest matching identifier, if any.
262    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
263    where
264        E: EntityValue,
265    {
266        self.ensure_non_paged_mode_ready()?;
267
268        self.session.execute_load_query_max(self.query())
269    }
270
271    /// Execute and return the first matching identifier in response order, if any.
272    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
273    where
274        E: EntityValue,
275    {
276        self.ensure_non_paged_mode_ready()?;
277
278        self.session.execute_load_query_first(self.query())
279    }
280
281    /// Execute and return the last matching identifier in response order, if any.
282    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
283    where
284        E: EntityValue,
285    {
286        self.ensure_non_paged_mode_ready()?;
287
288        self.session.execute_load_query_last(self.query())
289    }
290
291    /// Execute and require exactly one matching row.
292    pub fn require_one(&self) -> Result<(), QueryError>
293    where
294        E: EntityValue,
295    {
296        self.execute()?.require_one()?;
297        Ok(())
298    }
299
300    /// Execute and require at least one matching row.
301    pub fn require_some(&self) -> Result<(), QueryError>
302    where
303        E: EntityValue,
304    {
305        self.execute()?.require_some()?;
306        Ok(())
307    }
308}
309
310impl<E> FluentLoadQuery<'_, E>
311where
312    E: EntityKind,
313{
314    fn non_paged_intent_error(&self) -> Option<IntentError> {
315        self.cursor_token
316            .as_ref()
317            .map(|_| IntentError::CursorRequiresPagedExecution)
318    }
319
320    fn cursor_intent_error(&self) -> Option<IntentError> {
321        self.cursor_token
322            .as_ref()
323            .and_then(|_| self.paged_intent_error())
324    }
325
326    fn paged_intent_error(&self) -> Option<IntentError> {
327        let spec = self.query.load_spec()?;
328
329        policy::validate_cursor_paging_requirements(self.query.has_explicit_order(), spec)
330            .err()
331            .map(IntentError::from)
332    }
333
334    fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
335        if let Some(err) = self.paged_intent_error() {
336            return Err(QueryError::Intent(err));
337        }
338
339        Ok(())
340    }
341
342    fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
343        if let Some(err) = self.non_paged_intent_error() {
344            return Err(QueryError::Intent(err));
345        }
346
347        Ok(())
348    }
349}
350
351impl<E> FluentLoadQuery<'_, E>
352where
353    E: EntityKind + SingletonEntity,
354    E::Key: Default,
355{
356    #[must_use]
357    pub fn only(self) -> Self {
358        self.map_query(Query::only)
359    }
360}
361
362impl<E> PagedLoadQuery<'_, E>
363where
364    E: EntityKind,
365{
366    // ------------------------------------------------------------------
367    // Intent inspection
368    // ------------------------------------------------------------------
369
370    #[must_use]
371    pub const fn query(&self) -> &Query<E> {
372        self.inner.query()
373    }
374
375    // ------------------------------------------------------------------
376    // Cursor continuation
377    // ------------------------------------------------------------------
378
379    /// Attach an opaque continuation token for the next page.
380    #[must_use]
381    pub fn cursor(mut self, token: impl Into<String>) -> Self {
382        self.inner = self.inner.cursor(token);
383        self
384    }
385
386    // ------------------------------------------------------------------
387    // Execution
388    // ------------------------------------------------------------------
389
390    /// Execute in cursor-pagination mode and return items + next cursor.
391    ///
392    /// Continuation is best-effort and forward-only over live state:
393    /// deterministic per request under canonical ordering, with no
394    /// snapshot/version pinned across requests.
395    pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
396    where
397        E: EntityValue,
398    {
399        self.execute_with_trace()
400            .map(|(items, next_cursor, _)| (items, next_cursor))
401    }
402
403    /// Execute in cursor-pagination mode and return items, next cursor,
404    /// and optional execution trace details when session debug mode is enabled.
405    ///
406    /// Trace collection is opt-in via `DbSession::debug()` and does not
407    /// change query planning or result semantics.
408    pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
409    where
410        E: EntityValue,
411    {
412        self.inner.ensure_paged_mode_ready()?;
413
414        self.inner.session.execute_load_query_paged_with_trace(
415            self.inner.query(),
416            self.inner.cursor_token.as_deref(),
417        )
418    }
419}