1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11use self::{
12 execute::{ExecutionInputs, IndexPredicateCompileMode},
13 trace::{access_path_variant, execution_order_direction},
14};
15use crate::{
16 db::{
17 Db,
18 executor::{
19 AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
20 aggregate::field::{
21 AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
22 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
23 },
24 plan::{record_plan_metrics, record_rows_scanned},
25 route::ExecutionRoutePlan,
26 },
27 query::plan::{
28 AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
29 PlannedCursor, SlotSelectionPolicy, compute_page_window, decode_pk_cursor_boundary,
30 derive_scan_direction, validate::validate_executor_plan,
31 },
32 response::Response,
33 },
34 error::InternalError,
35 obs::sink::{ExecKind, Span},
36 traits::{EntityKind, EntityValue},
37};
38use std::marker::PhantomData;
39
/// One page of load results together with an optional continuation cursor.
#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Rows materialized for this page.
    pub(crate) items: Response<E>,

    /// Opaque cursor bytes for requesting the following page.
    // NOTE(review): presumably `None` once no further page exists — confirm
    // against the page materialization code.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
53
/// Coarse label for the shape of access path used by an execution, as recorded
/// in [`ExecutionTrace::access_path_variant`].
///
/// The mapping from a concrete access plan to a variant is performed by
/// `trace::access_path_variant`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
71
/// Label for the optimization attributed to an execution, as recorded in
/// [`ExecutionTrace::optimization`].
// NOTE(review): the variant names suggest primary-key streaming, secondary-index
// order pushdown, and index-range limit pushdown respectively — confirm against
// the code that assigns them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
84
/// Diagnostic summary of a single load execution.
///
/// The load executor only produces a trace when it was constructed with
/// `debug` enabled. Counters start at zero / `false` / `None` and are filled
/// in once the execution outcome is known.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Shape of the access path the plan was executed with.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Order direction corresponding to the execution's scan direction.
    pub direction: OrderDirection,
    /// Optimization attributed to the execution, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Scanned count reported for the execution (saturates at `u64::MAX`).
    pub keys_scanned: u64,
    /// Returned-row count reported for the execution (saturates at `u64::MAX`).
    pub rows_returned: u64,
    /// Whether the execution resumed from a continuation (a cursor boundary
    /// or an index-range anchor was supplied).
    pub continuation_applied: bool,
    /// Whether an index predicate was applied during key resolution.
    pub index_predicate_applied: bool,
    /// Count of keys rejected by the index predicate.
    pub index_predicate_keys_rejected: u64,
    /// Count of keys removed by distinct de-duplication.
    pub distinct_keys_deduped: u64,
}
104
105impl ExecutionTrace {
106 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
107 Self {
108 access_path_variant: access_path_variant(access),
109 direction: execution_order_direction(direction),
110 optimization: None,
111 keys_scanned: 0,
112 rows_returned: 0,
113 continuation_applied,
114 index_predicate_applied: false,
115 index_predicate_keys_rejected: 0,
116 distinct_keys_deduped: 0,
117 }
118 }
119
120 fn set_path_outcome(
121 &mut self,
122 optimization: Option<ExecutionOptimization>,
123 keys_scanned: usize,
124 rows_returned: usize,
125 index_predicate_applied: bool,
126 index_predicate_keys_rejected: u64,
127 distinct_keys_deduped: u64,
128 ) {
129 self.optimization = optimization;
130 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
131 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
132 self.index_predicate_applied = index_predicate_applied;
133 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
134 self.distinct_keys_deduped = distinct_keys_deduped;
135 }
136}
137
138fn key_stream_comparator_from_plan<K>(
139 plan: &LogicalPlan<K>,
140 fallback_direction: Direction,
141) -> KeyOrderComparator {
142 let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
143 derive_scan_direction(order, SlotSelectionPolicy::Last)
144 });
145
146 let comparator_direction = if derived_direction == fallback_direction {
149 derived_direction
150 } else {
151 fallback_direction
152 };
153
154 KeyOrderComparator::from_direction(comparator_direction)
155}
156
/// Result bundle for a fast-path key resolution: the key stream, the scan
/// count, and the optimization that produced it.
// NOTE(review): not constructed in this part of the file; presumably produced
// by the fast-path submodules — confirm.
struct FastPathKeyResult {
    /// Ordered stream of keys produced by the fast path.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Scan count attributed to producing the stream.
    rows_scanned: usize,
    /// Optimization label for the fast path that was taken.
    optimization: ExecutionOptimization,
}
169
/// Executor for load plans over entity type `E`.
#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Database handle for the entity's canister.
    db: Db<E::Canister>,
    /// When `true`, executions also produce an [`ExecutionTrace`].
    debug: bool,
    /// Ties the executor to `E` without storing a value of that type.
    _marker: PhantomData<E>,
}
183
184impl<E> LoadExecutor<E>
185where
186 E: EntityKind + EntityValue,
187{
188 #[must_use]
189 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
190 Self {
191 db,
192 debug,
193 _marker: PhantomData,
194 }
195 }
196
197 fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
200 resolve_orderable_aggregate_target_slot::<E>(target_field)
201 .map_err(AggregateFieldValueError::into_internal_error)
202 }
203
204 fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
207 resolve_any_aggregate_target_slot::<E>(target_field)
208 .map_err(AggregateFieldValueError::into_internal_error)
209 }
210
211 fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
214 resolve_numeric_aggregate_target_slot::<E>(target_field)
215 .map_err(AggregateFieldValueError::into_internal_error)
216 }
217
218 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
219 self.execute_paged_with_cursor(plan, PlannedCursor::none())
220 .map(|page| page.items)
221 }
222
223 pub(in crate::db) fn execute_paged_with_cursor(
224 &self,
225 plan: ExecutablePlan<E>,
226 cursor: impl Into<PlannedCursor>,
227 ) -> Result<CursorPage<E>, InternalError> {
228 self.execute_paged_with_cursor_traced(plan, cursor)
229 .map(|(page, _)| page)
230 }
231
    /// Executes a load plan from the given cursor, returning the page together
    /// with an [`ExecutionTrace`] when the executor was built with `debug`.
    ///
    /// The flow is: revalidate the cursor against the plan, resolve a key
    /// stream for the planned route, materialize it into a page, and — if the
    /// index-range limited attempt is judged insufficient — run the route once
    /// more with the index-range limit removed before finalizing.
    ///
    /// # Errors
    /// Returns an error when the cursor fails revalidation, when the plan is
    /// not a load plan, or when any planning, resolution, or materialization
    /// step fails.
    #[expect(clippy::too_many_lines)]
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // The cursor is revalidated against this plan before any of its parts
        // are used.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        // Everything needed from the executable plan is captured here, before
        // `into_parts` consumes it below.
        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        // A trace is only created in debug mode; otherwise this stays `None`.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let (plan, predicate_slots) = plan.into_parts();

        // The body runs inside an immediately-invoked closure so that `?` exits
        // only the closure and the outcome is collected in `result`.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            let route_plan = Self::build_execution_route_plan_for_load(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                None,
                direction,
            )?;

            // First attempt: resolve and materialize using the planned route.
            let mut resolved = Self::resolve_execution_key_stream(
                &execution_inputs,
                &route_plan,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let (mut page, keys_scanned, mut post_access_rows) =
                Self::materialize_key_stream_into_page(
                    &ctx,
                    &plan,
                    predicate_slots.as_ref(),
                    resolved.key_stream.as_mut(),
                    route_plan.scan_hints.load_scan_budget_hint,
                    route_plan.streaming_access_shape_safe(),
                    cursor_boundary.as_ref(),
                    direction,
                    continuation_signature,
                )?;
            // A scan count supplied by the resolver takes precedence over the
            // count observed during materialization.
            let mut rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
            let mut optimization = resolved.optimization;
            let mut index_predicate_applied = resolved.index_predicate_applied;
            let mut index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
            let mut distinct_keys_deduped = resolved
                .distinct_keys_deduped_counter
                .as_ref()
                .map_or(0, |counter| counter.get());

            // Second attempt: same route with the index-range limit cleared.
            if Self::index_range_limited_residual_retry_required(
                &plan,
                cursor_boundary.as_ref(),
                &route_plan,
                rows_scanned,
                post_access_rows,
            ) {
                let mut fallback_route_plan = route_plan;
                fallback_route_plan.index_range_limit_spec = None;
                let mut fallback_resolved = Self::resolve_execution_key_stream(
                    &execution_inputs,
                    &fallback_route_plan,
                    IndexPredicateCompileMode::ConservativeSubset,
                )?;
                let (fallback_page, fallback_keys_scanned, fallback_post_access_rows) =
                    Self::materialize_key_stream_into_page(
                        &ctx,
                        &plan,
                        predicate_slots.as_ref(),
                        fallback_resolved.key_stream.as_mut(),
                        fallback_route_plan.scan_hints.load_scan_budget_hint,
                        fallback_route_plan.streaming_access_shape_safe(),
                        cursor_boundary.as_ref(),
                        direction,
                        continuation_signature,
                    )?;
                let fallback_rows_scanned = fallback_resolved
                    .rows_scanned_override
                    .unwrap_or(fallback_keys_scanned);
                let fallback_distinct_keys_deduped = fallback_resolved
                    .distinct_keys_deduped_counter
                    .as_ref()
                    .map_or(0, |counter| counter.get());

                // Counters accumulate across both attempts; the page, the
                // post-access row count, and the optimization label come from
                // the fallback attempt only.
                rows_scanned = rows_scanned.saturating_add(fallback_rows_scanned);
                optimization = fallback_resolved.optimization;
                index_predicate_applied =
                    index_predicate_applied || fallback_resolved.index_predicate_applied;
                index_predicate_keys_rejected = index_predicate_keys_rejected
                    .saturating_add(fallback_resolved.index_predicate_keys_rejected);
                distinct_keys_deduped =
                    distinct_keys_deduped.saturating_add(fallback_distinct_keys_deduped);
                page = fallback_page;
                post_access_rows = fallback_post_access_rows;
            }

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        // The trace is only returned alongside a successful page.
        result.map(|page| (page, execution_trace))
    }
