Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: module-local ownership and contracts for db::session::query.
3//! Does not own: cross-module orchestration outside this module.
4//! Boundary: exposes this module API while keeping implementation details internal.
5
6use crate::{
7    db::{
8        DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
9        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
10        access::AccessStrategy,
11        cursor::{
12            CursorPlanError, GroupedContinuationToken, decode_optional_cursor_token,
13            decode_optional_grouped_cursor_token,
14        },
15        diagnostics::ExecutionTrace,
16        executor::{
17            ExecutablePlan, ExecutionStrategy, GroupedCursorPage, LoadExecutor, PageCursor,
18        },
19        query::builder::{
20            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
21        },
22        query::explain::{
23            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
24        },
25        query::intent::{CompiledQuery, PlannedQuery},
26        query::plan::QueryMode,
27    },
28    error::InternalError,
29    traits::{CanisterKind, EntityKind, EntityValue, Path},
30};
31
32impl<C: CanisterKind> DbSession<C> {
33    // Compile one typed query using only the indexes currently visible for the
34    // query's recovered store.
35    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
36        &self,
37        query: &Query<E>,
38    ) -> Result<CompiledQuery<E>, QueryError>
39    where
40        E: EntityKind<Canister = C>,
41    {
42        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
43
44        query.plan_with_visible_indexes(&visible_indexes)
45    }
46
47    // Build one logical planned-query shell using only the indexes currently
48    // visible for the query's recovered store.
49    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
50        &self,
51        query: &Query<E>,
52    ) -> Result<PlannedQuery<E>, QueryError>
53    where
54        E: EntityKind<Canister = C>,
55    {
56        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
57
58        query.planned_with_visible_indexes(&visible_indexes)
59    }
60
61    // Project one logical explain payload using only planner-visible indexes.
62    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
63        &self,
64        query: &Query<E>,
65    ) -> Result<ExplainPlan, QueryError>
66    where
67        E: EntityKind<Canister = C>,
68    {
69        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
70
71        query.explain_with_visible_indexes(&visible_indexes)
72    }
73
74    // Hash one typed query plan using only the indexes currently visible for
75    // the query's recovered store.
76    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
77        &self,
78        query: &Query<E>,
79    ) -> Result<String, QueryError>
80    where
81        E: EntityKind<Canister = C>,
82    {
83        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
84
85        query.plan_hash_hex_with_visible_indexes(&visible_indexes)
86    }
87
88    // Explain one load execution shape using only planner-visible
89    // indexes from the recovered store state.
90    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
91        &self,
92        query: &Query<E>,
93    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
94    where
95        E: EntityValue + EntityKind<Canister = C>,
96    {
97        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
98
99        query.explain_execution_with_visible_indexes(&visible_indexes)
100    }
101
102    // Render one load execution descriptor as deterministic text using
103    // only planner-visible indexes from the recovered store state.
104    pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
105        &self,
106        query: &Query<E>,
107    ) -> Result<String, QueryError>
108    where
109        E: EntityValue + EntityKind<Canister = C>,
110    {
111        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
112
113        query.explain_execution_text_with_visible_indexes(&visible_indexes)
114    }
115
116    // Render one load execution descriptor as canonical JSON using
117    // only planner-visible indexes from the recovered store state.
118    pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
119        &self,
120        query: &Query<E>,
121    ) -> Result<String, QueryError>
122    where
123        E: EntityValue + EntityKind<Canister = C>,
124    {
125        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
126
127        query.explain_execution_json_with_visible_indexes(&visible_indexes)
128    }
129
130    // Render one load execution descriptor plus route diagnostics using
131    // only planner-visible indexes from the recovered store state.
132    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
133        &self,
134        query: &Query<E>,
135    ) -> Result<String, QueryError>
136    where
137        E: EntityValue + EntityKind<Canister = C>,
138    {
139        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
140
141        query.explain_execution_verbose_with_visible_indexes(&visible_indexes)
142    }
143
144    // Explain one prepared fluent aggregate terminal using only
145    // planner-visible indexes from the recovered store state.
146    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
147        &self,
148        query: &Query<E>,
149        strategy: &S,
150    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
151    where
152        E: EntityValue + EntityKind<Canister = C>,
153        S: PreparedFluentAggregateExplainStrategy,
154    {
155        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
156
157        query.explain_prepared_aggregate_terminal_with_visible_indexes(&visible_indexes, strategy)
158    }
159
160    // Explain one `bytes_by(field)` terminal using only planner-visible
161    // indexes from the recovered store state.
162    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
163        &self,
164        query: &Query<E>,
165        target_field: &str,
166    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
167    where
168        E: EntityValue + EntityKind<Canister = C>,
169    {
170        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
171
172        query.explain_bytes_by_with_visible_indexes(&visible_indexes, target_field)
173    }
174
175    // Explain one prepared fluent projection terminal using only
176    // planner-visible indexes from the recovered store state.
177    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
178        &self,
179        query: &Query<E>,
180        strategy: &PreparedFluentProjectionStrategy,
181    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
182    where
183        E: EntityValue + EntityKind<Canister = C>,
184    {
185        let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
186
187        query.explain_prepared_projection_terminal_with_visible_indexes(&visible_indexes, strategy)
188    }
189
190    // Validate that one execution strategy is admissible for scalar paged load
191    // execution and fail closed on grouped/primary-key-only routes.
192    fn ensure_scalar_paged_execution_strategy(
193        strategy: ExecutionStrategy,
194    ) -> Result<(), QueryError> {
195        match strategy {
196            ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
197                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
198            )),
199            ExecutionStrategy::Ordered => Ok(()),
200            ExecutionStrategy::Grouped => Err(QueryError::invariant(
201                "grouped plans require execute_grouped(...)",
202            )),
203        }
204    }
205
206    // Validate that one execution strategy is admissible for the grouped
207    // execution surface.
208    fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
209        match strategy {
210            ExecutionStrategy::Grouped => Ok(()),
211            ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
212                QueryError::invariant("execute_grouped requires grouped logical plans"),
213            ),
214        }
215    }
216
217    /// Execute one scalar load/delete query and return materialized response rows.
218    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
219    where
220        E: PersistedRow<Canister = C> + EntityValue,
221    {
222        // Phase 1: compile typed intent into one executable plan contract.
223        let mode = query.mode();
224        let plan = self
225            .compile_query_with_visible_indexes(query)?
226            .into_executable();
227
228        // Phase 2: delegate execution to the shared compiled-plan entry path.
229        self.execute_query_dyn(mode, plan)
230    }
231
232    /// Execute one typed delete query and return only the affected-row count.
233    #[doc(hidden)]
234    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
235    where
236        E: PersistedRow<Canister = C> + EntityValue,
237    {
238        // Phase 1: fail closed if the caller routes a non-delete query here.
239        if !query.mode().is_delete() {
240            return Err(QueryError::unsupported_query(
241                "delete count execution requires delete query mode",
242            ));
243        }
244
245        // Phase 2: compile typed delete intent into one executable plan contract.
246        let plan = self
247            .compile_query_with_visible_indexes(query)?
248            .into_executable();
249
250        // Phase 3: execute the shared delete core while skipping response-row materialization.
251        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
252            .map_err(QueryError::execute)
253    }
254
255    /// Execute one scalar query from one pre-built executable contract.
256    ///
257    /// This is the shared compiled-plan entry boundary used by the typed
258    /// `execute_query(...)` surface and adjacent query execution facades.
259    pub(in crate::db) fn execute_query_dyn<E>(
260        &self,
261        mode: QueryMode,
262        plan: ExecutablePlan<E>,
263    ) -> Result<EntityResponse<E>, QueryError>
264    where
265        E: PersistedRow<Canister = C> + EntityValue,
266    {
267        let result = match mode {
268            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
269            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
270        };
271
272        result.map_err(QueryError::execute)
273    }
274
275    // Shared load-query terminal wrapper: build plan, run under metrics, map
276    // execution errors into query-facing errors.
277    pub(in crate::db) fn execute_load_query_with<E, T>(
278        &self,
279        query: &Query<E>,
280        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
281    ) -> Result<T, QueryError>
282    where
283        E: PersistedRow<Canister = C> + EntityValue,
284    {
285        let plan = self
286            .compile_query_with_visible_indexes(query)?
287            .into_executable();
288
289        self.with_metrics(|| op(self.load_executor::<E>(), plan))
290            .map_err(QueryError::execute)
291    }
292
293    /// Build one trace payload for a query without executing it.
294    ///
295    /// This lightweight surface is intended for developer diagnostics:
296    /// plan hash, access strategy summary, and planner/executor route shape.
297    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
298    where
299        E: EntityKind<Canister = C>,
300    {
301        let compiled = self.compile_query_with_visible_indexes(query)?;
302        let explain = compiled.explain();
303        let plan_hash = compiled.plan_hash_hex();
304
305        let executable = compiled.into_executable();
306        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
307        let execution_strategy = match query.mode() {
308            QueryMode::Load(_) => Some(
309                executable
310                    .execution_strategy()
311                    .map_err(QueryError::execute)?,
312            ),
313            QueryMode::Delete(_) => None,
314        };
315
316        Ok(QueryTracePlan::new(
317            plan_hash,
318            access_strategy,
319            execution_strategy,
320            explain,
321        ))
322    }
323
324    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
325    pub(crate) fn execute_load_query_paged_with_trace<E>(
326        &self,
327        query: &Query<E>,
328        cursor_token: Option<&str>,
329    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
330    where
331        E: PersistedRow<Canister = C> + EntityValue,
332    {
333        // Phase 1: build/validate executable plan and reject grouped plans.
334        let plan = self
335            .compile_query_with_visible_indexes(query)?
336            .into_executable();
337        Self::ensure_scalar_paged_execution_strategy(
338            plan.execution_strategy().map_err(QueryError::execute)?,
339        )?;
340
341        // Phase 2: decode external cursor token and validate it against plan surface.
342        let cursor_bytes = decode_optional_cursor_token(cursor_token)
343            .map_err(QueryError::from_cursor_plan_error)?;
344        let cursor = plan
345            .prepare_cursor(cursor_bytes.as_deref())
346            .map_err(QueryError::from_executor_plan_error)?;
347
348        // Phase 3: execute one traced page and encode outbound continuation token.
349        let (page, trace) = self
350            .with_metrics(|| {
351                self.load_executor::<E>()
352                    .execute_paged_with_cursor_traced(plan, cursor)
353            })
354            .map_err(QueryError::execute)?;
355        let next_cursor = page
356            .next_cursor
357            .map(|token| {
358                let Some(token) = token.as_scalar() else {
359                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
360                };
361
362                token.encode().map_err(|err| {
363                    QueryError::serialize_internal(format!(
364                        "failed to serialize continuation cursor: {err}"
365                    ))
366                })
367            })
368            .transpose()?;
369
370        Ok(PagedLoadExecutionWithTrace::new(
371            page.items,
372            next_cursor,
373            trace,
374        ))
375    }
376
377    /// Execute one grouped query page with optional grouped continuation cursor.
378    ///
379    /// This is the explicit grouped execution boundary; scalar load APIs reject
380    /// grouped plans to preserve scalar response contracts.
381    pub fn execute_grouped<E>(
382        &self,
383        query: &Query<E>,
384        cursor_token: Option<&str>,
385    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
386    where
387        E: PersistedRow<Canister = C> + EntityValue,
388    {
389        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
390        let next_cursor = page
391            .next_cursor
392            .map(|token| {
393                let Some(token) = token.as_grouped() else {
394                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
395                };
396
397                token.encode().map_err(|err| {
398                    QueryError::serialize_internal(format!(
399                        "failed to serialize grouped continuation cursor: {err}"
400                    ))
401                })
402            })
403            .transpose()?;
404
405        Ok(PagedGroupedExecutionWithTrace::new(
406            page.rows,
407            next_cursor,
408            trace,
409        ))
410    }
411
412    /// Execute one grouped query page and return grouped rows plus an already-encoded text cursor.
413    #[doc(hidden)]
414    pub fn execute_grouped_text_cursor<E>(
415        &self,
416        query: &Query<E>,
417        cursor_token: Option<&str>,
418    ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
419    where
420        E: PersistedRow<Canister = C> + EntityValue,
421    {
422        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
423        let next_cursor = page
424            .next_cursor
425            .map(Self::encode_grouped_page_cursor_hex)
426            .transpose()?;
427
428        Ok((page.rows, next_cursor, trace))
429    }
430}
431
432impl<C: CanisterKind> DbSession<C> {
433    // Execute the canonical grouped query core and return the raw grouped page
434    // plus optional execution trace before outward cursor formatting.
435    fn execute_grouped_page_with_trace<E>(
436        &self,
437        query: &Query<E>,
438        cursor_token: Option<&str>,
439    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
440    where
441        E: PersistedRow<Canister = C> + EntityValue,
442    {
443        // Phase 1: build/validate executable plan and require grouped shape.
444        let plan = self
445            .compile_query_with_visible_indexes(query)?
446            .into_executable();
447        Self::ensure_grouped_execution_strategy(
448            plan.execution_strategy().map_err(QueryError::execute)?,
449        )?;
450
451        // Phase 2: decode external grouped cursor token and validate against plan.
452        let cursor = decode_optional_grouped_cursor_token(cursor_token)
453            .map_err(QueryError::from_cursor_plan_error)?;
454        let cursor = plan
455            .prepare_grouped_cursor_token(cursor)
456            .map_err(QueryError::from_executor_plan_error)?;
457
458        // Phase 3: execute one grouped page while preserving the structural
459        // grouped cursor payload for whichever outward cursor format the caller needs.
460        self.with_metrics(|| {
461            self.load_executor::<E>()
462                .execute_grouped_paged_with_cursor_traced(plan, cursor)
463        })
464        .map_err(QueryError::execute)
465    }
466
467    // Encode one grouped page cursor directly to lowercase hex without
468    // round-tripping through a temporary raw cursor byte vector.
469    fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
470        let token: &GroupedContinuationToken = page_cursor
471            .as_grouped()
472            .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
473
474        token.encode_hex().map_err(|err| {
475            QueryError::serialize_internal(format!(
476                "failed to serialize grouped continuation cursor: {err}"
477            ))
478        })
479    }
480}