Skip to main content

icydb_core/db/session/
query.rs

1use crate::{
2    db::{
3        DbSession, EntityResponse, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace,
4        Query, QueryError, QueryTracePlan, TraceExecutionStrategy,
5        access::AccessStrategy,
6        executor::{ExecutablePlan, ExecutionStrategy, LoadExecutor},
7        query::{
8            builder::aggregate::AggregateExpr, explain::ExplainAggregateTerminalPlan,
9            plan::QueryMode,
10        },
11        session::{decode_optional_cursor_bytes, map_executor_plan_error},
12    },
13    error::InternalError,
14    traits::{CanisterKind, EntityKind, EntityValue},
15};
16
17impl<C: CanisterKind> DbSession<C> {
18    /// Execute one scalar load/delete query and return materialized response rows.
19    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
20    where
21        E: EntityKind<Canister = C> + EntityValue,
22    {
23        let plan = query.plan()?.into_executable();
24
25        let result = match query.mode() {
26            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
27            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
28        };
29
30        result.map_err(QueryError::execute)
31    }
32
33    // Shared load-query terminal wrapper: build plan, run under metrics, map
34    // execution errors into query-facing errors.
35    pub(in crate::db) fn execute_load_query_with<E, T>(
36        &self,
37        query: &Query<E>,
38        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
39    ) -> Result<T, QueryError>
40    where
41        E: EntityKind<Canister = C> + EntityValue,
42    {
43        let plan = query.plan()?.into_executable();
44
45        self.with_metrics(|| op(self.load_executor::<E>(), plan))
46            .map_err(QueryError::execute)
47    }
48
49    /// Build one trace payload for a query without executing it.
50    ///
51    /// This lightweight surface is intended for developer diagnostics:
52    /// plan hash, access strategy summary, and planner/executor route shape.
53    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
54    where
55        E: EntityKind<Canister = C>,
56    {
57        let compiled = query.plan()?;
58        let explain = compiled.explain();
59        let plan_hash = compiled.plan_hash_hex();
60
61        let executable = compiled.into_executable();
62        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
63        let execution_strategy = match query.mode() {
64            QueryMode::Load(_) => Some(trace_execution_strategy(
65                executable
66                    .execution_strategy()
67                    .map_err(QueryError::execute)?,
68            )),
69            QueryMode::Delete(_) => None,
70        };
71
72        Ok(QueryTracePlan::new(
73            plan_hash,
74            access_strategy,
75            execution_strategy,
76            explain,
77        ))
78    }
79
80    /// Build one aggregate-terminal explain payload without executing the query.
81    pub(crate) fn explain_load_query_terminal_with<E>(
82        query: &Query<E>,
83        aggregate: AggregateExpr,
84    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
85    where
86        E: EntityKind<Canister = C> + EntityValue,
87    {
88        // Phase 1: build one compiled query once and project logical explain output.
89        let compiled = query.plan()?;
90        let query_explain = compiled.explain();
91        let terminal = aggregate.kind();
92
93        // Phase 2: derive the executor route label for this aggregate terminal.
94        let executable = compiled.into_executable();
95        let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
96
97        Ok(ExplainAggregateTerminalPlan::new(
98            query_explain,
99            terminal,
100            execution,
101        ))
102    }
103
104    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
105    pub(crate) fn execute_load_query_paged_with_trace<E>(
106        &self,
107        query: &Query<E>,
108        cursor_token: Option<&str>,
109    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
110    where
111        E: EntityKind<Canister = C> + EntityValue,
112    {
113        // Phase 1: build/validate executable plan and reject grouped plans.
114        let plan = query.plan()?.into_executable();
115        match plan.execution_strategy().map_err(QueryError::execute)? {
116            ExecutionStrategy::PrimaryKey => {
117                return Err(QueryError::execute(
118                    crate::db::error::query_executor_invariant(
119                        "cursor pagination requires explicit or grouped ordering",
120                    ),
121                ));
122            }
123            ExecutionStrategy::Ordered => {}
124            ExecutionStrategy::Grouped => {
125                return Err(QueryError::execute(
126                    crate::db::error::query_executor_invariant(
127                        "grouped plans require execute_grouped(...)",
128                    ),
129                ));
130            }
131        }
132
133        // Phase 2: decode external cursor token and validate it against plan surface.
134        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
135        let cursor = plan
136            .prepare_cursor(cursor_bytes.as_deref())
137            .map_err(map_executor_plan_error)?;
138
139        // Phase 3: execute one traced page and encode outbound continuation token.
140        let (page, trace) = self
141            .with_metrics(|| {
142                self.load_executor::<E>()
143                    .execute_paged_with_cursor_traced(plan, cursor)
144            })
145            .map_err(QueryError::execute)?;
146        let next_cursor = page
147            .next_cursor
148            .map(|token| {
149                let Some(token) = token.as_scalar() else {
150                    return Err(QueryError::execute(
151                        crate::db::error::query_executor_invariant(
152                            "scalar load pagination emitted grouped continuation token",
153                        ),
154                    ));
155                };
156
157                token.encode().map_err(|err| {
158                    QueryError::execute(InternalError::serialize_internal(format!(
159                        "failed to serialize continuation cursor: {err}"
160                    )))
161                })
162            })
163            .transpose()?;
164
165        Ok(PagedLoadExecutionWithTrace::new(
166            page.items,
167            next_cursor,
168            trace,
169        ))
170    }
171
172    /// Execute one grouped query page with optional grouped continuation cursor.
173    ///
174    /// This is the explicit grouped execution boundary; scalar load APIs reject
175    /// grouped plans to preserve scalar response contracts.
176    pub fn execute_grouped<E>(
177        &self,
178        query: &Query<E>,
179        cursor_token: Option<&str>,
180    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
181    where
182        E: EntityKind<Canister = C> + EntityValue,
183    {
184        // Phase 1: build/validate executable plan and require grouped shape.
185        let plan = query.plan()?.into_executable();
186        if !matches!(
187            plan.execution_strategy().map_err(QueryError::execute)?,
188            ExecutionStrategy::Grouped
189        ) {
190            return Err(QueryError::execute(
191                crate::db::error::query_executor_invariant(
192                    "execute_grouped requires grouped logical plans",
193                ),
194            ));
195        }
196
197        // Phase 2: decode external grouped cursor token and validate against plan.
198        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
199        let cursor = plan
200            .prepare_grouped_cursor(cursor_bytes.as_deref())
201            .map_err(map_executor_plan_error)?;
202
203        // Phase 3: execute grouped page and encode outbound grouped continuation token.
204        let (page, trace) = self
205            .with_metrics(|| {
206                self.load_executor::<E>()
207                    .execute_grouped_paged_with_cursor_traced(plan, cursor)
208            })
209            .map_err(QueryError::execute)?;
210        let next_cursor = page
211            .next_cursor
212            .map(|token| {
213                let Some(token) = token.as_grouped() else {
214                    return Err(QueryError::execute(
215                        crate::db::error::query_executor_invariant(
216                            "grouped pagination emitted scalar continuation token",
217                        ),
218                    ));
219                };
220
221                token.encode().map_err(|err| {
222                    QueryError::execute(InternalError::serialize_internal(format!(
223                        "failed to serialize grouped continuation cursor: {err}"
224                    )))
225                })
226            })
227            .transpose()?;
228
229        Ok(PagedGroupedExecutionWithTrace::new(
230            page.rows,
231            next_cursor,
232            trace,
233        ))
234    }
235}
236
237const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
238    match strategy {
239        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
240        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
241        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
242    }
243}