374
375 fn index_range_limited_residual_retry_required(
378 plan: &LogicalPlan<E::Key>,
379 cursor_boundary: Option<&CursorBoundary>,
380 route_plan: &ExecutionRoutePlan,
381 rows_scanned: usize,
382 post_access_rows: usize,
383 ) -> bool {
384 let Some(limit_spec) = route_plan.index_range_limit_spec else {
385 return false;
386 };
387 if plan.predicate.is_none() {
388 return false;
389 }
390 if limit_spec.fetch == 0 {
391 return false;
392 }
393 let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
394 return false;
395 };
396 let keep_count =
397 compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
398 .keep_count;
399 if keep_count == 0 {
400 return false;
401 }
402 if rows_scanned < limit_spec.fetch {
403 return false;
404 }
405
406 post_access_rows < keep_count
407 }
408
409 fn finalize_path_outcome(
411 execution_trace: &mut Option<ExecutionTrace>,
412 optimization: Option<ExecutionOptimization>,
413 rows_scanned: usize,
414 rows_returned: usize,
415 index_predicate_applied: bool,
416 index_predicate_keys_rejected: u64,
417 distinct_keys_deduped: u64,
418 ) {
419 record_rows_scanned::<E>(rows_scanned);
420 if let Some(execution_trace) = execution_trace.as_mut() {
421 execution_trace.set_path_outcome(
422 optimization,
423 rows_scanned,
424 rows_returned,
425 index_predicate_applied,
426 index_predicate_keys_rejected,
427 distinct_keys_deduped,
428 );
429 debug_assert_eq!(
430 execution_trace.keys_scanned,
431 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
432 "execution trace keys_scanned must match rows_scanned metrics input",
433 );
434 }
435 }
436
437 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
439 plan: &LogicalPlan<E::Key>,
440 cursor_boundary: Option<&CursorBoundary>,
441 ) -> Result<(), InternalError> {
442 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
443 return Ok(());
444 }
445 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
446
447 Ok(())
448 }
449}