1use crate::{
2 db::{
3 DbSession, EntityResponse, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace,
4 PersistedRow, Query, QueryError, QueryTracePlan, TraceExecutionStrategy,
5 access::AccessStrategy,
6 cursor::CursorPlanError,
7 executor::{BytesByProjectionMode, ExecutablePlan, ExecutionStrategy, LoadExecutor},
8 query::{
9 builder::aggregate::AggregateExpr,
10 explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
11 plan::QueryMode,
12 },
13 session::decode_optional_cursor_bytes,
14 },
15 error::InternalError,
16 traits::{CanisterKind, EntityKind, EntityValue},
17 value::Value,
18};
19
20impl<C: CanisterKind> DbSession<C> {
21 fn ensure_scalar_paged_execution_strategy(
24 strategy: ExecutionStrategy,
25 ) -> Result<(), QueryError> {
26 match strategy {
27 ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
28 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
29 )),
30 ExecutionStrategy::Ordered => Ok(()),
31 ExecutionStrategy::Grouped => Err(QueryError::invariant(
32 "grouped plans require execute_grouped(...)",
33 )),
34 }
35 }
36
37 fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
40 match strategy {
41 ExecutionStrategy::Grouped => Ok(()),
42 ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
43 QueryError::invariant("execute_grouped requires grouped logical plans"),
44 ),
45 }
46 }
47
48 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
50 where
51 E: PersistedRow<Canister = C> + EntityValue,
52 {
53 let mode = query.mode();
55 let plan = query.plan()?.into_executable();
56
57 self.execute_query_dyn(mode, plan)
59 }
60
61 pub(in crate::db) fn execute_query_dyn<E>(
66 &self,
67 mode: QueryMode,
68 plan: ExecutablePlan<E>,
69 ) -> Result<EntityResponse<E>, QueryError>
70 where
71 E: PersistedRow<Canister = C> + EntityValue,
72 {
73 let result = match mode {
74 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
75 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
76 };
77
78 result.map_err(QueryError::execute)
79 }
80
81 pub(in crate::db) fn execute_load_query_with<E, T>(
84 &self,
85 query: &Query<E>,
86 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
87 ) -> Result<T, QueryError>
88 where
89 E: PersistedRow<Canister = C> + EntityValue,
90 {
91 let plan = query.plan()?.into_executable();
92
93 self.with_metrics(|| op(self.load_executor::<E>(), plan))
94 .map_err(QueryError::execute)
95 }
96
97 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
102 where
103 E: EntityKind<Canister = C>,
104 {
105 let compiled = query.plan()?;
106 let explain = compiled.explain();
107 let plan_hash = compiled.plan_hash_hex();
108
109 let executable = compiled.into_executable();
110 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
111 let execution_strategy = match query.mode() {
112 QueryMode::Load(_) => Some(trace_execution_strategy(
113 executable
114 .execution_strategy()
115 .map_err(QueryError::execute)?,
116 )),
117 QueryMode::Delete(_) => None,
118 };
119
120 Ok(QueryTracePlan::new(
121 plan_hash,
122 access_strategy,
123 execution_strategy,
124 explain,
125 ))
126 }
127
128 pub(crate) fn explain_load_query_terminal_with<E>(
130 query: &Query<E>,
131 aggregate: AggregateExpr,
132 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
133 where
134 E: EntityKind<Canister = C> + EntityValue,
135 {
136 let compiled = query.plan()?;
138 let query_explain = compiled.explain();
139 let terminal = aggregate.kind();
140
141 let executable = compiled.into_executable();
143 let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
144
145 Ok(ExplainAggregateTerminalPlan::new(
146 query_explain,
147 terminal,
148 execution,
149 ))
150 }
151
152 pub(crate) fn explain_load_query_bytes_by_with<E>(
154 query: &Query<E>,
155 target_field: &str,
156 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
157 where
158 E: EntityKind<Canister = C> + EntityValue,
159 {
160 let executable = query.plan()?.into_executable();
161 let mut descriptor = executable
162 .explain_load_execution_node_descriptor()
163 .map_err(QueryError::execute)?;
164 let projection_mode = executable.bytes_by_projection_mode(target_field);
165 let projection_mode_label =
166 ExecutablePlan::<E>::bytes_by_projection_mode_label(projection_mode);
167
168 descriptor
169 .node_properties
170 .insert("terminal".to_string(), Value::from("bytes_by"));
171 descriptor.node_properties.insert(
172 "terminal_field".to_string(),
173 Value::from(target_field.to_string()),
174 );
175 descriptor.node_properties.insert(
176 "terminal_projection_mode".to_string(),
177 Value::from(projection_mode_label),
178 );
179 descriptor.node_properties.insert(
180 "terminal_index_only".to_string(),
181 Value::from(matches!(
182 projection_mode,
183 BytesByProjectionMode::CoveringIndex | BytesByProjectionMode::CoveringConstant
184 )),
185 );
186
187 Ok(descriptor)
188 }
189
190 pub(crate) fn execute_load_query_paged_with_trace<E>(
192 &self,
193 query: &Query<E>,
194 cursor_token: Option<&str>,
195 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
196 where
197 E: PersistedRow<Canister = C> + EntityValue,
198 {
199 let plan = query.plan()?.into_executable();
201 Self::ensure_scalar_paged_execution_strategy(
202 plan.execution_strategy().map_err(QueryError::execute)?,
203 )?;
204
205 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
207 let cursor = plan
208 .prepare_cursor(cursor_bytes.as_deref())
209 .map_err(QueryError::from_executor_plan_error)?;
210
211 let (page, trace) = self
213 .with_metrics(|| {
214 self.load_executor::<E>()
215 .execute_paged_with_cursor_traced(plan, cursor)
216 })
217 .map_err(QueryError::execute)?;
218 let next_cursor = page
219 .next_cursor
220 .map(|token| {
221 let Some(token) = token.as_scalar() else {
222 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
223 };
224
225 token.encode().map_err(|err| {
226 QueryError::serialize_internal(format!(
227 "failed to serialize continuation cursor: {err}"
228 ))
229 })
230 })
231 .transpose()?;
232
233 Ok(PagedLoadExecutionWithTrace::new(
234 page.items,
235 next_cursor,
236 trace,
237 ))
238 }
239
240 pub fn execute_grouped<E>(
245 &self,
246 query: &Query<E>,
247 cursor_token: Option<&str>,
248 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
249 where
250 E: PersistedRow<Canister = C> + EntityValue,
251 {
252 let plan = query.plan()?.into_executable();
254 Self::ensure_grouped_execution_strategy(
255 plan.execution_strategy().map_err(QueryError::execute)?,
256 )?;
257
258 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
260 let cursor = plan
261 .prepare_grouped_cursor(cursor_bytes.as_deref())
262 .map_err(QueryError::from_executor_plan_error)?;
263
264 let (page, trace) = self
266 .with_metrics(|| {
267 self.load_executor::<E>()
268 .execute_grouped_paged_with_cursor_traced(plan, cursor)
269 })
270 .map_err(QueryError::execute)?;
271 let next_cursor = page
272 .next_cursor
273 .map(|token| {
274 let Some(token) = token.as_grouped() else {
275 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
276 };
277
278 token.encode().map_err(|err| {
279 QueryError::serialize_internal(format!(
280 "failed to serialize grouped continuation cursor: {err}"
281 ))
282 })
283 })
284 .transpose()?;
285
286 Ok(PagedGroupedExecutionWithTrace::new(
287 page.rows,
288 next_cursor,
289 trace,
290 ))
291 }
292}
293
294const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
295 match strategy {
296 ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
297 ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
298 ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
299 }
300}