1use crate::{
2 db::{
3 DbSession, EntityResponse, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace,
4 PersistedRow, Query, QueryError, QueryTracePlan, TraceExecutionStrategy,
5 access::AccessStrategy,
6 cursor::CursorPlanError,
7 executor::{ExecutablePlan, ExecutionStrategy, LoadExecutor},
8 query::plan::QueryMode,
9 session::decode_optional_cursor_bytes,
10 },
11 error::InternalError,
12 traits::{CanisterKind, EntityKind, EntityValue},
13};
14
15impl<C: CanisterKind> DbSession<C> {
16 fn ensure_scalar_paged_execution_strategy(
19 strategy: ExecutionStrategy,
20 ) -> Result<(), QueryError> {
21 match strategy {
22 ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
23 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
24 )),
25 ExecutionStrategy::Ordered => Ok(()),
26 ExecutionStrategy::Grouped => Err(QueryError::invariant(
27 "grouped plans require execute_grouped(...)",
28 )),
29 }
30 }
31
32 fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
35 match strategy {
36 ExecutionStrategy::Grouped => Ok(()),
37 ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
38 QueryError::invariant("execute_grouped requires grouped logical plans"),
39 ),
40 }
41 }
42
43 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
45 where
46 E: PersistedRow<Canister = C> + EntityValue,
47 {
48 let mode = query.mode();
50 let plan = query.plan()?.into_executable();
51
52 self.execute_query_dyn(mode, plan)
54 }
55
56 pub(in crate::db) fn execute_query_dyn<E>(
61 &self,
62 mode: QueryMode,
63 plan: ExecutablePlan<E>,
64 ) -> Result<EntityResponse<E>, QueryError>
65 where
66 E: PersistedRow<Canister = C> + EntityValue,
67 {
68 let result = match mode {
69 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
70 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
71 };
72
73 result.map_err(QueryError::execute)
74 }
75
76 pub(in crate::db) fn execute_load_query_with<E, T>(
79 &self,
80 query: &Query<E>,
81 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
82 ) -> Result<T, QueryError>
83 where
84 E: PersistedRow<Canister = C> + EntityValue,
85 {
86 let plan = query.plan()?.into_executable();
87
88 self.with_metrics(|| op(self.load_executor::<E>(), plan))
89 .map_err(QueryError::execute)
90 }
91
92 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
97 where
98 E: EntityKind<Canister = C>,
99 {
100 let compiled = query.plan()?;
101 let explain = compiled.explain();
102 let plan_hash = compiled.plan_hash_hex();
103
104 let executable = compiled.into_executable();
105 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
106 let execution_strategy = match query.mode() {
107 QueryMode::Load(_) => Some(trace_execution_strategy(
108 executable
109 .execution_strategy()
110 .map_err(QueryError::execute)?,
111 )),
112 QueryMode::Delete(_) => None,
113 };
114
115 Ok(QueryTracePlan::new(
116 plan_hash,
117 access_strategy,
118 execution_strategy,
119 explain,
120 ))
121 }
122
123 pub(crate) fn execute_load_query_paged_with_trace<E>(
125 &self,
126 query: &Query<E>,
127 cursor_token: Option<&str>,
128 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
129 where
130 E: PersistedRow<Canister = C> + EntityValue,
131 {
132 let plan = query.plan()?.into_executable();
134 Self::ensure_scalar_paged_execution_strategy(
135 plan.execution_strategy().map_err(QueryError::execute)?,
136 )?;
137
138 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
140 let cursor = plan
141 .prepare_cursor(cursor_bytes.as_deref())
142 .map_err(QueryError::from_executor_plan_error)?;
143
144 let (page, trace) = self
146 .with_metrics(|| {
147 self.load_executor::<E>()
148 .execute_paged_with_cursor_traced(plan, cursor)
149 })
150 .map_err(QueryError::execute)?;
151 let next_cursor = page
152 .next_cursor
153 .map(|token| {
154 let Some(token) = token.as_scalar() else {
155 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
156 };
157
158 token.encode().map_err(|err| {
159 QueryError::serialize_internal(format!(
160 "failed to serialize continuation cursor: {err}"
161 ))
162 })
163 })
164 .transpose()?;
165
166 Ok(PagedLoadExecutionWithTrace::new(
167 page.items,
168 next_cursor,
169 trace,
170 ))
171 }
172
173 pub fn execute_grouped<E>(
178 &self,
179 query: &Query<E>,
180 cursor_token: Option<&str>,
181 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
182 where
183 E: PersistedRow<Canister = C> + EntityValue,
184 {
185 let plan = query.plan()?.into_executable();
187 Self::ensure_grouped_execution_strategy(
188 plan.execution_strategy().map_err(QueryError::execute)?,
189 )?;
190
191 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
193 let cursor = plan
194 .prepare_grouped_cursor(cursor_bytes.as_deref())
195 .map_err(QueryError::from_executor_plan_error)?;
196
197 let (page, trace) = self
199 .with_metrics(|| {
200 self.load_executor::<E>()
201 .execute_grouped_paged_with_cursor_traced(plan, cursor)
202 })
203 .map_err(QueryError::execute)?;
204 let next_cursor = page
205 .next_cursor
206 .map(|token| {
207 let Some(token) = token.as_grouped() else {
208 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
209 };
210
211 token.encode().map_err(|err| {
212 QueryError::serialize_internal(format!(
213 "failed to serialize grouped continuation cursor: {err}"
214 ))
215 })
216 })
217 .transpose()?;
218
219 Ok(PagedGroupedExecutionWithTrace::new(
220 page.rows,
221 next_cursor,
222 trace,
223 ))
224 }
225}
226
227const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
228 match strategy {
229 ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
230 ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
231 ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
232 }
233}