1use crate::{
2 db::{
3 DbSession, EntityResponse, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace,
4 Query, QueryError, QueryTracePlan, TraceExecutionStrategy,
5 access::AccessStrategy,
6 executor::{ExecutablePlan, ExecutionStrategy, LoadExecutor},
7 query::{
8 builder::aggregate::AggregateExpr, explain::ExplainAggregateTerminalPlan,
9 plan::QueryMode,
10 },
11 session::{decode_optional_cursor_bytes, map_executor_plan_error},
12 },
13 error::InternalError,
14 traits::{CanisterKind, EntityKind, EntityValue},
15};
16
17impl<C: CanisterKind> DbSession<C> {
18 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
20 where
21 E: EntityKind<Canister = C> + EntityValue,
22 {
23 let plan = query.plan()?.into_executable();
24
25 let result = match query.mode() {
26 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
27 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
28 };
29
30 result.map_err(QueryError::execute)
31 }
32
33 pub(in crate::db) fn execute_load_query_with<E, T>(
36 &self,
37 query: &Query<E>,
38 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
39 ) -> Result<T, QueryError>
40 where
41 E: EntityKind<Canister = C> + EntityValue,
42 {
43 let plan = query.plan()?.into_executable();
44
45 self.with_metrics(|| op(self.load_executor::<E>(), plan))
46 .map_err(QueryError::execute)
47 }
48
49 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
54 where
55 E: EntityKind<Canister = C>,
56 {
57 let compiled = query.plan()?;
58 let explain = compiled.explain();
59 let plan_hash = compiled.plan_hash_hex();
60
61 let executable = compiled.into_executable();
62 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
63 let execution_strategy = match query.mode() {
64 QueryMode::Load(_) => Some(trace_execution_strategy(
65 executable
66 .execution_strategy()
67 .map_err(QueryError::execute)?,
68 )),
69 QueryMode::Delete(_) => None,
70 };
71
72 Ok(QueryTracePlan::new(
73 plan_hash,
74 access_strategy,
75 execution_strategy,
76 explain,
77 ))
78 }
79
80 pub(crate) fn explain_load_query_terminal_with<E>(
82 query: &Query<E>,
83 aggregate: AggregateExpr,
84 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
85 where
86 E: EntityKind<Canister = C> + EntityValue,
87 {
88 let compiled = query.plan()?;
90 let query_explain = compiled.explain();
91 let terminal = aggregate.kind();
92
93 let executable = compiled.into_executable();
95 let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
96
97 Ok(ExplainAggregateTerminalPlan::new(
98 query_explain,
99 terminal,
100 execution,
101 ))
102 }
103
104 pub(crate) fn execute_load_query_paged_with_trace<E>(
106 &self,
107 query: &Query<E>,
108 cursor_token: Option<&str>,
109 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
110 where
111 E: EntityKind<Canister = C> + EntityValue,
112 {
113 let plan = query.plan()?.into_executable();
115 match plan.execution_strategy().map_err(QueryError::execute)? {
116 ExecutionStrategy::PrimaryKey => {
117 return Err(QueryError::execute(
118 crate::db::error::query_executor_invariant(
119 "cursor pagination requires explicit or grouped ordering",
120 ),
121 ));
122 }
123 ExecutionStrategy::Ordered => {}
124 ExecutionStrategy::Grouped => {
125 return Err(QueryError::execute(
126 crate::db::error::query_executor_invariant(
127 "grouped plans require execute_grouped(...)",
128 ),
129 ));
130 }
131 }
132
133 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
135 let cursor = plan
136 .prepare_cursor(cursor_bytes.as_deref())
137 .map_err(map_executor_plan_error)?;
138
139 let (page, trace) = self
141 .with_metrics(|| {
142 self.load_executor::<E>()
143 .execute_paged_with_cursor_traced(plan, cursor)
144 })
145 .map_err(QueryError::execute)?;
146 let next_cursor = page
147 .next_cursor
148 .map(|token| {
149 let Some(token) = token.as_scalar() else {
150 return Err(QueryError::execute(
151 crate::db::error::query_executor_invariant(
152 "scalar load pagination emitted grouped continuation token",
153 ),
154 ));
155 };
156
157 token.encode().map_err(|err| {
158 QueryError::execute(InternalError::serialize_internal(format!(
159 "failed to serialize continuation cursor: {err}"
160 )))
161 })
162 })
163 .transpose()?;
164
165 Ok(PagedLoadExecutionWithTrace::new(
166 page.items,
167 next_cursor,
168 trace,
169 ))
170 }
171
172 pub fn execute_grouped<E>(
177 &self,
178 query: &Query<E>,
179 cursor_token: Option<&str>,
180 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
181 where
182 E: EntityKind<Canister = C> + EntityValue,
183 {
184 let plan = query.plan()?.into_executable();
186 if !matches!(
187 plan.execution_strategy().map_err(QueryError::execute)?,
188 ExecutionStrategy::Grouped
189 ) {
190 return Err(QueryError::execute(
191 crate::db::error::query_executor_invariant(
192 "execute_grouped requires grouped logical plans",
193 ),
194 ));
195 }
196
197 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
199 let cursor = plan
200 .prepare_grouped_cursor(cursor_bytes.as_deref())
201 .map_err(map_executor_plan_error)?;
202
203 let (page, trace) = self
205 .with_metrics(|| {
206 self.load_executor::<E>()
207 .execute_grouped_paged_with_cursor_traced(plan, cursor)
208 })
209 .map_err(QueryError::execute)?;
210 let next_cursor = page
211 .next_cursor
212 .map(|token| {
213 let Some(token) = token.as_grouped() else {
214 return Err(QueryError::execute(
215 crate::db::error::query_executor_invariant(
216 "grouped pagination emitted scalar continuation token",
217 ),
218 ));
219 };
220
221 token.encode().map_err(|err| {
222 QueryError::execute(InternalError::serialize_internal(format!(
223 "failed to serialize grouped continuation cursor: {err}"
224 )))
225 })
226 })
227 .transpose()?;
228
229 Ok(PagedGroupedExecutionWithTrace::new(
230 page.rows,
231 next_cursor,
232 trace,
233 ))
234 }
235}
236
237const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
238 match strategy {
239 ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
240 ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
241 ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
242 }
243}