1use crate::{
7 db::{
8 DbSession, EntityResponse, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace,
9 PersistedRow, Query, QueryError, QueryTracePlan, TraceExecutionStrategy,
10 access::AccessStrategy,
11 cursor::CursorPlanError,
12 executor::{ExecutablePlan, ExecutionStrategy, LoadExecutor},
13 query::plan::QueryMode,
14 session::decode_optional_cursor_bytes,
15 },
16 error::InternalError,
17 traits::{CanisterKind, EntityKind, EntityValue},
18};
19
20impl<C: CanisterKind> DbSession<C> {
21 fn ensure_scalar_paged_execution_strategy(
24 strategy: ExecutionStrategy,
25 ) -> Result<(), QueryError> {
26 match strategy {
27 ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
28 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
29 )),
30 ExecutionStrategy::Ordered => Ok(()),
31 ExecutionStrategy::Grouped => Err(QueryError::invariant(
32 "grouped plans require execute_grouped(...)",
33 )),
34 }
35 }
36
37 fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
40 match strategy {
41 ExecutionStrategy::Grouped => Ok(()),
42 ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
43 QueryError::invariant("execute_grouped requires grouped logical plans"),
44 ),
45 }
46 }
47
48 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
50 where
51 E: PersistedRow<Canister = C> + EntityValue,
52 {
53 let mode = query.mode();
55 let plan = query.plan()?.into_executable();
56
57 self.execute_query_dyn(mode, plan)
59 }
60
61 #[doc(hidden)]
63 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
64 where
65 E: PersistedRow<Canister = C> + EntityValue,
66 {
67 if !query.mode().is_delete() {
69 return Err(QueryError::unsupported_query(
70 "delete count execution requires delete query mode",
71 ));
72 }
73
74 let plan = query.plan()?.into_executable();
76
77 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
79 .map_err(QueryError::execute)
80 }
81
82 pub(in crate::db) fn execute_query_dyn<E>(
87 &self,
88 mode: QueryMode,
89 plan: ExecutablePlan<E>,
90 ) -> Result<EntityResponse<E>, QueryError>
91 where
92 E: PersistedRow<Canister = C> + EntityValue,
93 {
94 let result = match mode {
95 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
96 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
97 };
98
99 result.map_err(QueryError::execute)
100 }
101
102 pub(in crate::db) fn execute_load_query_with<E, T>(
105 &self,
106 query: &Query<E>,
107 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
108 ) -> Result<T, QueryError>
109 where
110 E: PersistedRow<Canister = C> + EntityValue,
111 {
112 let plan = query.plan()?.into_executable();
113
114 self.with_metrics(|| op(self.load_executor::<E>(), plan))
115 .map_err(QueryError::execute)
116 }
117
118 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
123 where
124 E: EntityKind<Canister = C>,
125 {
126 let compiled = query.plan()?;
127 let explain = compiled.explain();
128 let plan_hash = compiled.plan_hash_hex();
129
130 let executable = compiled.into_executable();
131 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
132 let execution_strategy = match query.mode() {
133 QueryMode::Load(_) => Some(trace_execution_strategy(
134 executable
135 .execution_strategy()
136 .map_err(QueryError::execute)?,
137 )),
138 QueryMode::Delete(_) => None,
139 };
140
141 Ok(QueryTracePlan::new(
142 plan_hash,
143 access_strategy,
144 execution_strategy,
145 explain,
146 ))
147 }
148
149 pub(crate) fn execute_load_query_paged_with_trace<E>(
151 &self,
152 query: &Query<E>,
153 cursor_token: Option<&str>,
154 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
155 where
156 E: PersistedRow<Canister = C> + EntityValue,
157 {
158 let plan = query.plan()?.into_executable();
160 Self::ensure_scalar_paged_execution_strategy(
161 plan.execution_strategy().map_err(QueryError::execute)?,
162 )?;
163
164 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
166 let cursor = plan
167 .prepare_cursor(cursor_bytes.as_deref())
168 .map_err(QueryError::from_executor_plan_error)?;
169
170 let (page, trace) = self
172 .with_metrics(|| {
173 self.load_executor::<E>()
174 .execute_paged_with_cursor_traced(plan, cursor)
175 })
176 .map_err(QueryError::execute)?;
177 let next_cursor = page
178 .next_cursor
179 .map(|token| {
180 let Some(token) = token.as_scalar() else {
181 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
182 };
183
184 token.encode().map_err(|err| {
185 QueryError::serialize_internal(format!(
186 "failed to serialize continuation cursor: {err}"
187 ))
188 })
189 })
190 .transpose()?;
191
192 Ok(PagedLoadExecutionWithTrace::new(
193 page.items,
194 next_cursor,
195 trace,
196 ))
197 }
198
199 pub fn execute_grouped<E>(
204 &self,
205 query: &Query<E>,
206 cursor_token: Option<&str>,
207 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
208 where
209 E: PersistedRow<Canister = C> + EntityValue,
210 {
211 let plan = query.plan()?.into_executable();
213 Self::ensure_grouped_execution_strategy(
214 plan.execution_strategy().map_err(QueryError::execute)?,
215 )?;
216
217 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
219 let cursor = plan
220 .prepare_grouped_cursor(cursor_bytes.as_deref())
221 .map_err(QueryError::from_executor_plan_error)?;
222
223 let (page, trace) = self
225 .with_metrics(|| {
226 self.load_executor::<E>()
227 .execute_grouped_paged_with_cursor_traced(plan, cursor)
228 })
229 .map_err(QueryError::execute)?;
230 let next_cursor = page
231 .next_cursor
232 .map(|token| {
233 let Some(token) = token.as_grouped() else {
234 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
235 };
236
237 token.encode().map_err(|err| {
238 QueryError::serialize_internal(format!(
239 "failed to serialize grouped continuation cursor: {err}"
240 ))
241 })
242 })
243 .transpose()?;
244
245 Ok(PagedGroupedExecutionWithTrace::new(
246 page.rows,
247 next_cursor,
248 trace,
249 ))
250 }
251}
252
253const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
254 match strategy {
255 ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
256 ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
257 ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
258 }
259}