// icydb_core/db/session/query.rs

1//! Module: db::session::query
2//! Responsibility: module-local ownership and contracts for db::session::query.
3//! Does not own: cross-module orchestration outside this module.
4//! Boundary: exposes this module API while keeping implementation details internal.
5
6use crate::{
7    db::{
8        DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
9        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
10        TraceExecutionStrategy,
11        access::AccessStrategy,
12        cursor::{CursorPlanError, GroupedContinuationToken},
13        diagnostics::ExecutionTrace,
14        executor::{
15            ExecutablePlan, ExecutionStrategy, GroupedCursorPage, LoadExecutor, PageCursor,
16        },
17        query::builder::aggregate::{
18            PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentScalarTerminalStrategy,
19        },
20        query::explain::{
21            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
22        },
23        query::intent::{CompiledQuery, PlannedQuery},
24        query::plan::QueryMode,
25        session::{decode_optional_cursor_bytes, decode_optional_grouped_cursor},
26    },
27    error::InternalError,
28    traits::{CanisterKind, EntityKind, EntityValue, Path},
29};
30
31#[cfg(test)]
32use crate::db::query::builder::aggregate::PreparedFluentNumericFieldStrategy;
33
34impl<C: CanisterKind> DbSession<C> {
35    // Compile one typed query using only the indexes currently visible for the
36    // query's recovered store.
37    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
38        &self,
39        query: &Query<E>,
40    ) -> Result<CompiledQuery<E>, QueryError>
41    where
42        E: EntityKind<Canister = C>,
43    {
44        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
45
46        query.plan_with_visible_indexes(&visible_indexes)
47    }
48
49    // Build one logical planned-query shell using only the indexes currently
50    // visible for the query's recovered store.
51    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
52        &self,
53        query: &Query<E>,
54    ) -> Result<PlannedQuery<E>, QueryError>
55    where
56        E: EntityKind<Canister = C>,
57    {
58        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
59
60        query.planned_with_visible_indexes(&visible_indexes)
61    }
62
63    // Project one logical explain payload using only planner-visible indexes.
64    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
65        &self,
66        query: &Query<E>,
67    ) -> Result<ExplainPlan, QueryError>
68    where
69        E: EntityKind<Canister = C>,
70    {
71        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
72
73        query.explain_with_visible_indexes(&visible_indexes)
74    }
75
76    // Hash one typed query plan using only the indexes currently visible for
77    // the query's recovered store.
78    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
79        &self,
80        query: &Query<E>,
81    ) -> Result<String, QueryError>
82    where
83        E: EntityKind<Canister = C>,
84    {
85        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
86
87        query.plan_hash_hex_with_visible_indexes(&visible_indexes)
88    }
89
90    // Explain one scalar load execution shape using only planner-visible
91    // indexes from the recovered store state.
92    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
93        &self,
94        query: &Query<E>,
95    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
96    where
97        E: EntityValue + EntityKind<Canister = C>,
98    {
99        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
100
101        query.explain_execution_with_visible_indexes(&visible_indexes)
102    }
103
104    // Render one scalar load execution descriptor as deterministic text using
105    // only planner-visible indexes from the recovered store state.
106    pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
107        &self,
108        query: &Query<E>,
109    ) -> Result<String, QueryError>
110    where
111        E: EntityValue + EntityKind<Canister = C>,
112    {
113        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
114
115        query.explain_execution_text_with_visible_indexes(&visible_indexes)
116    }
117
118    // Render one scalar load execution descriptor as canonical JSON using
119    // only planner-visible indexes from the recovered store state.
120    pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
121        &self,
122        query: &Query<E>,
123    ) -> Result<String, QueryError>
124    where
125        E: EntityValue + EntityKind<Canister = C>,
126    {
127        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
128
129        query.explain_execution_json_with_visible_indexes(&visible_indexes)
130    }
131
132    // Render one scalar load execution descriptor plus route diagnostics using
133    // only planner-visible indexes from the recovered store state.
134    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
135        &self,
136        query: &Query<E>,
137    ) -> Result<String, QueryError>
138    where
139        E: EntityValue + EntityKind<Canister = C>,
140    {
141        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
142
143        query.explain_execution_verbose_with_visible_indexes(&visible_indexes)
144    }
145
146    // Explain one prepared fluent scalar terminal using only planner-visible
147    // indexes from the recovered store state.
148    pub(in crate::db) fn explain_query_prepared_scalar_terminal_with_visible_indexes<E>(
149        &self,
150        query: &Query<E>,
151        strategy: &PreparedFluentScalarTerminalStrategy,
152    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
153    where
154        E: EntityValue + EntityKind<Canister = C>,
155    {
156        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
157
158        query.explain_prepared_scalar_terminal_with_visible_indexes(&visible_indexes, strategy)
159    }
160
161    // Explain one prepared fluent order-sensitive terminal using only
162    // planner-visible indexes from the recovered store state.
163    pub(in crate::db) fn explain_query_prepared_order_sensitive_terminal_with_visible_indexes<E>(
164        &self,
165        query: &Query<E>,
166        strategy: &PreparedFluentOrderSensitiveTerminalStrategy,
167    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
168    where
169        E: EntityValue + EntityKind<Canister = C>,
170    {
171        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
172
173        query.explain_prepared_order_sensitive_terminal_with_visible_indexes(
174            &visible_indexes,
175            strategy,
176        )
177    }
178
179    // Explain one prepared fluent numeric-field terminal using only
180    // planner-visible indexes from the recovered store state.
181    #[cfg(test)]
182    pub(in crate::db) fn explain_query_prepared_numeric_field_with_visible_indexes<E>(
183        &self,
184        query: &Query<E>,
185        strategy: &PreparedFluentNumericFieldStrategy,
186    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
187    where
188        E: EntityValue + EntityKind<Canister = C>,
189    {
190        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
191
192        query.explain_prepared_numeric_field_with_visible_indexes(&visible_indexes, strategy)
193    }
194
195    // Explain one `bytes_by(field)` terminal using only planner-visible
196    // indexes from the recovered store state.
197    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
198        &self,
199        query: &Query<E>,
200        target_field: &str,
201    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
202    where
203        E: EntityValue + EntityKind<Canister = C>,
204    {
205        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
206
207        query.explain_bytes_by_with_visible_indexes(&visible_indexes, target_field)
208    }
209
210    // Validate that one execution strategy is admissible for scalar paged load
211    // execution and fail closed on grouped/primary-key-only routes.
212    fn ensure_scalar_paged_execution_strategy(
213        strategy: ExecutionStrategy,
214    ) -> Result<(), QueryError> {
215        match strategy {
216            ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
217                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
218            )),
219            ExecutionStrategy::Ordered => Ok(()),
220            ExecutionStrategy::Grouped => Err(QueryError::invariant(
221                "grouped plans require execute_grouped(...)",
222            )),
223        }
224    }
225
226    // Validate that one execution strategy is admissible for the grouped
227    // execution surface.
228    fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
229        match strategy {
230            ExecutionStrategy::Grouped => Ok(()),
231            ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
232                QueryError::invariant("execute_grouped requires grouped logical plans"),
233            ),
234        }
235    }
236
237    /// Execute one scalar load/delete query and return materialized response rows.
238    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
239    where
240        E: PersistedRow<Canister = C> + EntityValue,
241    {
242        // Phase 1: compile typed intent into one executable plan contract.
243        let mode = query.mode();
244        let plan = self
245            .compile_query_with_visible_indexes(query)?
246            .into_executable();
247
248        // Phase 2: delegate execution to the shared compiled-plan entry path.
249        self.execute_query_dyn(mode, plan)
250    }
251
252    /// Execute one typed delete query and return only the affected-row count.
253    #[doc(hidden)]
254    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
255    where
256        E: PersistedRow<Canister = C> + EntityValue,
257    {
258        // Phase 1: fail closed if the caller routes a non-delete query here.
259        if !query.mode().is_delete() {
260            return Err(QueryError::unsupported_query(
261                "delete count execution requires delete query mode",
262            ));
263        }
264
265        // Phase 2: compile typed delete intent into one executable plan contract.
266        let plan = self
267            .compile_query_with_visible_indexes(query)?
268            .into_executable();
269
270        // Phase 3: execute the shared delete core while skipping response-row materialization.
271        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
272            .map_err(QueryError::execute)
273    }
274
275    /// Execute one scalar query from one pre-built executable contract.
276    ///
277    /// This is the shared compiled-plan entry boundary used by the typed
278    /// `execute_query(...)` surface and adjacent query execution facades.
279    pub(in crate::db) fn execute_query_dyn<E>(
280        &self,
281        mode: QueryMode,
282        plan: ExecutablePlan<E>,
283    ) -> Result<EntityResponse<E>, QueryError>
284    where
285        E: PersistedRow<Canister = C> + EntityValue,
286    {
287        let result = match mode {
288            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
289            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
290        };
291
292        result.map_err(QueryError::execute)
293    }
294
295    // Shared load-query terminal wrapper: build plan, run under metrics, map
296    // execution errors into query-facing errors.
297    pub(in crate::db) fn execute_load_query_with<E, T>(
298        &self,
299        query: &Query<E>,
300        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
301    ) -> Result<T, QueryError>
302    where
303        E: PersistedRow<Canister = C> + EntityValue,
304    {
305        let plan = self
306            .compile_query_with_visible_indexes(query)?
307            .into_executable();
308
309        self.with_metrics(|| op(self.load_executor::<E>(), plan))
310            .map_err(QueryError::execute)
311    }
312
313    /// Build one trace payload for a query without executing it.
314    ///
315    /// This lightweight surface is intended for developer diagnostics:
316    /// plan hash, access strategy summary, and planner/executor route shape.
317    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
318    where
319        E: EntityKind<Canister = C>,
320    {
321        let compiled = self.compile_query_with_visible_indexes(query)?;
322        let explain = compiled.explain();
323        let plan_hash = compiled.plan_hash_hex();
324
325        let executable = compiled.into_executable();
326        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
327        let execution_strategy = match query.mode() {
328            QueryMode::Load(_) => Some(trace_execution_strategy(
329                executable
330                    .execution_strategy()
331                    .map_err(QueryError::execute)?,
332            )),
333            QueryMode::Delete(_) => None,
334        };
335
336        Ok(QueryTracePlan::new(
337            plan_hash,
338            access_strategy,
339            execution_strategy,
340            explain,
341        ))
342    }
343
344    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
345    pub(crate) fn execute_load_query_paged_with_trace<E>(
346        &self,
347        query: &Query<E>,
348        cursor_token: Option<&str>,
349    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
350    where
351        E: PersistedRow<Canister = C> + EntityValue,
352    {
353        // Phase 1: build/validate executable plan and reject grouped plans.
354        let plan = self
355            .compile_query_with_visible_indexes(query)?
356            .into_executable();
357        Self::ensure_scalar_paged_execution_strategy(
358            plan.execution_strategy().map_err(QueryError::execute)?,
359        )?;
360
361        // Phase 2: decode external cursor token and validate it against plan surface.
362        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
363        let cursor = plan
364            .prepare_cursor(cursor_bytes.as_deref())
365            .map_err(QueryError::from_executor_plan_error)?;
366
367        // Phase 3: execute one traced page and encode outbound continuation token.
368        let (page, trace) = self
369            .with_metrics(|| {
370                self.load_executor::<E>()
371                    .execute_paged_with_cursor_traced(plan, cursor)
372            })
373            .map_err(QueryError::execute)?;
374        let next_cursor = page
375            .next_cursor
376            .map(|token| {
377                let Some(token) = token.as_scalar() else {
378                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
379                };
380
381                token.encode().map_err(|err| {
382                    QueryError::serialize_internal(format!(
383                        "failed to serialize continuation cursor: {err}"
384                    ))
385                })
386            })
387            .transpose()?;
388
389        Ok(PagedLoadExecutionWithTrace::new(
390            page.items,
391            next_cursor,
392            trace,
393        ))
394    }
395
396    /// Execute one grouped query page with optional grouped continuation cursor.
397    ///
398    /// This is the explicit grouped execution boundary; scalar load APIs reject
399    /// grouped plans to preserve scalar response contracts.
400    pub fn execute_grouped<E>(
401        &self,
402        query: &Query<E>,
403        cursor_token: Option<&str>,
404    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
405    where
406        E: PersistedRow<Canister = C> + EntityValue,
407    {
408        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
409        let next_cursor = page
410            .next_cursor
411            .map(|token| {
412                let Some(token) = token.as_grouped() else {
413                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
414                };
415
416                token.encode().map_err(|err| {
417                    QueryError::serialize_internal(format!(
418                        "failed to serialize grouped continuation cursor: {err}"
419                    ))
420                })
421            })
422            .transpose()?;
423
424        Ok(PagedGroupedExecutionWithTrace::new(
425            page.rows,
426            next_cursor,
427            trace,
428        ))
429    }
430
431    /// Execute one grouped query page and return grouped rows plus an already-encoded text cursor.
432    #[doc(hidden)]
433    pub fn execute_grouped_text_cursor<E>(
434        &self,
435        query: &Query<E>,
436        cursor_token: Option<&str>,
437    ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
438    where
439        E: PersistedRow<Canister = C> + EntityValue,
440    {
441        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
442        let next_cursor = page
443            .next_cursor
444            .map(Self::encode_grouped_page_cursor_hex)
445            .transpose()?;
446
447        Ok((page.rows, next_cursor, trace))
448    }
449}
450
451impl<C: CanisterKind> DbSession<C> {
452    // Execute the canonical grouped query core and return the raw grouped page
453    // plus optional execution trace before outward cursor formatting.
454    fn execute_grouped_page_with_trace<E>(
455        &self,
456        query: &Query<E>,
457        cursor_token: Option<&str>,
458    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
459    where
460        E: PersistedRow<Canister = C> + EntityValue,
461    {
462        // Phase 1: build/validate executable plan and require grouped shape.
463        let plan = self
464            .compile_query_with_visible_indexes(query)?
465            .into_executable();
466        Self::ensure_grouped_execution_strategy(
467            plan.execution_strategy().map_err(QueryError::execute)?,
468        )?;
469
470        // Phase 2: decode external grouped cursor token and validate against plan.
471        let cursor = decode_optional_grouped_cursor(cursor_token)?;
472        let cursor = plan
473            .prepare_grouped_cursor_token(cursor)
474            .map_err(QueryError::from_executor_plan_error)?;
475
476        // Phase 3: execute one grouped page while preserving the structural
477        // grouped cursor payload for whichever outward cursor format the caller needs.
478        self.with_metrics(|| {
479            self.load_executor::<E>()
480                .execute_grouped_paged_with_cursor_traced(plan, cursor)
481        })
482        .map_err(QueryError::execute)
483    }
484
485    // Encode one grouped page cursor directly to lowercase hex without
486    // round-tripping through a temporary raw cursor byte vector.
487    fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
488        let token: &GroupedContinuationToken = page_cursor
489            .as_grouped()
490            .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
491
492        token.encode_hex().map_err(|err| {
493            QueryError::serialize_internal(format!(
494                "failed to serialize grouped continuation cursor: {err}"
495            ))
496        })
497    }
498}
499
500const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
501    match strategy {
502        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
503        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
504        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
505    }
506}