Skip to main content

icydb_core/db/query/fluent/
load.rs

1//! Module: query::fluent::load
2//! Responsibility: fluent load-query builder, pagination, and execution routing.
3//! Does not own: planner semantics or row-level predicate evaluation.
4//! Boundary: session API facade over query intent/planning/execution.
5
6use crate::{
7    db::{
8        DbSession, PagedGroupedExecutionWithTrace, PagedLoadExecution, PagedLoadExecutionWithTrace,
9        predicate::{CompareOp, Predicate},
10        query::{
11            builder::aggregate::AggregateExpr,
12            explain::ExplainPlan,
13            expr::{FilterExpr, SortExpr},
14            intent::{CompiledQuery, IntentError, PlannedQuery, Query, QueryError},
15            plan::{FieldSlot, validate_fluent_non_paged_mode, validate_fluent_paged_mode},
16        },
17        response::Response,
18    },
19    error::InternalError,
20    traits::{EntityKind, EntityValue, SingletonEntity},
21    types::{Decimal, Id},
22    value::Value,
23};
24
25type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
26
27///
28/// FluentLoadQuery
29///
30/// Session-bound load query wrapper.
31/// Owns intent construction and execution routing only.
32/// All result inspection and projection is performed on `Response<E>`.
33///
34
35pub struct FluentLoadQuery<'a, E>
36where
37    E: EntityKind,
38{
39    session: &'a DbSession<E::Canister>,
40    query: Query<E>,
41    cursor_token: Option<String>,
42}
43
44impl<'a, E> FluentLoadQuery<'a, E>
45where
46    E: EntityKind,
47{
48    pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
49        Self {
50            session,
51            query,
52            cursor_token: None,
53        }
54    }
55
56    // ------------------------------------------------------------------
57    // Intent inspection
58    // ------------------------------------------------------------------
59
60    #[must_use]
61    pub const fn query(&self) -> &Query<E> {
62        &self.query
63    }
64
65    fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
66        self.query = map(self.query);
67        self
68    }
69
70    fn try_map_query(
71        mut self,
72        map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
73    ) -> Result<Self, QueryError> {
74        self.query = map(self.query)?;
75        Ok(self)
76    }
77
78    // ------------------------------------------------------------------
79    // Intent builders (pure)
80    // ------------------------------------------------------------------
81
82    /// Set the access path to a single typed primary-key value.
83    ///
84    /// `Id<E>` is treated as a plain query input value here. It does not grant access.
85    #[must_use]
86    pub fn by_id(mut self, id: Id<E>) -> Self {
87        self.query = self.query.by_id(id.key());
88        self
89    }
90
91    /// Set the access path to multiple typed primary-key values.
92    ///
93    /// IDs are public and may come from untrusted input sources.
94    #[must_use]
95    pub fn by_ids<I>(mut self, ids: I) -> Self
96    where
97        I: IntoIterator<Item = Id<E>>,
98    {
99        self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
100        self
101    }
102
103    // ------------------------------------------------------------------
104    // Query Refinement
105    // ------------------------------------------------------------------
106
107    #[must_use]
108    pub fn filter(self, predicate: Predicate) -> Self {
109        self.map_query(|query| query.filter(predicate))
110    }
111
112    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
113        self.try_map_query(|query| query.filter_expr(expr))
114    }
115
116    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
117        self.try_map_query(|query| query.sort_expr(expr))
118    }
119
120    #[must_use]
121    pub fn order_by(self, field: impl AsRef<str>) -> Self {
122        self.map_query(|query| query.order_by(field))
123    }
124
125    #[must_use]
126    pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
127        self.map_query(|query| query.order_by_desc(field))
128    }
129
130    /// Add one grouped key field.
131    pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
132        let field = field.as_ref().to_owned();
133        self.try_map_query(|query| query.group_by(&field))
134    }
135
136    /// Add one aggregate terminal via composable aggregate expression.
137    #[must_use]
138    pub fn aggregate(self, aggregate: AggregateExpr) -> Self {
139        self.map_query(|query| query.aggregate(aggregate))
140    }
141
142    /// Override grouped hard limits for grouped execution budget enforcement.
143    #[must_use]
144    pub fn grouped_limits(self, max_groups: u64, max_group_bytes: u64) -> Self {
145        self.map_query(|query| query.grouped_limits(max_groups, max_group_bytes))
146    }
147
148    /// Add one grouped HAVING compare clause over one grouped key field.
149    pub fn having_group(
150        self,
151        field: impl AsRef<str>,
152        op: CompareOp,
153        value: Value,
154    ) -> Result<Self, QueryError> {
155        let field = field.as_ref().to_owned();
156        self.try_map_query(|query| query.having_group(&field, op, value))
157    }
158
159    /// Add one grouped HAVING compare clause over one grouped aggregate output.
160    pub fn having_aggregate(
161        self,
162        aggregate_index: usize,
163        op: CompareOp,
164        value: Value,
165    ) -> Result<Self, QueryError> {
166        self.try_map_query(|query| query.having_aggregate(aggregate_index, op, value))
167    }
168
169    /// Bound the number of returned rows.
170    ///
171    /// Scalar pagination requires explicit ordering; combine `limit` and/or
172    /// `offset` with `order_by(...)` or planning fails for scalar loads.
173    /// GROUP BY pagination uses canonical grouped-key order by default.
174    #[must_use]
175    pub fn limit(self, limit: u32) -> Self {
176        self.map_query(|query| query.limit(limit))
177    }
178
179    /// Skip a number of rows in the ordered result stream.
180    ///
181    /// Scalar pagination requires explicit ordering; combine `offset` and/or
182    /// `limit` with `order_by(...)` or planning fails for scalar loads.
183    /// GROUP BY pagination uses canonical grouped-key order by default.
184    #[must_use]
185    pub fn offset(self, offset: u32) -> Self {
186        self.map_query(|query| query.offset(offset))
187    }
188
189    /// Attach an opaque cursor token for continuation pagination.
190    ///
191    /// Cursor-mode invariants are checked before planning/execution:
192    /// - explicit `order_by(...)` is required
193    /// - explicit `limit(...)` is required
194    #[must_use]
195    pub fn cursor(mut self, token: impl Into<String>) -> Self {
196        self.cursor_token = Some(token.into());
197        self
198    }
199
200    // ------------------------------------------------------------------
201    // Planning / diagnostics
202    // ------------------------------------------------------------------
203
204    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
205        self.query.explain()
206    }
207
208    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
209        if let Some(err) = self.cursor_intent_error() {
210            return Err(QueryError::Intent(err));
211        }
212
213        self.query.planned()
214    }
215
216    pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
217        if let Some(err) = self.cursor_intent_error() {
218            return Err(QueryError::Intent(err));
219        }
220
221        self.query.plan()
222    }
223
224    // ------------------------------------------------------------------
225    // Execution (single semantic boundary)
226    // ------------------------------------------------------------------
227
228    /// Execute this query using the session's policy settings.
229    pub fn execute(&self) -> Result<Response<E>, QueryError>
230    where
231        E: EntityValue,
232    {
233        self.ensure_non_paged_mode_ready()?;
234
235        self.session.execute_query(self.query())
236    }
237
238    /// Enter typed cursor-pagination mode for this query.
239    ///
240    /// Cursor pagination requires:
241    /// - explicit `order_by(...)`
242    /// - explicit `limit(...)`
243    ///
244    /// Requests are deterministic under canonical ordering, but continuation is
245    /// best-effort and forward-only over live state.
246    /// No snapshot/version is pinned across requests, so concurrent writes may
247    /// shift page boundaries.
248    pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
249        self.ensure_paged_mode_ready()?;
250
251        Ok(PagedLoadQuery { inner: self })
252    }
253
254    /// Execute this query as cursor pagination and return items + next cursor.
255    ///
256    /// The returned cursor token is opaque and must be passed back via `.cursor(...)`.
257    pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
258    where
259        E: EntityValue,
260    {
261        self.page()?.execute()
262    }
263
264    /// Execute one grouped query page with optional grouped continuation cursor.
265    ///
266    /// This grouped entrypoint is intentionally separate from scalar load
267    /// execution to keep grouped response shape explicit.
268    pub fn execute_grouped(self) -> Result<PagedGroupedExecutionWithTrace, QueryError>
269    where
270        E: EntityValue,
271    {
272        self.session
273            .execute_grouped(self.query(), self.cursor_token.as_deref())
274    }
275
276    // ------------------------------------------------------------------
277    // Execution terminals — semantic only
278    // ------------------------------------------------------------------
279
280    /// Execute and return whether the result set is empty.
281    pub fn is_empty(&self) -> Result<bool, QueryError>
282    where
283        E: EntityValue,
284    {
285        Ok(!self.exists()?)
286    }
287
288    /// Execute and return whether at least one matching row exists.
289    pub fn exists(&self) -> Result<bool, QueryError>
290    where
291        E: EntityValue,
292    {
293        self.ensure_non_paged_mode_ready()?;
294
295        self.session
296            .execute_load_query_with(self.query(), |load, plan| load.aggregate_exists(plan))
297    }
298
299    /// Execute and return the number of matching rows.
300    pub fn count(&self) -> Result<u32, QueryError>
301    where
302        E: EntityValue,
303    {
304        self.ensure_non_paged_mode_ready()?;
305
306        self.session
307            .execute_load_query_with(self.query(), |load, plan| load.aggregate_count(plan))
308    }
309
310    /// Execute and return the smallest matching identifier, if any.
311    pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
312    where
313        E: EntityValue,
314    {
315        self.ensure_non_paged_mode_ready()?;
316
317        self.session
318            .execute_load_query_with(self.query(), |load, plan| load.aggregate_min(plan))
319    }
320
321    /// Execute and return the id of the row with the smallest value for `field`.
322    ///
323    /// Ties are deterministic: equal field values resolve by primary key ascending.
324    pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
325    where
326        E: EntityValue,
327    {
328        self.ensure_non_paged_mode_ready()?;
329        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
330
331        self.session
332            .execute_load_query_with(self.query(), move |load, plan| {
333                load.aggregate_min_by_slot(plan, target_slot)
334            })
335    }
336
337    /// Execute and return the largest matching identifier, if any.
338    pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
339    where
340        E: EntityValue,
341    {
342        self.ensure_non_paged_mode_ready()?;
343
344        self.session
345            .execute_load_query_with(self.query(), |load, plan| load.aggregate_max(plan))
346    }
347
348    /// Execute and return the id of the row with the largest value for `field`.
349    ///
350    /// Ties are deterministic: equal field values resolve by primary key ascending.
351    pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
352    where
353        E: EntityValue,
354    {
355        self.ensure_non_paged_mode_ready()?;
356        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
357
358        self.session
359            .execute_load_query_with(self.query(), move |load, plan| {
360                load.aggregate_max_by_slot(plan, target_slot)
361            })
362    }
363
364    /// Execute and return the id at zero-based ordinal `nth` when rows are
365    /// ordered by `field` ascending, with primary-key ascending tie-breaks.
366    pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
367    where
368        E: EntityValue,
369    {
370        self.ensure_non_paged_mode_ready()?;
371        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
372
373        self.session
374            .execute_load_query_with(self.query(), move |load, plan| {
375                load.aggregate_nth_by_slot(plan, target_slot, nth)
376            })
377    }
378
379    /// Execute and return the sum of `field` over matching rows.
380    pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
381    where
382        E: EntityValue,
383    {
384        self.ensure_non_paged_mode_ready()?;
385        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
386
387        self.session
388            .execute_load_query_with(self.query(), move |load, plan| {
389                load.aggregate_sum_by_slot(plan, target_slot)
390            })
391    }
392
393    /// Execute and return the sum of distinct `field` values.
394    pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
395    where
396        E: EntityValue,
397    {
398        self.ensure_non_paged_mode_ready()?;
399        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
400
401        self.session
402            .execute_load_query_with(self.query(), move |load, plan| {
403                load.aggregate_sum_distinct_by_slot(plan, target_slot)
404            })
405    }
406
407    /// Execute and return the average of `field` over matching rows.
408    pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
409    where
410        E: EntityValue,
411    {
412        self.ensure_non_paged_mode_ready()?;
413        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
414
415        self.session
416            .execute_load_query_with(self.query(), move |load, plan| {
417                load.aggregate_avg_by_slot(plan, target_slot)
418            })
419    }
420
421    /// Execute and return the median id by `field` using deterministic ordering
422    /// `(field asc, primary key asc)`.
423    ///
424    /// Even-length windows select the lower median.
425    pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
426    where
427        E: EntityValue,
428    {
429        self.ensure_non_paged_mode_ready()?;
430        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
431
432        self.session
433            .execute_load_query_with(self.query(), move |load, plan| {
434                load.aggregate_median_by_slot(plan, target_slot)
435            })
436    }
437
438    /// Execute and return the number of distinct values for `field` over the
439    /// effective result window.
440    pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
441    where
442        E: EntityValue,
443    {
444        self.ensure_non_paged_mode_ready()?;
445        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
446
447        self.session
448            .execute_load_query_with(self.query(), move |load, plan| {
449                load.aggregate_count_distinct_by_slot(plan, target_slot)
450            })
451    }
452
453    /// Execute and return both `(min_by(field), max_by(field))` in one terminal.
454    ///
455    /// Tie handling is deterministic for both extrema: primary key ascending.
456    pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
457    where
458        E: EntityValue,
459    {
460        self.ensure_non_paged_mode_ready()?;
461        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
462
463        self.session
464            .execute_load_query_with(self.query(), move |load, plan| {
465                load.aggregate_min_max_by_slot(plan, target_slot)
466            })
467    }
468
469    /// Execute and return projected field values for the effective result window.
470    pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
471    where
472        E: EntityValue,
473    {
474        self.ensure_non_paged_mode_ready()?;
475        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
476
477        self.session
478            .execute_load_query_with(self.query(), move |load, plan| {
479                load.values_by_slot(plan, target_slot)
480            })
481    }
482
483    /// Execute and return the first `k` rows from the effective response window.
484    pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
485    where
486        E: EntityValue,
487    {
488        self.ensure_non_paged_mode_ready()?;
489
490        self.session
491            .execute_load_query_with(self.query(), |load, plan| load.take(plan, take_count))
492    }
493
494    /// Execute and return the top `k` rows by `field` under deterministic
495    /// ordering `(field desc, primary_key asc)` over the effective response
496    /// window.
497    ///
498    /// This terminal applies its own ordering and does not preserve query
499    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
500    /// matches `max_by(field)` selection semantics.
501    pub fn top_k_by(
502        &self,
503        field: impl AsRef<str>,
504        take_count: u32,
505    ) -> Result<Response<E>, QueryError>
506    where
507        E: EntityValue,
508    {
509        self.ensure_non_paged_mode_ready()?;
510        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
511
512        self.session
513            .execute_load_query_with(self.query(), move |load, plan| {
514                load.top_k_by_slot(plan, target_slot, take_count)
515            })
516    }
517
518    /// Execute and return the bottom `k` rows by `field` under deterministic
519    /// ordering `(field asc, primary_key asc)` over the effective response
520    /// window.
521    ///
522    /// This terminal applies its own ordering and does not preserve query
523    /// `order_by(...)` row order in the returned rows. For `k = 1`, this
524    /// matches `min_by(field)` selection semantics.
525    pub fn bottom_k_by(
526        &self,
527        field: impl AsRef<str>,
528        take_count: u32,
529    ) -> Result<Response<E>, QueryError>
530    where
531        E: EntityValue,
532    {
533        self.ensure_non_paged_mode_ready()?;
534        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
535
536        self.session
537            .execute_load_query_with(self.query(), move |load, plan| {
538                load.bottom_k_by_slot(plan, target_slot, take_count)
539            })
540    }
541
542    /// Execute and return projected values for the top `k` rows by `field`
543    /// under deterministic ordering `(field desc, primary_key asc)` over the
544    /// effective response window.
545    ///
546    /// Ranking is applied before projection and does not preserve query
547    /// `order_by(...)` row order in the returned values. For `k = 1`, this
548    /// matches `max_by(field)` projected to one value.
549    pub fn top_k_by_values(
550        &self,
551        field: impl AsRef<str>,
552        take_count: u32,
553    ) -> Result<Vec<Value>, QueryError>
554    where
555        E: EntityValue,
556    {
557        self.ensure_non_paged_mode_ready()?;
558        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
559
560        self.session
561            .execute_load_query_with(self.query(), move |load, plan| {
562                load.top_k_by_values_slot(plan, target_slot, take_count)
563            })
564    }
565
566    /// Execute and return projected values for the bottom `k` rows by `field`
567    /// under deterministic ordering `(field asc, primary_key asc)` over the
568    /// effective response window.
569    ///
570    /// Ranking is applied before projection and does not preserve query
571    /// `order_by(...)` row order in the returned values. For `k = 1`, this
572    /// matches `min_by(field)` projected to one value.
573    pub fn bottom_k_by_values(
574        &self,
575        field: impl AsRef<str>,
576        take_count: u32,
577    ) -> Result<Vec<Value>, QueryError>
578    where
579        E: EntityValue,
580    {
581        self.ensure_non_paged_mode_ready()?;
582        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
583
584        self.session
585            .execute_load_query_with(self.query(), move |load, plan| {
586                load.bottom_k_by_values_slot(plan, target_slot, take_count)
587            })
588    }
589
590    /// Execute and return projected id/value pairs for the top `k` rows by
591    /// `field` under deterministic ordering `(field desc, primary_key asc)`
592    /// over the effective response window.
593    ///
594    /// Ranking is applied before projection and does not preserve query
595    /// `order_by(...)` row order in the returned values. For `k = 1`, this
596    /// matches `max_by(field)` projected to one `(id, value)` pair.
597    pub fn top_k_by_with_ids(
598        &self,
599        field: impl AsRef<str>,
600        take_count: u32,
601    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
602    where
603        E: EntityValue,
604    {
605        self.ensure_non_paged_mode_ready()?;
606        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
607
608        self.session
609            .execute_load_query_with(self.query(), move |load, plan| {
610                load.top_k_by_with_ids_slot(plan, target_slot, take_count)
611            })
612    }
613
614    /// Execute and return projected id/value pairs for the bottom `k` rows by
615    /// `field` under deterministic ordering `(field asc, primary_key asc)`
616    /// over the effective response window.
617    ///
618    /// Ranking is applied before projection and does not preserve query
619    /// `order_by(...)` row order in the returned values. For `k = 1`, this
620    /// matches `min_by(field)` projected to one `(id, value)` pair.
621    pub fn bottom_k_by_with_ids(
622        &self,
623        field: impl AsRef<str>,
624        take_count: u32,
625    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
626    where
627        E: EntityValue,
628    {
629        self.ensure_non_paged_mode_ready()?;
630        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
631
632        self.session
633            .execute_load_query_with(self.query(), move |load, plan| {
634                load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
635            })
636    }
637
638    /// Execute and return distinct projected field values for the effective
639    /// result window, preserving first-observed value order.
640    pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
641    where
642        E: EntityValue,
643    {
644        self.ensure_non_paged_mode_ready()?;
645        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
646
647        self.session
648            .execute_load_query_with(self.query(), move |load, plan| {
649                load.distinct_values_by_slot(plan, target_slot)
650            })
651    }
652
653    /// Execute and return projected field values paired with row ids for the
654    /// effective result window.
655    pub fn values_by_with_ids(
656        &self,
657        field: impl AsRef<str>,
658    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
659    where
660        E: EntityValue,
661    {
662        self.ensure_non_paged_mode_ready()?;
663        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
664
665        self.session
666            .execute_load_query_with(self.query(), move |load, plan| {
667                load.values_by_with_ids_slot(plan, target_slot)
668            })
669    }
670
671    /// Execute and return the first projected field value in effective response
672    /// order, if any.
673    pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
674    where
675        E: EntityValue,
676    {
677        self.ensure_non_paged_mode_ready()?;
678        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
679
680        self.session
681            .execute_load_query_with(self.query(), move |load, plan| {
682                load.first_value_by_slot(plan, target_slot)
683            })
684    }
685
686    /// Execute and return the last projected field value in effective response
687    /// order, if any.
688    pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
689    where
690        E: EntityValue,
691    {
692        self.ensure_non_paged_mode_ready()?;
693        let target_slot = Self::resolve_terminal_field_slot(field.as_ref())?;
694
695        self.session
696            .execute_load_query_with(self.query(), move |load, plan| {
697                load.last_value_by_slot(plan, target_slot)
698            })
699    }
700
701    /// Execute and return the first matching identifier in response order, if any.
702    pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
703    where
704        E: EntityValue,
705    {
706        self.ensure_non_paged_mode_ready()?;
707
708        self.session
709            .execute_load_query_with(self.query(), |load, plan| load.aggregate_first(plan))
710    }
711
712    /// Execute and return the last matching identifier in response order, if any.
713    pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
714    where
715        E: EntityValue,
716    {
717        self.ensure_non_paged_mode_ready()?;
718
719        self.session
720            .execute_load_query_with(self.query(), |load, plan| load.aggregate_last(plan))
721    }
722
723    /// Execute and require exactly one matching row.
724    pub fn require_one(&self) -> Result<(), QueryError>
725    where
726        E: EntityValue,
727    {
728        self.execute()?.require_one()?;
729        Ok(())
730    }
731
732    /// Execute and require at least one matching row.
733    pub fn require_some(&self) -> Result<(), QueryError>
734    where
735        E: EntityValue,
736    {
737        self.execute()?.require_some()?;
738        Ok(())
739    }
740}
741
742impl<E> FluentLoadQuery<'_, E>
743where
744    E: EntityKind,
745{
746    // Resolve one terminal field target through the planner field-slot boundary.
747    // Unknown fields are rejected here so fluent terminal routing cannot bypass
748    // planner slot resolution and drift back to runtime string lookups.
749    fn resolve_terminal_field_slot(field: &str) -> Result<FieldSlot, QueryError> {
750        FieldSlot::resolve(E::MODEL, field).ok_or_else(|| {
751            QueryError::execute(InternalError::executor_unsupported(format!(
752                "unknown aggregate target field: {field}",
753            )))
754        })
755    }
756
757    fn non_paged_intent_error(&self) -> Option<IntentError> {
758        validate_fluent_non_paged_mode(self.cursor_token.is_some(), self.query.has_grouping())
759            .err()
760            .map(IntentError::from)
761    }
762
763    fn cursor_intent_error(&self) -> Option<IntentError> {
764        self.cursor_token
765            .as_ref()
766            .and_then(|_| self.paged_intent_error())
767    }
768
769    fn paged_intent_error(&self) -> Option<IntentError> {
770        validate_fluent_paged_mode(
771            self.query.has_grouping(),
772            self.query.has_explicit_order(),
773            self.query.load_spec(),
774        )
775        .err()
776        .map(IntentError::from)
777    }
778
779    fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
780        if let Some(err) = self.paged_intent_error() {
781            return Err(QueryError::Intent(err));
782        }
783
784        Ok(())
785    }
786
787    fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
788        if let Some(err) = self.non_paged_intent_error() {
789            return Err(QueryError::Intent(err));
790        }
791
792        Ok(())
793    }
794}
795
796impl<E> FluentLoadQuery<'_, E>
797where
798    E: EntityKind + SingletonEntity,
799    E::Key: Default,
800{
801    #[must_use]
802    pub fn only(self) -> Self {
803        self.map_query(Query::only)
804    }
805}
806
807///
808/// PagedLoadQuery
809///
810/// Session-bound cursor pagination wrapper.
811/// This wrapper only exposes cursor continuation and paged execution.
812///
813
814pub struct PagedLoadQuery<'a, E>
815where
816    E: EntityKind,
817{
818    inner: FluentLoadQuery<'a, E>,
819}
820
821impl<E> PagedLoadQuery<'_, E>
822where
823    E: EntityKind,
824{
825    // ------------------------------------------------------------------
826    // Intent inspection
827    // ------------------------------------------------------------------
828
829    #[must_use]
830    pub const fn query(&self) -> &Query<E> {
831        self.inner.query()
832    }
833
834    // ------------------------------------------------------------------
835    // Cursor continuation
836    // ------------------------------------------------------------------
837
838    /// Attach an opaque continuation token for the next page.
839    #[must_use]
840    pub fn cursor(mut self, token: impl Into<String>) -> Self {
841        self.inner = self.inner.cursor(token);
842        self
843    }
844
845    // ------------------------------------------------------------------
846    // Execution
847    // ------------------------------------------------------------------
848
849    /// Execute in cursor-pagination mode and return items + next cursor.
850    ///
851    /// Continuation is best-effort and forward-only over live state:
852    /// deterministic per request under canonical ordering, with no
853    /// snapshot/version pinned across requests.
854    pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
855    where
856        E: EntityValue,
857    {
858        self.execute_with_trace()
859            .map(PagedLoadExecutionWithTrace::into_execution)
860    }
861
862    /// Execute in cursor-pagination mode and return items, next cursor,
863    /// and optional execution trace details when session debug mode is enabled.
864    ///
865    /// Trace collection is opt-in via `DbSession::debug()` and does not
866    /// change query planning or result semantics.
867    pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
868    where
869        E: EntityValue,
870    {
871        self.inner.ensure_paged_mode_ready()?;
872
873        self.inner.session.execute_load_query_paged_with_trace(
874            self.inner.query(),
875            self.inner.cursor_token.as_deref(),
876        )
877    }
878}