1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11use self::{
12 execute::{ExecutionInputs, IndexPredicateCompileMode},
13 trace::{access_path_variant, execution_order_direction},
14};
15use crate::{
16 db::{
17 Db,
18 executor::{
19 AccessStreamBindings, ExecutablePlan, KeyOrderComparator, OrderedKeyStreamBox,
20 PlannedCursor,
21 aggregate::field::{
22 AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
23 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
24 },
25 compile_predicate_slots, compute_page_window, decode_pk_cursor_boundary,
26 plan::{record_plan_metrics, record_rows_scanned},
27 route::{ExecutionRoutePlan, RouteOrderSlotPolicy, derive_scan_direction},
28 },
29 query::policy,
30 query::{
31 contracts::cursor::{ContinuationToken, CursorBoundary},
32 plan::{
33 AccessPlan, AccessPlannedQuery, Direction, OrderDirection,
34 validate::validate_executor_plan,
35 },
36 },
37 response::Response,
38 },
39 error::InternalError,
40 obs::sink::{ExecKind, Span},
41 traits::{EntityKind, EntityValue},
42};
43use std::marker::PhantomData;
44
45#[derive(Debug)]
53pub(crate) struct CursorPage<E: EntityKind> {
54 pub(crate) items: Response<E>,
55
56 pub(crate) next_cursor: Option<ContinuationToken>,
57}
58
/// Coarse shape of the access path used by a load execution.
///
/// Stored in `ExecutionTrace::access_path_variant`; the mapping from an
/// `AccessPlan` to a variant lives in `trace::access_path_variant`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
76
/// Optimization reported for the key-stream path that produced a page.
///
/// Surfaced through `ExecutionTrace::optimization`; `None` there means no
/// optimization was reported.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
89
90#[derive(Clone, Copy, Debug, Eq, PartialEq)]
98pub struct ExecutionTrace {
99 pub access_path_variant: ExecutionAccessPathVariant,
100 pub direction: OrderDirection,
101 pub optimization: Option<ExecutionOptimization>,
102 pub keys_scanned: u64,
103 pub rows_returned: u64,
104 pub continuation_applied: bool,
105 pub index_predicate_applied: bool,
106 pub index_predicate_keys_rejected: u64,
107 pub distinct_keys_deduped: u64,
108}
109
110impl ExecutionTrace {
111 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
112 Self {
113 access_path_variant: access_path_variant(access),
114 direction: execution_order_direction(direction),
115 optimization: None,
116 keys_scanned: 0,
117 rows_returned: 0,
118 continuation_applied,
119 index_predicate_applied: false,
120 index_predicate_keys_rejected: 0,
121 distinct_keys_deduped: 0,
122 }
123 }
124
125 fn set_path_outcome(
126 &mut self,
127 optimization: Option<ExecutionOptimization>,
128 keys_scanned: usize,
129 rows_returned: usize,
130 index_predicate_applied: bool,
131 index_predicate_keys_rejected: u64,
132 distinct_keys_deduped: u64,
133 ) {
134 self.optimization = optimization;
135 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
136 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
137 self.index_predicate_applied = index_predicate_applied;
138 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
139 self.distinct_keys_deduped = distinct_keys_deduped;
140 }
141}
142
143fn key_stream_comparator_from_plan<K>(
144 plan: &AccessPlannedQuery<K>,
145 fallback_direction: Direction,
146) -> KeyOrderComparator {
147 let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
148 derive_scan_direction(order, RouteOrderSlotPolicy::Last)
149 });
150
151 let comparator_direction = if derived_direction == fallback_direction {
154 derived_direction
155 } else {
156 fallback_direction
157 };
158
159 KeyOrderComparator::from_direction(comparator_direction)
160}
161
162struct FastPathKeyResult {
170 ordered_key_stream: OrderedKeyStreamBox,
171 rows_scanned: usize,
172 optimization: ExecutionOptimization,
173}
174
175#[derive(Clone)]
183pub(crate) struct LoadExecutor<E: EntityKind> {
184 db: Db<E::Canister>,
185 debug: bool,
186 _marker: PhantomData<E>,
187}
188
189impl<E> LoadExecutor<E>
190where
191 E: EntityKind + EntityValue,
192{
193 #[must_use]
194 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
195 Self {
196 db,
197 debug,
198 _marker: PhantomData,
199 }
200 }
201
202 fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
205 resolve_orderable_aggregate_target_slot::<E>(target_field)
206 .map_err(AggregateFieldValueError::into_internal_error)
207 }
208
209 fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
212 resolve_any_aggregate_target_slot::<E>(target_field)
213 .map_err(AggregateFieldValueError::into_internal_error)
214 }
215
216 fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
219 resolve_numeric_aggregate_target_slot::<E>(target_field)
220 .map_err(AggregateFieldValueError::into_internal_error)
221 }
222
223 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
224 self.execute_paged_with_cursor(plan, PlannedCursor::none())
225 .map(|page| page.items)
226 }
227
228 pub(in crate::db) fn execute_paged_with_cursor(
229 &self,
230 plan: ExecutablePlan<E>,
231 cursor: impl Into<PlannedCursor>,
232 ) -> Result<CursorPage<E>, InternalError> {
233 self.execute_paged_with_cursor_traced(plan, cursor)
234 .map(|(page, _)| page)
235 }
236
237 #[expect(clippy::too_many_lines)]
238 pub(in crate::db) fn execute_paged_with_cursor_traced(
239 &self,
240 plan: ExecutablePlan<E>,
241 cursor: impl Into<PlannedCursor>,
242 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
243 let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
244 let cursor_boundary = cursor.boundary().cloned();
245 let index_range_anchor = cursor
246 .index_range_anchor()
247 .map(|anchor| anchor.last_raw_key().clone());
248
249 if !plan.mode().is_load() {
250 return Err(InternalError::query_executor_invariant(
251 "load executor requires load plans",
252 ));
253 }
254 debug_assert!(
255 policy::validate_plan_shape(plan.as_inner()).is_ok(),
256 "load executor received a plan shape that bypassed planning validation",
257 );
258
259 let continuation_signature = plan.continuation_signature();
260 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
261 let index_range_specs = plan.index_range_specs()?.to_vec();
262 let route_plan = Self::build_execution_route_plan_for_load(
263 plan.as_inner(),
264 cursor_boundary.as_ref(),
265 index_range_anchor.as_ref(),
266 None,
267 )?;
268 let continuation_applied = !matches!(
269 route_plan.continuation_mode(),
270 crate::db::executor::route::ContinuationMode::Initial
271 );
272 let direction = route_plan.direction();
273 debug_assert_eq!(
274 route_plan.window().effective_offset,
275 plan.as_inner()
276 .effective_page_offset(cursor_boundary.as_ref()),
277 "route window effective offset must match logical plan offset semantics",
278 );
279 let mut execution_trace = self
280 .debug
281 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
282 let plan = plan.into_inner();
283 let predicate_slots = compile_predicate_slots::<E>(&plan);
284
285 let result = (|| {
286 let mut span = Span::<E>::new(ExecKind::Load);
287
288 validate_executor_plan::<E>(&plan)?;
289 let ctx = self.db.recovered_context::<E>()?;
290 let execution_inputs = ExecutionInputs {
291 ctx: &ctx,
292 plan: &plan,
293 stream_bindings: AccessStreamBindings {
294 index_prefix_specs: index_prefix_specs.as_slice(),
295 index_range_specs: index_range_specs.as_slice(),
296 index_range_anchor: index_range_anchor.as_ref(),
297 direction,
298 },
299 predicate_slots: predicate_slots.as_ref(),
300 };
301
302 record_plan_metrics(&plan.access);
303 let mut resolved = Self::resolve_execution_key_stream(
306 &execution_inputs,
307 &route_plan,
308 IndexPredicateCompileMode::ConservativeSubset,
309 )?;
310 let (mut page, keys_scanned, mut post_access_rows) =
311 Self::materialize_key_stream_into_page(
312 &ctx,
313 &plan,
314 predicate_slots.as_ref(),
315 resolved.key_stream.as_mut(),
316 route_plan.scan_hints.load_scan_budget_hint,
317 route_plan.streaming_access_shape_safe(),
318 cursor_boundary.as_ref(),
319 direction,
320 continuation_signature,
321 )?;
322 let mut rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
323 let mut optimization = resolved.optimization;
324 let mut index_predicate_applied = resolved.index_predicate_applied;
325 let mut index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
326 let mut distinct_keys_deduped = resolved
327 .distinct_keys_deduped_counter
328 .as_ref()
329 .map_or(0, |counter| counter.get());
330
331 if Self::index_range_limited_residual_retry_required(
332 &plan,
333 cursor_boundary.as_ref(),
334 &route_plan,
335 rows_scanned,
336 post_access_rows,
337 ) {
338 let mut fallback_route_plan = route_plan;
339 fallback_route_plan.index_range_limit_spec = None;
340 let mut fallback_resolved = Self::resolve_execution_key_stream(
341 &execution_inputs,
342 &fallback_route_plan,
343 IndexPredicateCompileMode::ConservativeSubset,
344 )?;
345 let (fallback_page, fallback_keys_scanned, fallback_post_access_rows) =
346 Self::materialize_key_stream_into_page(
347 &ctx,
348 &plan,
349 predicate_slots.as_ref(),
350 fallback_resolved.key_stream.as_mut(),
351 fallback_route_plan.scan_hints.load_scan_budget_hint,
352 fallback_route_plan.streaming_access_shape_safe(),
353 cursor_boundary.as_ref(),
354 direction,
355 continuation_signature,
356 )?;
357 let fallback_rows_scanned = fallback_resolved
358 .rows_scanned_override
359 .unwrap_or(fallback_keys_scanned);
360 let fallback_distinct_keys_deduped = fallback_resolved
361 .distinct_keys_deduped_counter
362 .as_ref()
363 .map_or(0, |counter| counter.get());
364
365 rows_scanned = rows_scanned.saturating_add(fallback_rows_scanned);
367 optimization = fallback_resolved.optimization;
368 index_predicate_applied =
369 index_predicate_applied || fallback_resolved.index_predicate_applied;
370 index_predicate_keys_rejected = index_predicate_keys_rejected
371 .saturating_add(fallback_resolved.index_predicate_keys_rejected);
372 distinct_keys_deduped =
373 distinct_keys_deduped.saturating_add(fallback_distinct_keys_deduped);
374 page = fallback_page;
375 post_access_rows = fallback_post_access_rows;
376 }
377
378 Ok(Self::finalize_execution(
379 page,
380 optimization,
381 rows_scanned,
382 post_access_rows,
383 index_predicate_applied,
384 index_predicate_keys_rejected,
385 distinct_keys_deduped,
386 &mut span,
387 &mut execution_trace,
388 ))
389 })();
390
391 result.map(|page| (page, execution_trace))
392 }
393
394 fn index_range_limited_residual_retry_required(
397 plan: &AccessPlannedQuery<E::Key>,
398 cursor_boundary: Option<&CursorBoundary>,
399 route_plan: &ExecutionRoutePlan,
400 rows_scanned: usize,
401 post_access_rows: usize,
402 ) -> bool {
403 let Some(limit_spec) = route_plan.index_range_limit_spec else {
404 return false;
405 };
406 if plan.predicate.is_none() {
407 return false;
408 }
409 if limit_spec.fetch == 0 {
410 return false;
411 }
412 let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
413 return false;
414 };
415 let keep_count =
416 compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
417 .keep_count;
418 if keep_count == 0 {
419 return false;
420 }
421 if rows_scanned < limit_spec.fetch {
422 return false;
423 }
424
425 post_access_rows < keep_count
426 }
427
428 fn finalize_path_outcome(
430 execution_trace: &mut Option<ExecutionTrace>,
431 optimization: Option<ExecutionOptimization>,
432 rows_scanned: usize,
433 rows_returned: usize,
434 index_predicate_applied: bool,
435 index_predicate_keys_rejected: u64,
436 distinct_keys_deduped: u64,
437 ) {
438 record_rows_scanned::<E>(rows_scanned);
439 if let Some(execution_trace) = execution_trace.as_mut() {
440 execution_trace.set_path_outcome(
441 optimization,
442 rows_scanned,
443 rows_returned,
444 index_predicate_applied,
445 index_predicate_keys_rejected,
446 distinct_keys_deduped,
447 );
448 debug_assert_eq!(
449 execution_trace.keys_scanned,
450 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
451 "execution trace keys_scanned must match rows_scanned metrics input",
452 );
453 }
454 }
455
456 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
458 plan: &AccessPlannedQuery<E::Key>,
459 cursor_boundary: Option<&CursorBoundary>,
460 ) -> Result<(), InternalError> {
461 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
462 return Ok(());
463 }
464 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
465
466 Ok(())
467 }
468}