Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: module-local ownership and contracts for db::session::query.
3//! Does not own: cross-module orchestration outside this module.
4//! Boundary: exposes this module API while keeping implementation details internal.
5
6use crate::{
7    db::{
8        DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
9        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
10        TraceExecutionStrategy,
11        access::AccessStrategy,
12        cursor::{CursorPlanError, GroupedContinuationToken},
13        diagnostics::ExecutionTrace,
14        executor::{
15            ExecutablePlan, ExecutionStrategy, GroupedCursorPage, LoadExecutor, PageCursor,
16        },
17        query::plan::QueryMode,
18        session::{decode_optional_cursor_bytes, decode_optional_grouped_cursor},
19    },
20    error::InternalError,
21    traits::{CanisterKind, EntityKind, EntityValue},
22};
23
24impl<C: CanisterKind> DbSession<C> {
25    // Validate that one execution strategy is admissible for scalar paged load
26    // execution and fail closed on grouped/primary-key-only routes.
27    fn ensure_scalar_paged_execution_strategy(
28        strategy: ExecutionStrategy,
29    ) -> Result<(), QueryError> {
30        match strategy {
31            ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
32                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
33            )),
34            ExecutionStrategy::Ordered => Ok(()),
35            ExecutionStrategy::Grouped => Err(QueryError::invariant(
36                "grouped plans require execute_grouped(...)",
37            )),
38        }
39    }
40
41    // Validate that one execution strategy is admissible for the grouped
42    // execution surface.
43    fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
44        match strategy {
45            ExecutionStrategy::Grouped => Ok(()),
46            ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
47                QueryError::invariant("execute_grouped requires grouped logical plans"),
48            ),
49        }
50    }
51
52    /// Execute one scalar load/delete query and return materialized response rows.
53    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
54    where
55        E: PersistedRow<Canister = C> + EntityValue,
56    {
57        // Phase 1: compile typed intent into one executable plan contract.
58        let mode = query.mode();
59        let plan = query.plan()?.into_executable();
60
61        // Phase 2: delegate execution to the shared compiled-plan entry path.
62        self.execute_query_dyn(mode, plan)
63    }
64
65    /// Execute one typed delete query and return only the affected-row count.
66    #[doc(hidden)]
67    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
68    where
69        E: PersistedRow<Canister = C> + EntityValue,
70    {
71        // Phase 1: fail closed if the caller routes a non-delete query here.
72        if !query.mode().is_delete() {
73            return Err(QueryError::unsupported_query(
74                "delete count execution requires delete query mode",
75            ));
76        }
77
78        // Phase 2: compile typed delete intent into one executable plan contract.
79        let plan = query.plan()?.into_executable();
80
81        // Phase 3: execute the shared delete core while skipping response-row materialization.
82        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
83            .map_err(QueryError::execute)
84    }
85
86    /// Execute one scalar query from one pre-built executable contract.
87    ///
88    /// This is the shared compiled-plan entry boundary used by the typed
89    /// `execute_query(...)` surface and adjacent query execution facades.
90    pub(in crate::db) fn execute_query_dyn<E>(
91        &self,
92        mode: QueryMode,
93        plan: ExecutablePlan<E>,
94    ) -> Result<EntityResponse<E>, QueryError>
95    where
96        E: PersistedRow<Canister = C> + EntityValue,
97    {
98        let result = match mode {
99            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
100            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
101        };
102
103        result.map_err(QueryError::execute)
104    }
105
106    // Shared load-query terminal wrapper: build plan, run under metrics, map
107    // execution errors into query-facing errors.
108    pub(in crate::db) fn execute_load_query_with<E, T>(
109        &self,
110        query: &Query<E>,
111        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
112    ) -> Result<T, QueryError>
113    where
114        E: PersistedRow<Canister = C> + EntityValue,
115    {
116        let plan = query.plan()?.into_executable();
117
118        self.with_metrics(|| op(self.load_executor::<E>(), plan))
119            .map_err(QueryError::execute)
120    }
121
122    /// Build one trace payload for a query without executing it.
123    ///
124    /// This lightweight surface is intended for developer diagnostics:
125    /// plan hash, access strategy summary, and planner/executor route shape.
126    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
127    where
128        E: EntityKind<Canister = C>,
129    {
130        let compiled = query.plan()?;
131        let explain = compiled.explain();
132        let plan_hash = compiled.plan_hash_hex();
133
134        let executable = compiled.into_executable();
135        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
136        let execution_strategy = match query.mode() {
137            QueryMode::Load(_) => Some(trace_execution_strategy(
138                executable
139                    .execution_strategy()
140                    .map_err(QueryError::execute)?,
141            )),
142            QueryMode::Delete(_) => None,
143        };
144
145        Ok(QueryTracePlan::new(
146            plan_hash,
147            access_strategy,
148            execution_strategy,
149            explain,
150        ))
151    }
152
153    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
154    pub(crate) fn execute_load_query_paged_with_trace<E>(
155        &self,
156        query: &Query<E>,
157        cursor_token: Option<&str>,
158    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
159    where
160        E: PersistedRow<Canister = C> + EntityValue,
161    {
162        // Phase 1: build/validate executable plan and reject grouped plans.
163        let plan = query.plan()?.into_executable();
164        Self::ensure_scalar_paged_execution_strategy(
165            plan.execution_strategy().map_err(QueryError::execute)?,
166        )?;
167
168        // Phase 2: decode external cursor token and validate it against plan surface.
169        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
170        let cursor = plan
171            .prepare_cursor(cursor_bytes.as_deref())
172            .map_err(QueryError::from_executor_plan_error)?;
173
174        // Phase 3: execute one traced page and encode outbound continuation token.
175        let (page, trace) = self
176            .with_metrics(|| {
177                self.load_executor::<E>()
178                    .execute_paged_with_cursor_traced(plan, cursor)
179            })
180            .map_err(QueryError::execute)?;
181        let next_cursor = page
182            .next_cursor
183            .map(|token| {
184                let Some(token) = token.as_scalar() else {
185                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
186                };
187
188                token.encode().map_err(|err| {
189                    QueryError::serialize_internal(format!(
190                        "failed to serialize continuation cursor: {err}"
191                    ))
192                })
193            })
194            .transpose()?;
195
196        Ok(PagedLoadExecutionWithTrace::new(
197            page.items,
198            next_cursor,
199            trace,
200        ))
201    }
202
203    /// Execute one grouped query page with optional grouped continuation cursor.
204    ///
205    /// This is the explicit grouped execution boundary; scalar load APIs reject
206    /// grouped plans to preserve scalar response contracts.
207    pub fn execute_grouped<E>(
208        &self,
209        query: &Query<E>,
210        cursor_token: Option<&str>,
211    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
212    where
213        E: PersistedRow<Canister = C> + EntityValue,
214    {
215        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
216        let next_cursor = page
217            .next_cursor
218            .map(|token| {
219                let Some(token) = token.as_grouped() else {
220                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
221                };
222
223                token.encode().map_err(|err| {
224                    QueryError::serialize_internal(format!(
225                        "failed to serialize grouped continuation cursor: {err}"
226                    ))
227                })
228            })
229            .transpose()?;
230
231        Ok(PagedGroupedExecutionWithTrace::new(
232            page.rows,
233            next_cursor,
234            trace,
235        ))
236    }
237
238    /// Execute one grouped query page and return grouped rows plus an already-encoded text cursor.
239    #[doc(hidden)]
240    pub fn execute_grouped_text_cursor<E>(
241        &self,
242        query: &Query<E>,
243        cursor_token: Option<&str>,
244    ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
245    where
246        E: PersistedRow<Canister = C> + EntityValue,
247    {
248        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
249        let next_cursor = page
250            .next_cursor
251            .map(Self::encode_grouped_page_cursor_hex)
252            .transpose()?;
253
254        Ok((page.rows, next_cursor, trace))
255    }
256}
257
258impl<C: CanisterKind> DbSession<C> {
259    // Execute the canonical grouped query core and return the raw grouped page
260    // plus optional execution trace before outward cursor formatting.
261    fn execute_grouped_page_with_trace<E>(
262        &self,
263        query: &Query<E>,
264        cursor_token: Option<&str>,
265    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
266    where
267        E: PersistedRow<Canister = C> + EntityValue,
268    {
269        // Phase 1: build/validate executable plan and require grouped shape.
270        let plan = query.plan()?.into_executable();
271        Self::ensure_grouped_execution_strategy(
272            plan.execution_strategy().map_err(QueryError::execute)?,
273        )?;
274
275        // Phase 2: decode external grouped cursor token and validate against plan.
276        let cursor = decode_optional_grouped_cursor(cursor_token)?;
277        let cursor = plan
278            .prepare_grouped_cursor_token(cursor)
279            .map_err(QueryError::from_executor_plan_error)?;
280
281        // Phase 3: execute one grouped page while preserving the structural
282        // grouped cursor payload for whichever outward cursor format the caller needs.
283        self.with_metrics(|| {
284            self.load_executor::<E>()
285                .execute_grouped_paged_with_cursor_traced(plan, cursor)
286        })
287        .map_err(QueryError::execute)
288    }
289
290    // Encode one grouped page cursor directly to lowercase hex without
291    // round-tripping through a temporary raw cursor byte vector.
292    fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
293        let token: &GroupedContinuationToken = page_cursor
294            .as_grouped()
295            .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
296
297        token.encode_hex().map_err(|err| {
298            QueryError::serialize_internal(format!(
299                "failed to serialize grouped continuation cursor: {err}"
300            ))
301        })
302    }
303}
304
305const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
306    match strategy {
307        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
308        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
309        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
310    }
311}