Skip to main content

icydb_core/db/session/
query.rs

1use crate::{
2    db::{
3        DbSession, EntityResponse, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace,
4        PersistedRow, Query, QueryError, QueryTracePlan, TraceExecutionStrategy,
5        access::AccessStrategy,
6        cursor::CursorPlanError,
7        executor::{BytesByProjectionMode, ExecutablePlan, ExecutionStrategy, LoadExecutor},
8        query::{
9            builder::aggregate::AggregateExpr,
10            explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
11            plan::QueryMode,
12        },
13        session::decode_optional_cursor_bytes,
14    },
15    error::InternalError,
16    traits::{CanisterKind, EntityKind, EntityValue},
17    value::Value,
18};
19
20impl<C: CanisterKind> DbSession<C> {
21    // Validate that one execution strategy is admissible for scalar paged load
22    // execution and fail closed on grouped/primary-key-only routes.
23    fn ensure_scalar_paged_execution_strategy(
24        strategy: ExecutionStrategy,
25    ) -> Result<(), QueryError> {
26        match strategy {
27            ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
28                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
29            )),
30            ExecutionStrategy::Ordered => Ok(()),
31            ExecutionStrategy::Grouped => Err(QueryError::invariant(
32                "grouped plans require execute_grouped(...)",
33            )),
34        }
35    }
36
37    // Validate that one execution strategy is admissible for the grouped
38    // execution surface.
39    fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
40        match strategy {
41            ExecutionStrategy::Grouped => Ok(()),
42            ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
43                QueryError::invariant("execute_grouped requires grouped logical plans"),
44            ),
45        }
46    }
47
48    /// Execute one scalar load/delete query and return materialized response rows.
49    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
50    where
51        E: PersistedRow<Canister = C> + EntityValue,
52    {
53        // Phase 1: compile typed intent into one executable plan contract.
54        let mode = query.mode();
55        let plan = query.plan()?.into_executable();
56
57        // Phase 2: delegate execution to the shared compiled-plan entry path.
58        self.execute_query_dyn(mode, plan)
59    }
60
61    /// Execute one scalar query from one pre-built executable contract.
62    ///
63    /// This is the shared compiled-plan entry boundary used by the typed
64    /// `execute_query(...)` surface and adjacent query execution facades.
65    pub(in crate::db) fn execute_query_dyn<E>(
66        &self,
67        mode: QueryMode,
68        plan: ExecutablePlan<E>,
69    ) -> Result<EntityResponse<E>, QueryError>
70    where
71        E: PersistedRow<Canister = C> + EntityValue,
72    {
73        let result = match mode {
74            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
75            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
76        };
77
78        result.map_err(QueryError::execute)
79    }
80
81    // Shared load-query terminal wrapper: build plan, run under metrics, map
82    // execution errors into query-facing errors.
83    pub(in crate::db) fn execute_load_query_with<E, T>(
84        &self,
85        query: &Query<E>,
86        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
87    ) -> Result<T, QueryError>
88    where
89        E: PersistedRow<Canister = C> + EntityValue,
90    {
91        let plan = query.plan()?.into_executable();
92
93        self.with_metrics(|| op(self.load_executor::<E>(), plan))
94            .map_err(QueryError::execute)
95    }
96
97    /// Build one trace payload for a query without executing it.
98    ///
99    /// This lightweight surface is intended for developer diagnostics:
100    /// plan hash, access strategy summary, and planner/executor route shape.
101    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
102    where
103        E: EntityKind<Canister = C>,
104    {
105        let compiled = query.plan()?;
106        let explain = compiled.explain();
107        let plan_hash = compiled.plan_hash_hex();
108
109        let executable = compiled.into_executable();
110        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
111        let execution_strategy = match query.mode() {
112            QueryMode::Load(_) => Some(trace_execution_strategy(
113                executable
114                    .execution_strategy()
115                    .map_err(QueryError::execute)?,
116            )),
117            QueryMode::Delete(_) => None,
118        };
119
120        Ok(QueryTracePlan::new(
121            plan_hash,
122            access_strategy,
123            execution_strategy,
124            explain,
125        ))
126    }
127
128    /// Build one aggregate-terminal explain payload without executing the query.
129    pub(crate) fn explain_load_query_terminal_with<E>(
130        query: &Query<E>,
131        aggregate: AggregateExpr,
132    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
133    where
134        E: EntityKind<Canister = C> + EntityValue,
135    {
136        // Phase 1: build one compiled query once and project logical explain output.
137        let compiled = query.plan()?;
138        let query_explain = compiled.explain();
139        let terminal = aggregate.kind();
140
141        // Phase 2: derive the executor route label for this aggregate terminal.
142        let executable = compiled.into_executable();
143        let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
144
145        Ok(ExplainAggregateTerminalPlan::new(
146            query_explain,
147            terminal,
148            execution,
149        ))
150    }
151
152    /// Build one bytes-by terminal execution descriptor without executing the query.
153    pub(crate) fn explain_load_query_bytes_by_with<E>(
154        query: &Query<E>,
155        target_field: &str,
156    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
157    where
158        E: EntityKind<Canister = C> + EntityValue,
159    {
160        let executable = query.plan()?.into_executable();
161        let mut descriptor = executable
162            .explain_load_execution_node_descriptor()
163            .map_err(QueryError::execute)?;
164        let projection_mode = executable.bytes_by_projection_mode(target_field);
165        let projection_mode_label =
166            ExecutablePlan::<E>::bytes_by_projection_mode_label(projection_mode);
167
168        descriptor
169            .node_properties
170            .insert("terminal".to_string(), Value::from("bytes_by"));
171        descriptor.node_properties.insert(
172            "terminal_field".to_string(),
173            Value::from(target_field.to_string()),
174        );
175        descriptor.node_properties.insert(
176            "terminal_projection_mode".to_string(),
177            Value::from(projection_mode_label),
178        );
179        descriptor.node_properties.insert(
180            "terminal_index_only".to_string(),
181            Value::from(matches!(
182                projection_mode,
183                BytesByProjectionMode::CoveringIndex | BytesByProjectionMode::CoveringConstant
184            )),
185        );
186
187        Ok(descriptor)
188    }
189
190    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
191    pub(crate) fn execute_load_query_paged_with_trace<E>(
192        &self,
193        query: &Query<E>,
194        cursor_token: Option<&str>,
195    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
196    where
197        E: PersistedRow<Canister = C> + EntityValue,
198    {
199        // Phase 1: build/validate executable plan and reject grouped plans.
200        let plan = query.plan()?.into_executable();
201        Self::ensure_scalar_paged_execution_strategy(
202            plan.execution_strategy().map_err(QueryError::execute)?,
203        )?;
204
205        // Phase 2: decode external cursor token and validate it against plan surface.
206        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
207        let cursor = plan
208            .prepare_cursor(cursor_bytes.as_deref())
209            .map_err(QueryError::from_executor_plan_error)?;
210
211        // Phase 3: execute one traced page and encode outbound continuation token.
212        let (page, trace) = self
213            .with_metrics(|| {
214                self.load_executor::<E>()
215                    .execute_paged_with_cursor_traced(plan, cursor)
216            })
217            .map_err(QueryError::execute)?;
218        let next_cursor = page
219            .next_cursor
220            .map(|token| {
221                let Some(token) = token.as_scalar() else {
222                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
223                };
224
225                token.encode().map_err(|err| {
226                    QueryError::serialize_internal(format!(
227                        "failed to serialize continuation cursor: {err}"
228                    ))
229                })
230            })
231            .transpose()?;
232
233        Ok(PagedLoadExecutionWithTrace::new(
234            page.items,
235            next_cursor,
236            trace,
237        ))
238    }
239
240    /// Execute one grouped query page with optional grouped continuation cursor.
241    ///
242    /// This is the explicit grouped execution boundary; scalar load APIs reject
243    /// grouped plans to preserve scalar response contracts.
244    pub fn execute_grouped<E>(
245        &self,
246        query: &Query<E>,
247        cursor_token: Option<&str>,
248    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
249    where
250        E: PersistedRow<Canister = C> + EntityValue,
251    {
252        // Phase 1: build/validate executable plan and require grouped shape.
253        let plan = query.plan()?.into_executable();
254        Self::ensure_grouped_execution_strategy(
255            plan.execution_strategy().map_err(QueryError::execute)?,
256        )?;
257
258        // Phase 2: decode external grouped cursor token and validate against plan.
259        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
260        let cursor = plan
261            .prepare_grouped_cursor(cursor_bytes.as_deref())
262            .map_err(QueryError::from_executor_plan_error)?;
263
264        // Phase 3: execute grouped page and encode outbound grouped continuation token.
265        let (page, trace) = self
266            .with_metrics(|| {
267                self.load_executor::<E>()
268                    .execute_grouped_paged_with_cursor_traced(plan, cursor)
269            })
270            .map_err(QueryError::execute)?;
271        let next_cursor = page
272            .next_cursor
273            .map(|token| {
274                let Some(token) = token.as_grouped() else {
275                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
276                };
277
278                token.encode().map_err(|err| {
279                    QueryError::serialize_internal(format!(
280                        "failed to serialize grouped continuation cursor: {err}"
281                    ))
282                })
283            })
284            .transpose()?;
285
286        Ok(PagedGroupedExecutionWithTrace::new(
287            page.rows,
288            next_cursor,
289            trace,
290        ))
291    }
292}
293
294const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
295    match strategy {
296        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
297        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
298        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
299    }
300}