Skip to main content

icydb_core/db/session/
query.rs

1use crate::{
2    db::{
3        DbSession, EntityResponse, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace,
4        PersistedRow, Query, QueryError, QueryTracePlan, TraceExecutionStrategy,
5        access::AccessStrategy,
6        cursor::CursorPlanError,
7        executor::{ExecutablePlan, ExecutionStrategy, LoadExecutor},
8        query::plan::QueryMode,
9        session::decode_optional_cursor_bytes,
10    },
11    error::InternalError,
12    traits::{CanisterKind, EntityKind, EntityValue},
13};
14
15impl<C: CanisterKind> DbSession<C> {
16    // Validate that one execution strategy is admissible for scalar paged load
17    // execution and fail closed on grouped/primary-key-only routes.
18    fn ensure_scalar_paged_execution_strategy(
19        strategy: ExecutionStrategy,
20    ) -> Result<(), QueryError> {
21        match strategy {
22            ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
23                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
24            )),
25            ExecutionStrategy::Ordered => Ok(()),
26            ExecutionStrategy::Grouped => Err(QueryError::invariant(
27                "grouped plans require execute_grouped(...)",
28            )),
29        }
30    }
31
32    // Validate that one execution strategy is admissible for the grouped
33    // execution surface.
34    fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
35        match strategy {
36            ExecutionStrategy::Grouped => Ok(()),
37            ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
38                QueryError::invariant("execute_grouped requires grouped logical plans"),
39            ),
40        }
41    }
42
43    /// Execute one scalar load/delete query and return materialized response rows.
44    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
45    where
46        E: PersistedRow<Canister = C> + EntityValue,
47    {
48        // Phase 1: compile typed intent into one executable plan contract.
49        let mode = query.mode();
50        let plan = query.plan()?.into_executable();
51
52        // Phase 2: delegate execution to the shared compiled-plan entry path.
53        self.execute_query_dyn(mode, plan)
54    }
55
56    /// Execute one scalar query from one pre-built executable contract.
57    ///
58    /// This is the shared compiled-plan entry boundary used by the typed
59    /// `execute_query(...)` surface and adjacent query execution facades.
60    pub(in crate::db) fn execute_query_dyn<E>(
61        &self,
62        mode: QueryMode,
63        plan: ExecutablePlan<E>,
64    ) -> Result<EntityResponse<E>, QueryError>
65    where
66        E: PersistedRow<Canister = C> + EntityValue,
67    {
68        let result = match mode {
69            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
70            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
71        };
72
73        result.map_err(QueryError::execute)
74    }
75
76    // Shared load-query terminal wrapper: build plan, run under metrics, map
77    // execution errors into query-facing errors.
78    pub(in crate::db) fn execute_load_query_with<E, T>(
79        &self,
80        query: &Query<E>,
81        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
82    ) -> Result<T, QueryError>
83    where
84        E: PersistedRow<Canister = C> + EntityValue,
85    {
86        let plan = query.plan()?.into_executable();
87
88        self.with_metrics(|| op(self.load_executor::<E>(), plan))
89            .map_err(QueryError::execute)
90    }
91
92    /// Build one trace payload for a query without executing it.
93    ///
94    /// This lightweight surface is intended for developer diagnostics:
95    /// plan hash, access strategy summary, and planner/executor route shape.
96    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
97    where
98        E: EntityKind<Canister = C>,
99    {
100        let compiled = query.plan()?;
101        let explain = compiled.explain();
102        let plan_hash = compiled.plan_hash_hex();
103
104        let executable = compiled.into_executable();
105        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
106        let execution_strategy = match query.mode() {
107            QueryMode::Load(_) => Some(trace_execution_strategy(
108                executable
109                    .execution_strategy()
110                    .map_err(QueryError::execute)?,
111            )),
112            QueryMode::Delete(_) => None,
113        };
114
115        Ok(QueryTracePlan::new(
116            plan_hash,
117            access_strategy,
118            execution_strategy,
119            explain,
120        ))
121    }
122
123    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
124    pub(crate) fn execute_load_query_paged_with_trace<E>(
125        &self,
126        query: &Query<E>,
127        cursor_token: Option<&str>,
128    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
129    where
130        E: PersistedRow<Canister = C> + EntityValue,
131    {
132        // Phase 1: build/validate executable plan and reject grouped plans.
133        let plan = query.plan()?.into_executable();
134        Self::ensure_scalar_paged_execution_strategy(
135            plan.execution_strategy().map_err(QueryError::execute)?,
136        )?;
137
138        // Phase 2: decode external cursor token and validate it against plan surface.
139        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
140        let cursor = plan
141            .prepare_cursor(cursor_bytes.as_deref())
142            .map_err(QueryError::from_executor_plan_error)?;
143
144        // Phase 3: execute one traced page and encode outbound continuation token.
145        let (page, trace) = self
146            .with_metrics(|| {
147                self.load_executor::<E>()
148                    .execute_paged_with_cursor_traced(plan, cursor)
149            })
150            .map_err(QueryError::execute)?;
151        let next_cursor = page
152            .next_cursor
153            .map(|token| {
154                let Some(token) = token.as_scalar() else {
155                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
156                };
157
158                token.encode().map_err(|err| {
159                    QueryError::serialize_internal(format!(
160                        "failed to serialize continuation cursor: {err}"
161                    ))
162                })
163            })
164            .transpose()?;
165
166        Ok(PagedLoadExecutionWithTrace::new(
167            page.items,
168            next_cursor,
169            trace,
170        ))
171    }
172
173    /// Execute one grouped query page with optional grouped continuation cursor.
174    ///
175    /// This is the explicit grouped execution boundary; scalar load APIs reject
176    /// grouped plans to preserve scalar response contracts.
177    pub fn execute_grouped<E>(
178        &self,
179        query: &Query<E>,
180        cursor_token: Option<&str>,
181    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
182    where
183        E: PersistedRow<Canister = C> + EntityValue,
184    {
185        // Phase 1: build/validate executable plan and require grouped shape.
186        let plan = query.plan()?.into_executable();
187        Self::ensure_grouped_execution_strategy(
188            plan.execution_strategy().map_err(QueryError::execute)?,
189        )?;
190
191        // Phase 2: decode external grouped cursor token and validate against plan.
192        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
193        let cursor = plan
194            .prepare_grouped_cursor(cursor_bytes.as_deref())
195            .map_err(QueryError::from_executor_plan_error)?;
196
197        // Phase 3: execute grouped page and encode outbound grouped continuation token.
198        let (page, trace) = self
199            .with_metrics(|| {
200                self.load_executor::<E>()
201                    .execute_grouped_paged_with_cursor_traced(plan, cursor)
202            })
203            .map_err(QueryError::execute)?;
204        let next_cursor = page
205            .next_cursor
206            .map(|token| {
207                let Some(token) = token.as_grouped() else {
208                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
209                };
210
211                token.encode().map_err(|err| {
212                    QueryError::serialize_internal(format!(
213                        "failed to serialize grouped continuation cursor: {err}"
214                    ))
215                })
216            })
217            .transpose()?;
218
219        Ok(PagedGroupedExecutionWithTrace::new(
220            page.rows,
221            next_cursor,
222            trace,
223        ))
224    }
225}
226
227const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
228    match strategy {
229        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
230        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
231        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
232    }
233}