Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: module-local ownership and contracts for db::session::query.
3//! Does not own: cross-module orchestration outside this module.
4//! Boundary: exposes this module API while keeping implementation details internal.
5
6use crate::{
7    db::{
8        DbSession, EntityResponse, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace,
9        PersistedRow, Query, QueryError, QueryTracePlan, TraceExecutionStrategy,
10        access::AccessStrategy,
11        cursor::CursorPlanError,
12        executor::{ExecutablePlan, ExecutionStrategy, LoadExecutor},
13        query::plan::QueryMode,
14        session::decode_optional_cursor_bytes,
15    },
16    error::InternalError,
17    traits::{CanisterKind, EntityKind, EntityValue},
18};
19
20impl<C: CanisterKind> DbSession<C> {
21    // Validate that one execution strategy is admissible for scalar paged load
22    // execution and fail closed on grouped/primary-key-only routes.
23    fn ensure_scalar_paged_execution_strategy(
24        strategy: ExecutionStrategy,
25    ) -> Result<(), QueryError> {
26        match strategy {
27            ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
28                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
29            )),
30            ExecutionStrategy::Ordered => Ok(()),
31            ExecutionStrategy::Grouped => Err(QueryError::invariant(
32                "grouped plans require execute_grouped(...)",
33            )),
34        }
35    }
36
37    // Validate that one execution strategy is admissible for the grouped
38    // execution surface.
39    fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
40        match strategy {
41            ExecutionStrategy::Grouped => Ok(()),
42            ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
43                QueryError::invariant("execute_grouped requires grouped logical plans"),
44            ),
45        }
46    }
47
48    /// Execute one scalar load/delete query and return materialized response rows.
49    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
50    where
51        E: PersistedRow<Canister = C> + EntityValue,
52    {
53        // Phase 1: compile typed intent into one executable plan contract.
54        let mode = query.mode();
55        let plan = query.plan()?.into_executable();
56
57        // Phase 2: delegate execution to the shared compiled-plan entry path.
58        self.execute_query_dyn(mode, plan)
59    }
60
61    /// Execute one typed delete query and return only the affected-row count.
62    #[doc(hidden)]
63    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
64    where
65        E: PersistedRow<Canister = C> + EntityValue,
66    {
67        // Phase 1: fail closed if the caller routes a non-delete query here.
68        if !query.mode().is_delete() {
69            return Err(QueryError::unsupported_query(
70                "delete count execution requires delete query mode",
71            ));
72        }
73
74        // Phase 2: compile typed delete intent into one executable plan contract.
75        let plan = query.plan()?.into_executable();
76
77        // Phase 3: execute the shared delete core while skipping response-row materialization.
78        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
79            .map_err(QueryError::execute)
80    }
81
82    /// Execute one scalar query from one pre-built executable contract.
83    ///
84    /// This is the shared compiled-plan entry boundary used by the typed
85    /// `execute_query(...)` surface and adjacent query execution facades.
86    pub(in crate::db) fn execute_query_dyn<E>(
87        &self,
88        mode: QueryMode,
89        plan: ExecutablePlan<E>,
90    ) -> Result<EntityResponse<E>, QueryError>
91    where
92        E: PersistedRow<Canister = C> + EntityValue,
93    {
94        let result = match mode {
95            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
96            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
97        };
98
99        result.map_err(QueryError::execute)
100    }
101
102    // Shared load-query terminal wrapper: build plan, run under metrics, map
103    // execution errors into query-facing errors.
104    pub(in crate::db) fn execute_load_query_with<E, T>(
105        &self,
106        query: &Query<E>,
107        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
108    ) -> Result<T, QueryError>
109    where
110        E: PersistedRow<Canister = C> + EntityValue,
111    {
112        let plan = query.plan()?.into_executable();
113
114        self.with_metrics(|| op(self.load_executor::<E>(), plan))
115            .map_err(QueryError::execute)
116    }
117
118    /// Build one trace payload for a query without executing it.
119    ///
120    /// This lightweight surface is intended for developer diagnostics:
121    /// plan hash, access strategy summary, and planner/executor route shape.
122    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
123    where
124        E: EntityKind<Canister = C>,
125    {
126        let compiled = query.plan()?;
127        let explain = compiled.explain();
128        let plan_hash = compiled.plan_hash_hex();
129
130        let executable = compiled.into_executable();
131        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
132        let execution_strategy = match query.mode() {
133            QueryMode::Load(_) => Some(trace_execution_strategy(
134                executable
135                    .execution_strategy()
136                    .map_err(QueryError::execute)?,
137            )),
138            QueryMode::Delete(_) => None,
139        };
140
141        Ok(QueryTracePlan::new(
142            plan_hash,
143            access_strategy,
144            execution_strategy,
145            explain,
146        ))
147    }
148
149    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
150    pub(crate) fn execute_load_query_paged_with_trace<E>(
151        &self,
152        query: &Query<E>,
153        cursor_token: Option<&str>,
154    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
155    where
156        E: PersistedRow<Canister = C> + EntityValue,
157    {
158        // Phase 1: build/validate executable plan and reject grouped plans.
159        let plan = query.plan()?.into_executable();
160        Self::ensure_scalar_paged_execution_strategy(
161            plan.execution_strategy().map_err(QueryError::execute)?,
162        )?;
163
164        // Phase 2: decode external cursor token and validate it against plan surface.
165        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
166        let cursor = plan
167            .prepare_cursor(cursor_bytes.as_deref())
168            .map_err(QueryError::from_executor_plan_error)?;
169
170        // Phase 3: execute one traced page and encode outbound continuation token.
171        let (page, trace) = self
172            .with_metrics(|| {
173                self.load_executor::<E>()
174                    .execute_paged_with_cursor_traced(plan, cursor)
175            })
176            .map_err(QueryError::execute)?;
177        let next_cursor = page
178            .next_cursor
179            .map(|token| {
180                let Some(token) = token.as_scalar() else {
181                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
182                };
183
184                token.encode().map_err(|err| {
185                    QueryError::serialize_internal(format!(
186                        "failed to serialize continuation cursor: {err}"
187                    ))
188                })
189            })
190            .transpose()?;
191
192        Ok(PagedLoadExecutionWithTrace::new(
193            page.items,
194            next_cursor,
195            trace,
196        ))
197    }
198
199    /// Execute one grouped query page with optional grouped continuation cursor.
200    ///
201    /// This is the explicit grouped execution boundary; scalar load APIs reject
202    /// grouped plans to preserve scalar response contracts.
203    pub fn execute_grouped<E>(
204        &self,
205        query: &Query<E>,
206        cursor_token: Option<&str>,
207    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
208    where
209        E: PersistedRow<Canister = C> + EntityValue,
210    {
211        // Phase 1: build/validate executable plan and require grouped shape.
212        let plan = query.plan()?.into_executable();
213        Self::ensure_grouped_execution_strategy(
214            plan.execution_strategy().map_err(QueryError::execute)?,
215        )?;
216
217        // Phase 2: decode external grouped cursor token and validate against plan.
218        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
219        let cursor = plan
220            .prepare_grouped_cursor(cursor_bytes.as_deref())
221            .map_err(QueryError::from_executor_plan_error)?;
222
223        // Phase 3: execute grouped page and encode outbound grouped continuation token.
224        let (page, trace) = self
225            .with_metrics(|| {
226                self.load_executor::<E>()
227                    .execute_grouped_paged_with_cursor_traced(plan, cursor)
228            })
229            .map_err(QueryError::execute)?;
230        let next_cursor = page
231            .next_cursor
232            .map(|token| {
233                let Some(token) = token.as_grouped() else {
234                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
235                };
236
237                token.encode().map_err(|err| {
238                    QueryError::serialize_internal(format!(
239                        "failed to serialize grouped continuation cursor: {err}"
240                    ))
241                })
242            })
243            .transpose()?;
244
245        Ok(PagedGroupedExecutionWithTrace::new(
246            page.rows,
247            next_cursor,
248            trace,
249        ))
250    }
251}
252
253const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
254    match strategy {
255        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
256        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
257        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
258    }
259}