Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: module-local ownership and contracts for db::session::query.
3//! Does not own: cross-module orchestration outside this module.
4//! Boundary: exposes this module API while keeping implementation details internal.
5
6use crate::{
7    db::{
8        DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
9        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
10        TraceExecutionStrategy,
11        access::AccessStrategy,
12        cursor::{CursorPlanError, GroupedContinuationToken},
13        diagnostics::ExecutionTrace,
14        executor::{
15            ExecutablePlan, ExecutionStrategy, GroupedCursorPage, LoadExecutor, PageCursor,
16        },
17        query::builder::aggregate::AggregateExpr,
18        query::explain::{
19            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
20        },
21        query::intent::{CompiledQuery, PlannedQuery},
22        query::plan::QueryMode,
23        session::{decode_optional_cursor_bytes, decode_optional_grouped_cursor},
24    },
25    error::InternalError,
26    traits::{CanisterKind, EntityKind, EntityValue, Path},
27};
28
29impl<C: CanisterKind> DbSession<C> {
30    // Compile one typed query using only the indexes currently visible for the
31    // query's recovered store.
32    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
33        &self,
34        query: &Query<E>,
35    ) -> Result<CompiledQuery<E>, QueryError>
36    where
37        E: EntityKind<Canister = C>,
38    {
39        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
40
41        query.plan_with_visible_indexes(&visible_indexes)
42    }
43
44    // Build one logical planned-query shell using only the indexes currently
45    // visible for the query's recovered store.
46    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
47        &self,
48        query: &Query<E>,
49    ) -> Result<PlannedQuery<E>, QueryError>
50    where
51        E: EntityKind<Canister = C>,
52    {
53        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
54
55        query.planned_with_visible_indexes(&visible_indexes)
56    }
57
58    // Project one logical explain payload using only planner-visible indexes.
59    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
60        &self,
61        query: &Query<E>,
62    ) -> Result<ExplainPlan, QueryError>
63    where
64        E: EntityKind<Canister = C>,
65    {
66        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
67
68        query.explain_with_visible_indexes(&visible_indexes)
69    }
70
71    // Hash one typed query plan using only the indexes currently visible for
72    // the query's recovered store.
73    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
74        &self,
75        query: &Query<E>,
76    ) -> Result<String, QueryError>
77    where
78        E: EntityKind<Canister = C>,
79    {
80        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
81
82        query.plan_hash_hex_with_visible_indexes(&visible_indexes)
83    }
84
85    // Explain one scalar load execution shape using only planner-visible
86    // indexes from the recovered store state.
87    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
88        &self,
89        query: &Query<E>,
90    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
91    where
92        E: EntityValue + EntityKind<Canister = C>,
93    {
94        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
95
96        query.explain_execution_with_visible_indexes(&visible_indexes)
97    }
98
99    // Render one scalar load execution descriptor as deterministic text using
100    // only planner-visible indexes from the recovered store state.
101    pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
102        &self,
103        query: &Query<E>,
104    ) -> Result<String, QueryError>
105    where
106        E: EntityValue + EntityKind<Canister = C>,
107    {
108        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
109
110        query.explain_execution_text_with_visible_indexes(&visible_indexes)
111    }
112
113    // Render one scalar load execution descriptor as canonical JSON using
114    // only planner-visible indexes from the recovered store state.
115    pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
116        &self,
117        query: &Query<E>,
118    ) -> Result<String, QueryError>
119    where
120        E: EntityValue + EntityKind<Canister = C>,
121    {
122        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
123
124        query.explain_execution_json_with_visible_indexes(&visible_indexes)
125    }
126
127    // Render one scalar load execution descriptor plus route diagnostics using
128    // only planner-visible indexes from the recovered store state.
129    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
130        &self,
131        query: &Query<E>,
132    ) -> Result<String, QueryError>
133    where
134        E: EntityValue + EntityKind<Canister = C>,
135    {
136        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
137
138        query.explain_execution_verbose_with_visible_indexes(&visible_indexes)
139    }
140
141    // Explain one scalar aggregate terminal using only planner-visible indexes
142    // from the recovered store state.
143    pub(in crate::db) fn explain_query_aggregate_terminal_with_visible_indexes<E>(
144        &self,
145        query: &Query<E>,
146        aggregate: AggregateExpr,
147    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
148    where
149        E: EntityValue + EntityKind<Canister = C>,
150    {
151        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
152
153        query.explain_aggregate_terminal_with_visible_indexes(&visible_indexes, aggregate)
154    }
155
156    // Explain one `bytes_by(field)` terminal using only planner-visible
157    // indexes from the recovered store state.
158    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
159        &self,
160        query: &Query<E>,
161        target_field: &str,
162    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
163    where
164        E: EntityValue + EntityKind<Canister = C>,
165    {
166        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
167
168        query.explain_bytes_by_with_visible_indexes(&visible_indexes, target_field)
169    }
170
171    // Validate that one execution strategy is admissible for scalar paged load
172    // execution and fail closed on grouped/primary-key-only routes.
173    fn ensure_scalar_paged_execution_strategy(
174        strategy: ExecutionStrategy,
175    ) -> Result<(), QueryError> {
176        match strategy {
177            ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
178                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
179            )),
180            ExecutionStrategy::Ordered => Ok(()),
181            ExecutionStrategy::Grouped => Err(QueryError::invariant(
182                "grouped plans require execute_grouped(...)",
183            )),
184        }
185    }
186
187    // Validate that one execution strategy is admissible for the grouped
188    // execution surface.
189    fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
190        match strategy {
191            ExecutionStrategy::Grouped => Ok(()),
192            ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
193                QueryError::invariant("execute_grouped requires grouped logical plans"),
194            ),
195        }
196    }
197
198    /// Execute one scalar load/delete query and return materialized response rows.
199    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
200    where
201        E: PersistedRow<Canister = C> + EntityValue,
202    {
203        // Phase 1: compile typed intent into one executable plan contract.
204        let mode = query.mode();
205        let plan = self
206            .compile_query_with_visible_indexes(query)?
207            .into_executable();
208
209        // Phase 2: delegate execution to the shared compiled-plan entry path.
210        self.execute_query_dyn(mode, plan)
211    }
212
213    /// Execute one typed delete query and return only the affected-row count.
214    #[doc(hidden)]
215    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
216    where
217        E: PersistedRow<Canister = C> + EntityValue,
218    {
219        // Phase 1: fail closed if the caller routes a non-delete query here.
220        if !query.mode().is_delete() {
221            return Err(QueryError::unsupported_query(
222                "delete count execution requires delete query mode",
223            ));
224        }
225
226        // Phase 2: compile typed delete intent into one executable plan contract.
227        let plan = self
228            .compile_query_with_visible_indexes(query)?
229            .into_executable();
230
231        // Phase 3: execute the shared delete core while skipping response-row materialization.
232        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
233            .map_err(QueryError::execute)
234    }
235
236    /// Execute one scalar query from one pre-built executable contract.
237    ///
238    /// This is the shared compiled-plan entry boundary used by the typed
239    /// `execute_query(...)` surface and adjacent query execution facades.
240    pub(in crate::db) fn execute_query_dyn<E>(
241        &self,
242        mode: QueryMode,
243        plan: ExecutablePlan<E>,
244    ) -> Result<EntityResponse<E>, QueryError>
245    where
246        E: PersistedRow<Canister = C> + EntityValue,
247    {
248        let result = match mode {
249            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
250            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
251        };
252
253        result.map_err(QueryError::execute)
254    }
255
256    // Shared load-query terminal wrapper: build plan, run under metrics, map
257    // execution errors into query-facing errors.
258    pub(in crate::db) fn execute_load_query_with<E, T>(
259        &self,
260        query: &Query<E>,
261        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
262    ) -> Result<T, QueryError>
263    where
264        E: PersistedRow<Canister = C> + EntityValue,
265    {
266        let plan = self
267            .compile_query_with_visible_indexes(query)?
268            .into_executable();
269
270        self.with_metrics(|| op(self.load_executor::<E>(), plan))
271            .map_err(QueryError::execute)
272    }
273
274    /// Build one trace payload for a query without executing it.
275    ///
276    /// This lightweight surface is intended for developer diagnostics:
277    /// plan hash, access strategy summary, and planner/executor route shape.
278    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
279    where
280        E: EntityKind<Canister = C>,
281    {
282        let compiled = self.compile_query_with_visible_indexes(query)?;
283        let explain = compiled.explain();
284        let plan_hash = compiled.plan_hash_hex();
285
286        let executable = compiled.into_executable();
287        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
288        let execution_strategy = match query.mode() {
289            QueryMode::Load(_) => Some(trace_execution_strategy(
290                executable
291                    .execution_strategy()
292                    .map_err(QueryError::execute)?,
293            )),
294            QueryMode::Delete(_) => None,
295        };
296
297        Ok(QueryTracePlan::new(
298            plan_hash,
299            access_strategy,
300            execution_strategy,
301            explain,
302        ))
303    }
304
305    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
306    pub(crate) fn execute_load_query_paged_with_trace<E>(
307        &self,
308        query: &Query<E>,
309        cursor_token: Option<&str>,
310    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
311    where
312        E: PersistedRow<Canister = C> + EntityValue,
313    {
314        // Phase 1: build/validate executable plan and reject grouped plans.
315        let plan = self
316            .compile_query_with_visible_indexes(query)?
317            .into_executable();
318        Self::ensure_scalar_paged_execution_strategy(
319            plan.execution_strategy().map_err(QueryError::execute)?,
320        )?;
321
322        // Phase 2: decode external cursor token and validate it against plan surface.
323        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
324        let cursor = plan
325            .prepare_cursor(cursor_bytes.as_deref())
326            .map_err(QueryError::from_executor_plan_error)?;
327
328        // Phase 3: execute one traced page and encode outbound continuation token.
329        let (page, trace) = self
330            .with_metrics(|| {
331                self.load_executor::<E>()
332                    .execute_paged_with_cursor_traced(plan, cursor)
333            })
334            .map_err(QueryError::execute)?;
335        let next_cursor = page
336            .next_cursor
337            .map(|token| {
338                let Some(token) = token.as_scalar() else {
339                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
340                };
341
342                token.encode().map_err(|err| {
343                    QueryError::serialize_internal(format!(
344                        "failed to serialize continuation cursor: {err}"
345                    ))
346                })
347            })
348            .transpose()?;
349
350        Ok(PagedLoadExecutionWithTrace::new(
351            page.items,
352            next_cursor,
353            trace,
354        ))
355    }
356
357    /// Execute one grouped query page with optional grouped continuation cursor.
358    ///
359    /// This is the explicit grouped execution boundary; scalar load APIs reject
360    /// grouped plans to preserve scalar response contracts.
361    pub fn execute_grouped<E>(
362        &self,
363        query: &Query<E>,
364        cursor_token: Option<&str>,
365    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
366    where
367        E: PersistedRow<Canister = C> + EntityValue,
368    {
369        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
370        let next_cursor = page
371            .next_cursor
372            .map(|token| {
373                let Some(token) = token.as_grouped() else {
374                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
375                };
376
377                token.encode().map_err(|err| {
378                    QueryError::serialize_internal(format!(
379                        "failed to serialize grouped continuation cursor: {err}"
380                    ))
381                })
382            })
383            .transpose()?;
384
385        Ok(PagedGroupedExecutionWithTrace::new(
386            page.rows,
387            next_cursor,
388            trace,
389        ))
390    }
391
392    /// Execute one grouped query page and return grouped rows plus an already-encoded text cursor.
393    #[doc(hidden)]
394    pub fn execute_grouped_text_cursor<E>(
395        &self,
396        query: &Query<E>,
397        cursor_token: Option<&str>,
398    ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
399    where
400        E: PersistedRow<Canister = C> + EntityValue,
401    {
402        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
403        let next_cursor = page
404            .next_cursor
405            .map(Self::encode_grouped_page_cursor_hex)
406            .transpose()?;
407
408        Ok((page.rows, next_cursor, trace))
409    }
410}
411
412impl<C: CanisterKind> DbSession<C> {
413    // Execute the canonical grouped query core and return the raw grouped page
414    // plus optional execution trace before outward cursor formatting.
415    fn execute_grouped_page_with_trace<E>(
416        &self,
417        query: &Query<E>,
418        cursor_token: Option<&str>,
419    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
420    where
421        E: PersistedRow<Canister = C> + EntityValue,
422    {
423        // Phase 1: build/validate executable plan and require grouped shape.
424        let plan = self
425            .compile_query_with_visible_indexes(query)?
426            .into_executable();
427        Self::ensure_grouped_execution_strategy(
428            plan.execution_strategy().map_err(QueryError::execute)?,
429        )?;
430
431        // Phase 2: decode external grouped cursor token and validate against plan.
432        let cursor = decode_optional_grouped_cursor(cursor_token)?;
433        let cursor = plan
434            .prepare_grouped_cursor_token(cursor)
435            .map_err(QueryError::from_executor_plan_error)?;
436
437        // Phase 3: execute one grouped page while preserving the structural
438        // grouped cursor payload for whichever outward cursor format the caller needs.
439        self.with_metrics(|| {
440            self.load_executor::<E>()
441                .execute_grouped_paged_with_cursor_traced(plan, cursor)
442        })
443        .map_err(QueryError::execute)
444    }
445
446    // Encode one grouped page cursor directly to lowercase hex without
447    // round-tripping through a temporary raw cursor byte vector.
448    fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
449        let token: &GroupedContinuationToken = page_cursor
450            .as_grouped()
451            .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
452
453        token.encode_hex().map_err(|err| {
454            QueryError::serialize_internal(format!(
455                "failed to serialize grouped continuation cursor: {err}"
456            ))
457        })
458    }
459}
460
461const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
462    match strategy {
463        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
464        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
465        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
466    }
467}