1use crate::{
7 db::{
8 DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
9 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
10 TraceExecutionStrategy,
11 access::AccessStrategy,
12 cursor::{CursorPlanError, GroupedContinuationToken},
13 diagnostics::ExecutionTrace,
14 executor::{
15 ExecutablePlan, ExecutionStrategy, GroupedCursorPage, LoadExecutor, PageCursor,
16 },
17 query::plan::QueryMode,
18 session::{decode_optional_cursor_bytes, decode_optional_grouped_cursor},
19 },
20 error::InternalError,
21 traits::{CanisterKind, EntityKind, EntityValue},
22};
23
24impl<C: CanisterKind> DbSession<C> {
25 fn ensure_scalar_paged_execution_strategy(
28 strategy: ExecutionStrategy,
29 ) -> Result<(), QueryError> {
30 match strategy {
31 ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
32 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
33 )),
34 ExecutionStrategy::Ordered => Ok(()),
35 ExecutionStrategy::Grouped => Err(QueryError::invariant(
36 "grouped plans require execute_grouped(...)",
37 )),
38 }
39 }
40
41 fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
44 match strategy {
45 ExecutionStrategy::Grouped => Ok(()),
46 ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
47 QueryError::invariant("execute_grouped requires grouped logical plans"),
48 ),
49 }
50 }
51
52 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
54 where
55 E: PersistedRow<Canister = C> + EntityValue,
56 {
57 let mode = query.mode();
59 let plan = query.plan()?.into_executable();
60
61 self.execute_query_dyn(mode, plan)
63 }
64
65 #[doc(hidden)]
67 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
68 where
69 E: PersistedRow<Canister = C> + EntityValue,
70 {
71 if !query.mode().is_delete() {
73 return Err(QueryError::unsupported_query(
74 "delete count execution requires delete query mode",
75 ));
76 }
77
78 let plan = query.plan()?.into_executable();
80
81 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
83 .map_err(QueryError::execute)
84 }
85
86 pub(in crate::db) fn execute_query_dyn<E>(
91 &self,
92 mode: QueryMode,
93 plan: ExecutablePlan<E>,
94 ) -> Result<EntityResponse<E>, QueryError>
95 where
96 E: PersistedRow<Canister = C> + EntityValue,
97 {
98 let result = match mode {
99 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
100 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
101 };
102
103 result.map_err(QueryError::execute)
104 }
105
106 pub(in crate::db) fn execute_load_query_with<E, T>(
109 &self,
110 query: &Query<E>,
111 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
112 ) -> Result<T, QueryError>
113 where
114 E: PersistedRow<Canister = C> + EntityValue,
115 {
116 let plan = query.plan()?.into_executable();
117
118 self.with_metrics(|| op(self.load_executor::<E>(), plan))
119 .map_err(QueryError::execute)
120 }
121
122 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
127 where
128 E: EntityKind<Canister = C>,
129 {
130 let compiled = query.plan()?;
131 let explain = compiled.explain();
132 let plan_hash = compiled.plan_hash_hex();
133
134 let executable = compiled.into_executable();
135 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
136 let execution_strategy = match query.mode() {
137 QueryMode::Load(_) => Some(trace_execution_strategy(
138 executable
139 .execution_strategy()
140 .map_err(QueryError::execute)?,
141 )),
142 QueryMode::Delete(_) => None,
143 };
144
145 Ok(QueryTracePlan::new(
146 plan_hash,
147 access_strategy,
148 execution_strategy,
149 explain,
150 ))
151 }
152
153 pub(crate) fn execute_load_query_paged_with_trace<E>(
155 &self,
156 query: &Query<E>,
157 cursor_token: Option<&str>,
158 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
159 where
160 E: PersistedRow<Canister = C> + EntityValue,
161 {
162 let plan = query.plan()?.into_executable();
164 Self::ensure_scalar_paged_execution_strategy(
165 plan.execution_strategy().map_err(QueryError::execute)?,
166 )?;
167
168 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
170 let cursor = plan
171 .prepare_cursor(cursor_bytes.as_deref())
172 .map_err(QueryError::from_executor_plan_error)?;
173
174 let (page, trace) = self
176 .with_metrics(|| {
177 self.load_executor::<E>()
178 .execute_paged_with_cursor_traced(plan, cursor)
179 })
180 .map_err(QueryError::execute)?;
181 let next_cursor = page
182 .next_cursor
183 .map(|token| {
184 let Some(token) = token.as_scalar() else {
185 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
186 };
187
188 token.encode().map_err(|err| {
189 QueryError::serialize_internal(format!(
190 "failed to serialize continuation cursor: {err}"
191 ))
192 })
193 })
194 .transpose()?;
195
196 Ok(PagedLoadExecutionWithTrace::new(
197 page.items,
198 next_cursor,
199 trace,
200 ))
201 }
202
203 pub fn execute_grouped<E>(
208 &self,
209 query: &Query<E>,
210 cursor_token: Option<&str>,
211 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
212 where
213 E: PersistedRow<Canister = C> + EntityValue,
214 {
215 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
216 let next_cursor = page
217 .next_cursor
218 .map(|token| {
219 let Some(token) = token.as_grouped() else {
220 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
221 };
222
223 token.encode().map_err(|err| {
224 QueryError::serialize_internal(format!(
225 "failed to serialize grouped continuation cursor: {err}"
226 ))
227 })
228 })
229 .transpose()?;
230
231 Ok(PagedGroupedExecutionWithTrace::new(
232 page.rows,
233 next_cursor,
234 trace,
235 ))
236 }
237
238 #[doc(hidden)]
240 pub fn execute_grouped_text_cursor<E>(
241 &self,
242 query: &Query<E>,
243 cursor_token: Option<&str>,
244 ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
245 where
246 E: PersistedRow<Canister = C> + EntityValue,
247 {
248 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
249 let next_cursor = page
250 .next_cursor
251 .map(Self::encode_grouped_page_cursor_hex)
252 .transpose()?;
253
254 Ok((page.rows, next_cursor, trace))
255 }
256}
257
258impl<C: CanisterKind> DbSession<C> {
259 fn execute_grouped_page_with_trace<E>(
262 &self,
263 query: &Query<E>,
264 cursor_token: Option<&str>,
265 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
266 where
267 E: PersistedRow<Canister = C> + EntityValue,
268 {
269 let plan = query.plan()?.into_executable();
271 Self::ensure_grouped_execution_strategy(
272 plan.execution_strategy().map_err(QueryError::execute)?,
273 )?;
274
275 let cursor = decode_optional_grouped_cursor(cursor_token)?;
277 let cursor = plan
278 .prepare_grouped_cursor_token(cursor)
279 .map_err(QueryError::from_executor_plan_error)?;
280
281 self.with_metrics(|| {
284 self.load_executor::<E>()
285 .execute_grouped_paged_with_cursor_traced(plan, cursor)
286 })
287 .map_err(QueryError::execute)
288 }
289
290 fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
293 let token: &GroupedContinuationToken = page_cursor
294 .as_grouped()
295 .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
296
297 token.encode_hex().map_err(|err| {
298 QueryError::serialize_internal(format!(
299 "failed to serialize grouped continuation cursor: {err}"
300 ))
301 })
302 }
303}
304
305const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
306 match strategy {
307 ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
308 ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
309 ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
310 }
311}