Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: module-local ownership and contracts for db::session::query.
3//! Does not own: cross-module orchestration outside this module.
4//! Boundary: exposes this module API while keeping implementation details internal.
5
6use crate::{
7    db::{
8        DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
9        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
10        TraceExecutionStrategy,
11        access::AccessStrategy,
12        cursor::{CursorPlanError, GroupedContinuationToken},
13        diagnostics::ExecutionTrace,
14        executor::{
15            ExecutablePlan, ExecutionStrategy, GroupedCursorPage, LoadExecutor, PageCursor,
16        },
17        query::builder::{
18            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
19        },
20        query::explain::{
21            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
22        },
23        query::intent::{CompiledQuery, PlannedQuery},
24        query::plan::QueryMode,
25        session::{decode_optional_cursor_bytes, decode_optional_grouped_cursor},
26    },
27    error::InternalError,
28    traits::{CanisterKind, EntityKind, EntityValue, Path},
29};
30
31impl<C: CanisterKind> DbSession<C> {
32    // Compile one typed query using only the indexes currently visible for the
33    // query's recovered store.
34    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
35        &self,
36        query: &Query<E>,
37    ) -> Result<CompiledQuery<E>, QueryError>
38    where
39        E: EntityKind<Canister = C>,
40    {
41        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
42
43        query.plan_with_visible_indexes(&visible_indexes)
44    }
45
46    // Build one logical planned-query shell using only the indexes currently
47    // visible for the query's recovered store.
48    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
49        &self,
50        query: &Query<E>,
51    ) -> Result<PlannedQuery<E>, QueryError>
52    where
53        E: EntityKind<Canister = C>,
54    {
55        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
56
57        query.planned_with_visible_indexes(&visible_indexes)
58    }
59
60    // Project one logical explain payload using only planner-visible indexes.
61    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
62        &self,
63        query: &Query<E>,
64    ) -> Result<ExplainPlan, QueryError>
65    where
66        E: EntityKind<Canister = C>,
67    {
68        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
69
70        query.explain_with_visible_indexes(&visible_indexes)
71    }
72
73    // Hash one typed query plan using only the indexes currently visible for
74    // the query's recovered store.
75    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
76        &self,
77        query: &Query<E>,
78    ) -> Result<String, QueryError>
79    where
80        E: EntityKind<Canister = C>,
81    {
82        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
83
84        query.plan_hash_hex_with_visible_indexes(&visible_indexes)
85    }
86
87    // Explain one load execution shape using only planner-visible
88    // indexes from the recovered store state.
89    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
90        &self,
91        query: &Query<E>,
92    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
93    where
94        E: EntityValue + EntityKind<Canister = C>,
95    {
96        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
97
98        query.explain_execution_with_visible_indexes(&visible_indexes)
99    }
100
101    // Render one load execution descriptor as deterministic text using
102    // only planner-visible indexes from the recovered store state.
103    pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
104        &self,
105        query: &Query<E>,
106    ) -> Result<String, QueryError>
107    where
108        E: EntityValue + EntityKind<Canister = C>,
109    {
110        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
111
112        query.explain_execution_text_with_visible_indexes(&visible_indexes)
113    }
114
115    // Render one load execution descriptor as canonical JSON using
116    // only planner-visible indexes from the recovered store state.
117    pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
118        &self,
119        query: &Query<E>,
120    ) -> Result<String, QueryError>
121    where
122        E: EntityValue + EntityKind<Canister = C>,
123    {
124        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
125
126        query.explain_execution_json_with_visible_indexes(&visible_indexes)
127    }
128
129    // Render one load execution descriptor plus route diagnostics using
130    // only planner-visible indexes from the recovered store state.
131    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
132        &self,
133        query: &Query<E>,
134    ) -> Result<String, QueryError>
135    where
136        E: EntityValue + EntityKind<Canister = C>,
137    {
138        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
139
140        query.explain_execution_verbose_with_visible_indexes(&visible_indexes)
141    }
142
143    // Explain one prepared fluent aggregate terminal using only
144    // planner-visible indexes from the recovered store state.
145    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
146        &self,
147        query: &Query<E>,
148        strategy: &S,
149    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
150    where
151        E: EntityValue + EntityKind<Canister = C>,
152        S: PreparedFluentAggregateExplainStrategy,
153    {
154        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
155
156        query.explain_prepared_aggregate_terminal_with_visible_indexes(&visible_indexes, strategy)
157    }
158
159    // Explain one `bytes_by(field)` terminal using only planner-visible
160    // indexes from the recovered store state.
161    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
162        &self,
163        query: &Query<E>,
164        target_field: &str,
165    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
166    where
167        E: EntityValue + EntityKind<Canister = C>,
168    {
169        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
170
171        query.explain_bytes_by_with_visible_indexes(&visible_indexes, target_field)
172    }
173
174    // Explain one prepared fluent projection terminal using only
175    // planner-visible indexes from the recovered store state.
176    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
177        &self,
178        query: &Query<E>,
179        strategy: &PreparedFluentProjectionStrategy,
180    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
181    where
182        E: EntityValue + EntityKind<Canister = C>,
183    {
184        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
185
186        query.explain_prepared_projection_terminal_with_visible_indexes(&visible_indexes, strategy)
187    }
188
189    // Validate that one execution strategy is admissible for scalar paged load
190    // execution and fail closed on grouped/primary-key-only routes.
191    fn ensure_scalar_paged_execution_strategy(
192        strategy: ExecutionStrategy,
193    ) -> Result<(), QueryError> {
194        match strategy {
195            ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
196                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
197            )),
198            ExecutionStrategy::Ordered => Ok(()),
199            ExecutionStrategy::Grouped => Err(QueryError::invariant(
200                "grouped plans require execute_grouped(...)",
201            )),
202        }
203    }
204
205    // Validate that one execution strategy is admissible for the grouped
206    // execution surface.
207    fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
208        match strategy {
209            ExecutionStrategy::Grouped => Ok(()),
210            ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
211                QueryError::invariant("execute_grouped requires grouped logical plans"),
212            ),
213        }
214    }
215
216    /// Execute one scalar load/delete query and return materialized response rows.
217    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
218    where
219        E: PersistedRow<Canister = C> + EntityValue,
220    {
221        // Phase 1: compile typed intent into one executable plan contract.
222        let mode = query.mode();
223        let plan = self
224            .compile_query_with_visible_indexes(query)?
225            .into_executable();
226
227        // Phase 2: delegate execution to the shared compiled-plan entry path.
228        self.execute_query_dyn(mode, plan)
229    }
230
231    /// Execute one typed delete query and return only the affected-row count.
232    #[doc(hidden)]
233    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
234    where
235        E: PersistedRow<Canister = C> + EntityValue,
236    {
237        // Phase 1: fail closed if the caller routes a non-delete query here.
238        if !query.mode().is_delete() {
239            return Err(QueryError::unsupported_query(
240                "delete count execution requires delete query mode",
241            ));
242        }
243
244        // Phase 2: compile typed delete intent into one executable plan contract.
245        let plan = self
246            .compile_query_with_visible_indexes(query)?
247            .into_executable();
248
249        // Phase 3: execute the shared delete core while skipping response-row materialization.
250        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
251            .map_err(QueryError::execute)
252    }
253
254    /// Execute one scalar query from one pre-built executable contract.
255    ///
256    /// This is the shared compiled-plan entry boundary used by the typed
257    /// `execute_query(...)` surface and adjacent query execution facades.
258    pub(in crate::db) fn execute_query_dyn<E>(
259        &self,
260        mode: QueryMode,
261        plan: ExecutablePlan<E>,
262    ) -> Result<EntityResponse<E>, QueryError>
263    where
264        E: PersistedRow<Canister = C> + EntityValue,
265    {
266        let result = match mode {
267            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
268            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
269        };
270
271        result.map_err(QueryError::execute)
272    }
273
274    // Shared load-query terminal wrapper: build plan, run under metrics, map
275    // execution errors into query-facing errors.
276    pub(in crate::db) fn execute_load_query_with<E, T>(
277        &self,
278        query: &Query<E>,
279        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
280    ) -> Result<T, QueryError>
281    where
282        E: PersistedRow<Canister = C> + EntityValue,
283    {
284        let plan = self
285            .compile_query_with_visible_indexes(query)?
286            .into_executable();
287
288        self.with_metrics(|| op(self.load_executor::<E>(), plan))
289            .map_err(QueryError::execute)
290    }
291
292    /// Build one trace payload for a query without executing it.
293    ///
294    /// This lightweight surface is intended for developer diagnostics:
295    /// plan hash, access strategy summary, and planner/executor route shape.
296    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
297    where
298        E: EntityKind<Canister = C>,
299    {
300        let compiled = self.compile_query_with_visible_indexes(query)?;
301        let explain = compiled.explain();
302        let plan_hash = compiled.plan_hash_hex();
303
304        let executable = compiled.into_executable();
305        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
306        let execution_strategy = match query.mode() {
307            QueryMode::Load(_) => Some(trace_execution_strategy(
308                executable
309                    .execution_strategy()
310                    .map_err(QueryError::execute)?,
311            )),
312            QueryMode::Delete(_) => None,
313        };
314
315        Ok(QueryTracePlan::new(
316            plan_hash,
317            access_strategy,
318            execution_strategy,
319            explain,
320        ))
321    }
322
323    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
324    pub(crate) fn execute_load_query_paged_with_trace<E>(
325        &self,
326        query: &Query<E>,
327        cursor_token: Option<&str>,
328    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
329    where
330        E: PersistedRow<Canister = C> + EntityValue,
331    {
332        // Phase 1: build/validate executable plan and reject grouped plans.
333        let plan = self
334            .compile_query_with_visible_indexes(query)?
335            .into_executable();
336        Self::ensure_scalar_paged_execution_strategy(
337            plan.execution_strategy().map_err(QueryError::execute)?,
338        )?;
339
340        // Phase 2: decode external cursor token and validate it against plan surface.
341        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
342        let cursor = plan
343            .prepare_cursor(cursor_bytes.as_deref())
344            .map_err(QueryError::from_executor_plan_error)?;
345
346        // Phase 3: execute one traced page and encode outbound continuation token.
347        let (page, trace) = self
348            .with_metrics(|| {
349                self.load_executor::<E>()
350                    .execute_paged_with_cursor_traced(plan, cursor)
351            })
352            .map_err(QueryError::execute)?;
353        let next_cursor = page
354            .next_cursor
355            .map(|token| {
356                let Some(token) = token.as_scalar() else {
357                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
358                };
359
360                token.encode().map_err(|err| {
361                    QueryError::serialize_internal(format!(
362                        "failed to serialize continuation cursor: {err}"
363                    ))
364                })
365            })
366            .transpose()?;
367
368        Ok(PagedLoadExecutionWithTrace::new(
369            page.items,
370            next_cursor,
371            trace,
372        ))
373    }
374
375    /// Execute one grouped query page with optional grouped continuation cursor.
376    ///
377    /// This is the explicit grouped execution boundary; scalar load APIs reject
378    /// grouped plans to preserve scalar response contracts.
379    pub fn execute_grouped<E>(
380        &self,
381        query: &Query<E>,
382        cursor_token: Option<&str>,
383    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
384    where
385        E: PersistedRow<Canister = C> + EntityValue,
386    {
387        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
388        let next_cursor = page
389            .next_cursor
390            .map(|token| {
391                let Some(token) = token.as_grouped() else {
392                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
393                };
394
395                token.encode().map_err(|err| {
396                    QueryError::serialize_internal(format!(
397                        "failed to serialize grouped continuation cursor: {err}"
398                    ))
399                })
400            })
401            .transpose()?;
402
403        Ok(PagedGroupedExecutionWithTrace::new(
404            page.rows,
405            next_cursor,
406            trace,
407        ))
408    }
409
410    /// Execute one grouped query page and return grouped rows plus an already-encoded text cursor.
411    #[doc(hidden)]
412    pub fn execute_grouped_text_cursor<E>(
413        &self,
414        query: &Query<E>,
415        cursor_token: Option<&str>,
416    ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
417    where
418        E: PersistedRow<Canister = C> + EntityValue,
419    {
420        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
421        let next_cursor = page
422            .next_cursor
423            .map(Self::encode_grouped_page_cursor_hex)
424            .transpose()?;
425
426        Ok((page.rows, next_cursor, trace))
427    }
428}
429
430impl<C: CanisterKind> DbSession<C> {
431    // Execute the canonical grouped query core and return the raw grouped page
432    // plus optional execution trace before outward cursor formatting.
433    fn execute_grouped_page_with_trace<E>(
434        &self,
435        query: &Query<E>,
436        cursor_token: Option<&str>,
437    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
438    where
439        E: PersistedRow<Canister = C> + EntityValue,
440    {
441        // Phase 1: build/validate executable plan and require grouped shape.
442        let plan = self
443            .compile_query_with_visible_indexes(query)?
444            .into_executable();
445        Self::ensure_grouped_execution_strategy(
446            plan.execution_strategy().map_err(QueryError::execute)?,
447        )?;
448
449        // Phase 2: decode external grouped cursor token and validate against plan.
450        let cursor = decode_optional_grouped_cursor(cursor_token)?;
451        let cursor = plan
452            .prepare_grouped_cursor_token(cursor)
453            .map_err(QueryError::from_executor_plan_error)?;
454
455        // Phase 3: execute one grouped page while preserving the structural
456        // grouped cursor payload for whichever outward cursor format the caller needs.
457        self.with_metrics(|| {
458            self.load_executor::<E>()
459                .execute_grouped_paged_with_cursor_traced(plan, cursor)
460        })
461        .map_err(QueryError::execute)
462    }
463
464    // Encode one grouped page cursor directly to lowercase hex without
465    // round-tripping through a temporary raw cursor byte vector.
466    fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
467        let token: &GroupedContinuationToken = page_cursor
468            .as_grouped()
469            .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
470
471        token.encode_hex().map_err(|err| {
472            QueryError::serialize_internal(format!(
473                "failed to serialize grouped continuation cursor: {err}"
474            ))
475        })
476    }
477}
478
479const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
480    match strategy {
481        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
482        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
483        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
484    }
485}