Skip to main content

icydb_core/db/session/
query.rs

1use crate::{
2    db::{
3        DbSession, EntityResponse, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace,
4        PersistedRow, Query, QueryError, QueryTracePlan, TraceExecutionStrategy,
5        access::AccessStrategy,
6        executor::{BytesByProjectionMode, ExecutablePlan, ExecutionStrategy, LoadExecutor},
7        query::{
8            builder::aggregate::AggregateExpr,
9            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
10            plan::QueryMode,
11        },
12        session::{decode_optional_cursor_bytes, map_executor_plan_error},
13    },
14    error::InternalError,
15    traits::{CanisterKind, EntityKind, EntityValue},
16    value::Value,
17};
18
19impl<C: CanisterKind> DbSession<C> {
20    /// Execute one scalar load/delete query and return materialized response rows.
21    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
22    where
23        E: PersistedRow<Canister = C> + EntityValue,
24    {
25        // Phase 1: compile typed intent into one executable plan contract.
26        let mode = query.mode();
27        let plan = query.plan()?.into_executable();
28
29        // Phase 2: delegate execution to the dynamic entry shim.
30        self.execute_query_dyn(mode, plan)
31    }
32
33    /// Execute one scalar query from one pre-built executable contract.
34    ///
35    /// This shim is the Slice A dynamic entry boundary. It keeps the legacy
36    /// typed `execute_query(...)` surface stable while introducing one central
37    /// execution path that can later bind descriptor-driven executor forms.
38    pub(in crate::db) fn execute_query_dyn<E>(
39        &self,
40        mode: QueryMode,
41        plan: ExecutablePlan<E>,
42    ) -> Result<EntityResponse<E>, QueryError>
43    where
44        E: PersistedRow<Canister = C> + EntityValue,
45    {
46        let result = match mode {
47            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
48            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
49        };
50
51        result.map_err(QueryError::execute)
52    }
53
54    // Shared load-query terminal wrapper: build plan, run under metrics, map
55    // execution errors into query-facing errors.
56    pub(in crate::db) fn execute_load_query_with<E, T>(
57        &self,
58        query: &Query<E>,
59        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
60    ) -> Result<T, QueryError>
61    where
62        E: PersistedRow<Canister = C> + EntityValue,
63    {
64        let plan = query.plan()?.into_executable();
65
66        self.with_metrics(|| op(self.load_executor::<E>(), plan))
67            .map_err(QueryError::execute)
68    }
69
70    /// Build one trace payload for a query without executing it.
71    ///
72    /// This lightweight surface is intended for developer diagnostics:
73    /// plan hash, access strategy summary, and planner/executor route shape.
74    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
75    where
76        E: EntityKind<Canister = C>,
77    {
78        let compiled = query.plan()?;
79        let explain = compiled.explain();
80        let plan_hash = compiled.plan_hash_hex();
81
82        let executable = compiled.into_executable();
83        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
84        let execution_strategy = match query.mode() {
85            QueryMode::Load(_) => Some(trace_execution_strategy(
86                executable
87                    .execution_strategy()
88                    .map_err(QueryError::execute)?,
89            )),
90            QueryMode::Delete(_) => None,
91        };
92
93        Ok(QueryTracePlan::new(
94            plan_hash,
95            access_strategy,
96            execution_strategy,
97            explain,
98        ))
99    }
100
101    /// Build one aggregate-terminal explain payload without executing the query.
102    pub(crate) fn explain_load_query_terminal_with<E>(
103        query: &Query<E>,
104        aggregate: AggregateExpr,
105    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
106    where
107        E: EntityKind<Canister = C> + EntityValue,
108    {
109        // Phase 1: build one compiled query once and project logical explain output.
110        let compiled = query.plan()?;
111        let query_explain = compiled.explain();
112        let terminal = aggregate.kind();
113
114        // Phase 2: derive the executor route label for this aggregate terminal.
115        let executable = compiled.into_executable();
116        let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
117
118        Ok(ExplainAggregateTerminalPlan::new(
119            query_explain,
120            terminal,
121            execution,
122        ))
123    }
124
125    /// Build one bytes-by terminal execution descriptor without executing the query.
126    pub(crate) fn explain_load_query_bytes_by_with<E>(
127        query: &Query<E>,
128        target_field: &str,
129    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
130    where
131        E: EntityKind<Canister = C> + EntityValue,
132    {
133        let executable = query.plan()?.into_executable();
134        let mut descriptor = executable
135            .explain_load_execution_node_descriptor()
136            .map_err(QueryError::execute)?;
137        let projection_mode = executable.bytes_by_projection_mode(target_field);
138        let projection_mode_label =
139            ExecutablePlan::<E>::bytes_by_projection_mode_label(projection_mode);
140
141        descriptor
142            .node_properties
143            .insert("terminal".to_string(), Value::from("bytes_by"));
144        descriptor.node_properties.insert(
145            "terminal_field".to_string(),
146            Value::from(target_field.to_string()),
147        );
148        descriptor.node_properties.insert(
149            "terminal_projection_mode".to_string(),
150            Value::from(projection_mode_label),
151        );
152        descriptor.node_properties.insert(
153            "terminal_index_only".to_string(),
154            Value::from(matches!(
155                projection_mode,
156                BytesByProjectionMode::CoveringIndex | BytesByProjectionMode::CoveringConstant
157            )),
158        );
159
160        Ok(descriptor)
161    }
162
163    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
164    pub(crate) fn execute_load_query_paged_with_trace<E>(
165        &self,
166        query: &Query<E>,
167        cursor_token: Option<&str>,
168    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
169    where
170        E: PersistedRow<Canister = C> + EntityValue,
171    {
172        // Phase 1: build/validate executable plan and reject grouped plans.
173        let plan = query.plan()?.into_executable();
174        match plan.execution_strategy().map_err(QueryError::execute)? {
175            ExecutionStrategy::PrimaryKey => {
176                return Err(QueryError::execute(
177                    crate::db::error::query_executor_invariant(
178                        "cursor pagination requires explicit or grouped ordering",
179                    ),
180                ));
181            }
182            ExecutionStrategy::Ordered => {}
183            ExecutionStrategy::Grouped => {
184                return Err(QueryError::execute(
185                    crate::db::error::query_executor_invariant(
186                        "grouped plans require execute_grouped(...)",
187                    ),
188                ));
189            }
190        }
191
192        // Phase 2: decode external cursor token and validate it against plan surface.
193        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
194        let cursor = plan
195            .prepare_cursor(cursor_bytes.as_deref())
196            .map_err(map_executor_plan_error)?;
197
198        // Phase 3: execute one traced page and encode outbound continuation token.
199        let (page, trace) = self
200            .with_metrics(|| {
201                self.load_executor::<E>()
202                    .execute_paged_with_cursor_traced(plan, cursor)
203            })
204            .map_err(QueryError::execute)?;
205        let next_cursor = page
206            .next_cursor
207            .map(|token| {
208                let Some(token) = token.as_scalar() else {
209                    return Err(QueryError::execute(
210                        crate::db::error::query_executor_invariant(
211                            "scalar load pagination emitted grouped continuation token",
212                        ),
213                    ));
214                };
215
216                token.encode().map_err(|err| {
217                    QueryError::execute(InternalError::serialize_internal(format!(
218                        "failed to serialize continuation cursor: {err}"
219                    )))
220                })
221            })
222            .transpose()?;
223
224        Ok(PagedLoadExecutionWithTrace::new(
225            page.items,
226            next_cursor,
227            trace,
228        ))
229    }
230
231    /// Execute one grouped query page with optional grouped continuation cursor.
232    ///
233    /// This is the explicit grouped execution boundary; scalar load APIs reject
234    /// grouped plans to preserve scalar response contracts.
235    pub fn execute_grouped<E>(
236        &self,
237        query: &Query<E>,
238        cursor_token: Option<&str>,
239    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
240    where
241        E: PersistedRow<Canister = C> + EntityValue,
242    {
243        // Phase 1: build/validate executable plan and require grouped shape.
244        let plan = query.plan()?.into_executable();
245        if !matches!(
246            plan.execution_strategy().map_err(QueryError::execute)?,
247            ExecutionStrategy::Grouped
248        ) {
249            return Err(QueryError::execute(
250                crate::db::error::query_executor_invariant(
251                    "execute_grouped requires grouped logical plans",
252                ),
253            ));
254        }
255
256        // Phase 2: decode external grouped cursor token and validate against plan.
257        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
258        let cursor = plan
259            .prepare_grouped_cursor(cursor_bytes.as_deref())
260            .map_err(map_executor_plan_error)?;
261
262        // Phase 3: execute grouped page and encode outbound grouped continuation token.
263        let (page, trace) = self
264            .with_metrics(|| {
265                self.load_executor::<E>()
266                    .execute_grouped_paged_with_cursor_traced(plan, cursor)
267            })
268            .map_err(QueryError::execute)?;
269        let next_cursor = page
270            .next_cursor
271            .map(|token| {
272                let Some(token) = token.as_grouped() else {
273                    return Err(QueryError::execute(
274                        crate::db::error::query_executor_invariant(
275                            "grouped pagination emitted scalar continuation token",
276                        ),
277                    ));
278                };
279
280                token.encode().map_err(|err| {
281                    QueryError::execute(InternalError::serialize_internal(format!(
282                        "failed to serialize grouped continuation cursor: {err}"
283                    )))
284                })
285            })
286            .transpose()?;
287
288        Ok(PagedGroupedExecutionWithTrace::new(
289            page.rows,
290            next_cursor,
291            trace,
292        ))
293    }
294}
295
296const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
297    match strategy {
298        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
299        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
300        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
301    }
302}