1use crate::{
2 db::{
3 DbSession, EntityResponse, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace,
4 Query, QueryError, QueryTracePlan, TraceExecutionStrategy,
5 access::AccessStrategy,
6 executor::{BytesByProjectionMode, ExecutablePlan, ExecutionStrategy, LoadExecutor},
7 query::{
8 builder::aggregate::AggregateExpr,
9 explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
10 plan::QueryMode,
11 },
12 session::{decode_optional_cursor_bytes, map_executor_plan_error},
13 },
14 error::InternalError,
15 traits::{CanisterKind, EntityKind, EntityValue},
16 value::Value,
17};
18
19impl<C: CanisterKind> DbSession<C> {
20 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
22 where
23 E: EntityKind<Canister = C> + EntityValue,
24 {
25 let mode = query.mode();
27 let plan = query.plan()?.into_executable();
28
29 self.execute_query_dyn(mode, plan)
31 }
32
33 pub(in crate::db) fn execute_query_dyn<E>(
39 &self,
40 mode: QueryMode,
41 plan: ExecutablePlan<E>,
42 ) -> Result<EntityResponse<E>, QueryError>
43 where
44 E: EntityKind<Canister = C> + EntityValue,
45 {
46 let result = match mode {
47 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
48 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
49 };
50
51 result.map_err(QueryError::execute)
52 }
53
54 pub(in crate::db) fn execute_load_query_with<E, T>(
57 &self,
58 query: &Query<E>,
59 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
60 ) -> Result<T, QueryError>
61 where
62 E: EntityKind<Canister = C> + EntityValue,
63 {
64 let plan = query.plan()?.into_executable();
65
66 self.with_metrics(|| op(self.load_executor::<E>(), plan))
67 .map_err(QueryError::execute)
68 }
69
70 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
75 where
76 E: EntityKind<Canister = C>,
77 {
78 let compiled = query.plan()?;
79 let explain = compiled.explain();
80 let plan_hash = compiled.plan_hash_hex();
81
82 let executable = compiled.into_executable();
83 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
84 let execution_strategy = match query.mode() {
85 QueryMode::Load(_) => Some(trace_execution_strategy(
86 executable
87 .execution_strategy()
88 .map_err(QueryError::execute)?,
89 )),
90 QueryMode::Delete(_) => None,
91 };
92
93 Ok(QueryTracePlan::new(
94 plan_hash,
95 access_strategy,
96 execution_strategy,
97 explain,
98 ))
99 }
100
101 pub(crate) fn explain_load_query_terminal_with<E>(
103 query: &Query<E>,
104 aggregate: AggregateExpr,
105 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
106 where
107 E: EntityKind<Canister = C> + EntityValue,
108 {
109 let compiled = query.plan()?;
111 let query_explain = compiled.explain();
112 let terminal = aggregate.kind();
113
114 let executable = compiled.into_executable();
116 let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
117
118 Ok(ExplainAggregateTerminalPlan::new(
119 query_explain,
120 terminal,
121 execution,
122 ))
123 }
124
125 pub(crate) fn explain_load_query_bytes_by_with<E>(
127 query: &Query<E>,
128 target_field: &str,
129 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
130 where
131 E: EntityKind<Canister = C> + EntityValue,
132 {
133 let executable = query.plan()?.into_executable();
134 let mut descriptor = executable
135 .explain_load_execution_node_descriptor()
136 .map_err(QueryError::execute)?;
137 let projection_mode = executable.bytes_by_projection_mode(target_field);
138 let projection_mode_label =
139 ExecutablePlan::<E>::bytes_by_projection_mode_label(projection_mode);
140
141 descriptor
142 .node_properties
143 .insert("terminal".to_string(), Value::from("bytes_by"));
144 descriptor.node_properties.insert(
145 "terminal_field".to_string(),
146 Value::from(target_field.to_string()),
147 );
148 descriptor.node_properties.insert(
149 "terminal_projection_mode".to_string(),
150 Value::from(projection_mode_label),
151 );
152 descriptor.node_properties.insert(
153 "terminal_index_only".to_string(),
154 Value::from(matches!(
155 projection_mode,
156 BytesByProjectionMode::CoveringIndex | BytesByProjectionMode::CoveringConstant
157 )),
158 );
159
160 Ok(descriptor)
161 }
162
163 pub(crate) fn execute_load_query_paged_with_trace<E>(
165 &self,
166 query: &Query<E>,
167 cursor_token: Option<&str>,
168 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
169 where
170 E: EntityKind<Canister = C> + EntityValue,
171 {
172 let plan = query.plan()?.into_executable();
174 match plan.execution_strategy().map_err(QueryError::execute)? {
175 ExecutionStrategy::PrimaryKey => {
176 return Err(QueryError::execute(
177 crate::db::error::query_executor_invariant(
178 "cursor pagination requires explicit or grouped ordering",
179 ),
180 ));
181 }
182 ExecutionStrategy::Ordered => {}
183 ExecutionStrategy::Grouped => {
184 return Err(QueryError::execute(
185 crate::db::error::query_executor_invariant(
186 "grouped plans require execute_grouped(...)",
187 ),
188 ));
189 }
190 }
191
192 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
194 let cursor = plan
195 .prepare_cursor(cursor_bytes.as_deref())
196 .map_err(map_executor_plan_error)?;
197
198 let (page, trace) = self
200 .with_metrics(|| {
201 self.load_executor::<E>()
202 .execute_paged_with_cursor_traced(plan, cursor)
203 })
204 .map_err(QueryError::execute)?;
205 let next_cursor = page
206 .next_cursor
207 .map(|token| {
208 let Some(token) = token.as_scalar() else {
209 return Err(QueryError::execute(
210 crate::db::error::query_executor_invariant(
211 "scalar load pagination emitted grouped continuation token",
212 ),
213 ));
214 };
215
216 token.encode().map_err(|err| {
217 QueryError::execute(InternalError::serialize_internal(format!(
218 "failed to serialize continuation cursor: {err}"
219 )))
220 })
221 })
222 .transpose()?;
223
224 Ok(PagedLoadExecutionWithTrace::new(
225 page.items,
226 next_cursor,
227 trace,
228 ))
229 }
230
231 pub fn execute_grouped<E>(
236 &self,
237 query: &Query<E>,
238 cursor_token: Option<&str>,
239 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
240 where
241 E: EntityKind<Canister = C> + EntityValue,
242 {
243 let plan = query.plan()?.into_executable();
245 if !matches!(
246 plan.execution_strategy().map_err(QueryError::execute)?,
247 ExecutionStrategy::Grouped
248 ) {
249 return Err(QueryError::execute(
250 crate::db::error::query_executor_invariant(
251 "execute_grouped requires grouped logical plans",
252 ),
253 ));
254 }
255
256 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
258 let cursor = plan
259 .prepare_grouped_cursor(cursor_bytes.as_deref())
260 .map_err(map_executor_plan_error)?;
261
262 let (page, trace) = self
264 .with_metrics(|| {
265 self.load_executor::<E>()
266 .execute_grouped_paged_with_cursor_traced(plan, cursor)
267 })
268 .map_err(QueryError::execute)?;
269 let next_cursor = page
270 .next_cursor
271 .map(|token| {
272 let Some(token) = token.as_grouped() else {
273 return Err(QueryError::execute(
274 crate::db::error::query_executor_invariant(
275 "grouped pagination emitted scalar continuation token",
276 ),
277 ));
278 };
279
280 token.encode().map_err(|err| {
281 QueryError::execute(InternalError::serialize_internal(format!(
282 "failed to serialize grouped continuation cursor: {err}"
283 )))
284 })
285 })
286 .transpose()?;
287
288 Ok(PagedGroupedExecutionWithTrace::new(
289 page.rows,
290 next_cursor,
291 trace,
292 ))
293 }
294}
295
296const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
297 match strategy {
298 ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
299 ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
300 ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
301 }
302}