1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11pub(in crate::db::executor) use self::execute::{
12 ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
13};
14
15use self::trace::{access_path_variant, execution_order_direction};
16use crate::{
17 db::{
18 Db,
19 access::AccessPlan,
20 cursor::{ContinuationToken, CursorBoundary, PlannedCursor, decode_pk_cursor_boundary},
21 direction::Direction,
22 executor::{
23 AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
24 IndexPredicateCompileMode, KeyOrderComparator, OrderedKeyStreamBox,
25 aggregate_model::field::{
26 AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
27 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
28 },
29 plan_metrics::{record_plan_metrics, record_rows_scanned},
30 range_token_anchor_key, range_token_from_cursor_anchor, validate_executor_plan,
31 },
32 policy,
33 query::plan::{AccessPlannedQuery, OrderDirection},
34 response::Response,
35 },
36 error::InternalError,
37 obs::sink::{ExecKind, Span},
38 traits::{EntityKind, EntityValue},
39};
40use std::marker::PhantomData;
41
42#[derive(Debug)]
50pub(crate) struct CursorPage<E: EntityKind> {
51 pub(crate) items: Response<E>,
52
53 pub(crate) next_cursor: Option<ContinuationToken>,
54}
55
56#[derive(Clone, Copy, Debug, Eq, PartialEq)]
63pub enum ExecutionAccessPathVariant {
64 ByKey,
65 ByKeys,
66 KeyRange,
67 IndexPrefix,
68 IndexRange,
69 FullScan,
70 Union,
71 Intersection,
72}
73
74#[derive(Clone, Copy, Debug, Eq, PartialEq)]
81pub enum ExecutionOptimization {
82 PrimaryKey,
83 SecondaryOrderPushdown,
84 IndexRangeLimitPushdown,
85}
86
87#[derive(Clone, Copy, Debug, Eq, PartialEq)]
95pub struct ExecutionTrace {
96 pub access_path_variant: ExecutionAccessPathVariant,
97 pub direction: OrderDirection,
98 pub optimization: Option<ExecutionOptimization>,
99 pub keys_scanned: u64,
100 pub rows_returned: u64,
101 pub continuation_applied: bool,
102 pub index_predicate_applied: bool,
103 pub index_predicate_keys_rejected: u64,
104 pub distinct_keys_deduped: u64,
105}
106
107impl ExecutionTrace {
108 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
109 Self {
110 access_path_variant: access_path_variant(access),
111 direction: execution_order_direction(direction),
112 optimization: None,
113 keys_scanned: 0,
114 rows_returned: 0,
115 continuation_applied,
116 index_predicate_applied: false,
117 index_predicate_keys_rejected: 0,
118 distinct_keys_deduped: 0,
119 }
120 }
121
122 fn set_path_outcome(
123 &mut self,
124 optimization: Option<ExecutionOptimization>,
125 keys_scanned: usize,
126 rows_returned: usize,
127 index_predicate_applied: bool,
128 index_predicate_keys_rejected: u64,
129 distinct_keys_deduped: u64,
130 ) {
131 self.optimization = optimization;
132 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
133 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
134 self.index_predicate_applied = index_predicate_applied;
135 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
136 self.distinct_keys_deduped = distinct_keys_deduped;
137 }
138}
139
140pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
141 direction: Direction,
142) -> KeyOrderComparator {
143 KeyOrderComparator::from_direction(direction)
144}
145
146pub(in crate::db::executor) struct FastPathKeyResult {
154 pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
155 pub(in crate::db::executor) rows_scanned: usize,
156 pub(in crate::db::executor) optimization: ExecutionOptimization,
157}
158
159#[derive(Clone)]
167pub(crate) struct LoadExecutor<E: EntityKind> {
168 db: Db<E::Canister>,
169 debug: bool,
170 _marker: PhantomData<E>,
171}
172
173impl<E> LoadExecutor<E>
174where
175 E: EntityKind + EntityValue,
176{
177 #[must_use]
178 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
179 Self {
180 db,
181 debug,
182 _marker: PhantomData,
183 }
184 }
185
186 pub(in crate::db::executor) fn recovered_context(
188 &self,
189 ) -> Result<crate::db::Context<'_, E>, InternalError> {
190 self.db.recovered_context::<E>()
191 }
192
193 fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
196 resolve_orderable_aggregate_target_slot::<E>(target_field)
197 .map_err(AggregateFieldValueError::into_internal_error)
198 }
199
200 fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
203 resolve_any_aggregate_target_slot::<E>(target_field)
204 .map_err(AggregateFieldValueError::into_internal_error)
205 }
206
207 fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
210 resolve_numeric_aggregate_target_slot::<E>(target_field)
211 .map_err(AggregateFieldValueError::into_internal_error)
212 }
213
214 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
215 self.execute_paged_with_cursor(plan, PlannedCursor::none())
216 .map(|page| page.items)
217 }
218
219 pub(in crate::db) fn execute_paged_with_cursor(
220 &self,
221 plan: ExecutablePlan<E>,
222 cursor: impl Into<PlannedCursor>,
223 ) -> Result<CursorPage<E>, InternalError> {
224 self.execute_paged_with_cursor_traced(plan, cursor)
225 .map(|(page, _)| page)
226 }
227
228 pub(in crate::db) fn execute_paged_with_cursor_traced(
229 &self,
230 plan: ExecutablePlan<E>,
231 cursor: impl Into<PlannedCursor>,
232 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
233 let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
234 let cursor_boundary = cursor.boundary().cloned();
235 let index_range_token = cursor
236 .index_range_anchor()
237 .map(range_token_from_cursor_anchor);
238
239 if !plan.mode().is_load() {
240 return Err(InternalError::query_executor_invariant(
241 "load executor requires load plans",
242 ));
243 }
244 debug_assert!(
245 policy::validate_plan_shape(plan.as_inner()).is_ok(),
246 "load executor received a plan shape that bypassed planning validation",
247 );
248
249 let continuation_signature = plan.continuation_signature();
250 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
251 let index_range_specs = plan.index_range_specs()?.to_vec();
252 let route_plan = Self::build_execution_route_plan_for_load(
253 plan.as_inner(),
254 cursor_boundary.as_ref(),
255 index_range_token.as_ref(),
256 None,
257 )?;
258 let continuation_applied = !matches!(
259 route_plan.continuation_mode(),
260 crate::db::executor::route::ContinuationMode::Initial
261 );
262 let direction = route_plan.direction();
263 debug_assert_eq!(
264 route_plan.window().effective_offset,
265 ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
266 "route window effective offset must match logical plan offset semantics",
267 );
268 let mut execution_trace = self
269 .debug
270 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
271 let plan = plan.into_inner();
272 let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
273
274 let result = (|| {
275 let mut span = Span::<E>::new(ExecKind::Load);
276
277 validate_executor_plan::<E>(&plan)?;
278 let ctx = self.db.recovered_context::<E>()?;
279 let execution_inputs = ExecutionInputs {
280 ctx: &ctx,
281 plan: &plan,
282 stream_bindings: AccessStreamBindings {
283 index_prefix_specs: index_prefix_specs.as_slice(),
284 index_range_specs: index_range_specs.as_slice(),
285 index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
286 direction,
287 },
288 execution_preparation: &execution_preparation,
289 };
290
291 record_plan_metrics(&plan.access);
292 let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
295 &execution_inputs,
296 &route_plan,
297 cursor_boundary.as_ref(),
298 continuation_signature,
299 IndexPredicateCompileMode::ConservativeSubset,
300 )?;
301 let page = materialized.page;
302 let rows_scanned = materialized.rows_scanned;
303 let post_access_rows = materialized.post_access_rows;
304 let optimization = materialized.optimization;
305 let index_predicate_applied = materialized.index_predicate_applied;
306 let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
307 let distinct_keys_deduped = materialized.distinct_keys_deduped;
308
309 Ok(Self::finalize_execution(
310 page,
311 optimization,
312 rows_scanned,
313 post_access_rows,
314 index_predicate_applied,
315 index_predicate_keys_rejected,
316 distinct_keys_deduped,
317 &mut span,
318 &mut execution_trace,
319 ))
320 })();
321
322 result.map(|page| (page, execution_trace))
323 }
324
325 fn finalize_path_outcome(
327 execution_trace: &mut Option<ExecutionTrace>,
328 optimization: Option<ExecutionOptimization>,
329 rows_scanned: usize,
330 rows_returned: usize,
331 index_predicate_applied: bool,
332 index_predicate_keys_rejected: u64,
333 distinct_keys_deduped: u64,
334 ) {
335 record_rows_scanned::<E>(rows_scanned);
336 if let Some(execution_trace) = execution_trace.as_mut() {
337 execution_trace.set_path_outcome(
338 optimization,
339 rows_scanned,
340 rows_returned,
341 index_predicate_applied,
342 index_predicate_keys_rejected,
343 distinct_keys_deduped,
344 );
345 debug_assert_eq!(
346 execution_trace.keys_scanned,
347 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
348 "execution trace keys_scanned must match rows_scanned metrics input",
349 );
350 }
351 }
352
353 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
355 plan: &AccessPlannedQuery<E::Key>,
356 cursor_boundary: Option<&CursorBoundary>,
357 ) -> Result<(), InternalError> {
358 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
359 return Ok(());
360 }
361 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
362
363 Ok(())
364 }
365}