1mod aggregate;
2mod aggregate_field;
3mod aggregate_guard;
4mod execute;
5mod index_range_limit;
6mod page;
7mod pk_stream;
8mod route;
9mod secondary_index;
10mod trace;
11
12use self::{
13 execute::{ExecutionInputs, IndexPredicateCompileMode},
14 route::ExecutionRoutePlan,
15 trace::{access_path_variant, execution_order_direction},
16};
17use crate::{
18 db::{
19 Db,
20 executor::{
21 AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
22 plan::{record_plan_metrics, record_rows_scanned},
23 },
24 query::plan::{
25 AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
26 PlannedCursor, SlotSelectionPolicy, compute_page_window, decode_pk_cursor_boundary,
27 derive_scan_direction, validate::validate_executor_plan,
28 },
29 response::Response,
30 },
31 error::InternalError,
32 obs::sink::{ExecKind, Span},
33 traits::{EntityKind, EntityValue},
34};
35use std::marker::PhantomData;
36
/// One page of load results together with the opaque continuation token
/// needed to fetch the following page.
#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Entities materialized for this page.
    pub(crate) items: Response<E>,

    /// Encoded cursor to resume iteration from; `None` when the result
    /// set is exhausted.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
50
/// Shape of the access path the executor ran, surfaced via
/// [`ExecutionTrace`] for debugging and observability.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Lookup of a single key.
    ByKey,
    /// Lookup of an explicit set of keys.
    ByKeys,
    /// Scan over a contiguous key range.
    KeyRange,
    /// Secondary-index scan constrained by a prefix.
    IndexPrefix,
    /// Secondary-index scan constrained by a range.
    IndexRange,
    /// Unconstrained scan over the full store.
    FullScan,
    /// Union of multiple sub-paths.
    Union,
    /// Intersection of multiple sub-paths.
    Intersection,
}
67
/// Optimization the executor applied on the chosen access path, reported
/// in [`ExecutionTrace::optimization`] when tracing is enabled.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path.
    PrimaryKey,
    /// Ordering satisfied by a secondary index (order pushdown).
    SecondaryOrderPushdown,
    /// Page limit pushed down into an index-range scan.
    IndexRangeLimitPushdown,
}
80
/// Diagnostic record of a single load execution, populated only when the
/// executor runs in debug mode (see `LoadExecutor::execute_paged_with_cursor_traced`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Shape of the access path that was executed.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Scan direction the execution used.
    pub direction: OrderDirection,
    /// Optimization applied on this path, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Number of keys scanned (saturates at `u64::MAX`).
    pub keys_scanned: u64,
    /// Number of rows returned to the caller (saturates at `u64::MAX`).
    pub rows_returned: u64,
    /// Whether a cursor boundary or index-range anchor resumed a prior page.
    pub continuation_applied: bool,
    /// Whether an index-level predicate was evaluated during the scan.
    pub index_predicate_applied: bool,
    /// Keys rejected by the index-level predicate.
    pub index_predicate_keys_rejected: u64,
    /// Duplicate keys removed during stream deduplication.
    pub distinct_keys_deduped: u64,
}
100
101impl ExecutionTrace {
102 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
103 Self {
104 access_path_variant: access_path_variant(access),
105 direction: execution_order_direction(direction),
106 optimization: None,
107 keys_scanned: 0,
108 rows_returned: 0,
109 continuation_applied,
110 index_predicate_applied: false,
111 index_predicate_keys_rejected: 0,
112 distinct_keys_deduped: 0,
113 }
114 }
115
116 fn set_path_outcome(
117 &mut self,
118 optimization: Option<ExecutionOptimization>,
119 keys_scanned: usize,
120 rows_returned: usize,
121 index_predicate_applied: bool,
122 index_predicate_keys_rejected: u64,
123 distinct_keys_deduped: u64,
124 ) {
125 self.optimization = optimization;
126 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
127 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
128 self.index_predicate_applied = index_predicate_applied;
129 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
130 self.distinct_keys_deduped = distinct_keys_deduped;
131 }
132}
133
134fn key_stream_comparator_from_plan<K>(
135 plan: &LogicalPlan<K>,
136 fallback_direction: Direction,
137) -> KeyOrderComparator {
138 let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
139 derive_scan_direction(order, SlotSelectionPolicy::Last)
140 });
141
142 let comparator_direction = if derived_direction == fallback_direction {
145 derived_direction
146 } else {
147 fallback_direction
148 };
149
150 KeyOrderComparator::from_direction(comparator_direction)
151}
152
/// Output of a fast-path key-stream resolution (consumed by sibling
/// executor submodules, not in this file).
struct FastPathKeyResult {
    /// Keys already in the required scan order.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Rows the fast path scanned while producing the stream.
    rows_scanned: usize,
    /// Optimization that made the fast path applicable.
    optimization: ExecutionOptimization,
}
164
/// Limit pushed down into an index-range scan.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct IndexRangeLimitSpec {
    // Maximum number of keys the range scan fetches before post-access
    // filtering; a scan that exhausts this budget without yielding enough
    // surviving rows triggers the unlimited fallback retry (see
    // `index_range_limited_residual_retry_required`).
    fetch: usize,
}
176
/// Executes load (read) plans for entity type `E`.
#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    // Handle to the database for `E`'s canister.
    db: Db<E::Canister>,
    // When true, an `ExecutionTrace` is captured alongside results.
    debug: bool,
    // Ties the executor to `E` without storing a value of that type.
    _marker: PhantomData<E>,
}
190
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    /// Creates a load executor over `db`; `debug` enables capture of an
    /// [`ExecutionTrace`] on traced executions.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    /// Executes `plan` without continuation, returning only the items and
    /// discarding the next-page cursor.
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }

    /// Executes `plan` resuming from `cursor`, returning the page and
    /// dropping the execution trace.
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    /// Executes `plan` resuming from `cursor` and, when the executor was
    /// constructed with `debug = true`, returns an [`ExecutionTrace`]
    /// describing what the execution did.
    ///
    /// # Errors
    ///
    /// Fails if the cursor does not revalidate against the plan, the plan is
    /// not a load plan, plan validation fails, or any stage of stream
    /// resolution / materialization fails.
    #[expect(clippy::too_many_lines)]
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the caller-supplied cursor against this plan, then pull
        // out the pieces of continuation state it carries.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        // This executor only handles load-mode plans.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        // Snapshot plan metadata before `into_parts` consumes the plan wrapper.
        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        // Trace is only allocated in debug mode; `None` otherwise.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let (plan, predicate_slots) = plan.into_parts();

        // Immediately-invoked closure so `?` can short-circuit the body while
        // `execution_trace` stays available for the final pairing below.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            // Route planning decides scan hints, streaming safety, and any
            // index-range limit pushdown (defined in a sibling module file).
            let route_plan = Self::build_execution_route_plan_for_load(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                None,
                direction,
            )?;

            // First attempt: resolve the key stream for the routed access
            // path, then materialize it into a page.
            let mut resolved = Self::resolve_execution_key_stream(
                &execution_inputs,
                &route_plan,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let (mut page, keys_scanned, mut post_access_rows) =
                Self::materialize_key_stream_into_page(
                    &ctx,
                    &plan,
                    predicate_slots.as_ref(),
                    resolved.key_stream.as_mut(),
                    route_plan.scan_hints.load_scan_budget_hint,
                    route_plan.streaming_access_shape_safe(),
                    cursor_boundary.as_ref(),
                    direction,
                    continuation_signature,
                )?;
            // Counters for the trace/metrics; the stream resolution may
            // override the scanned-row count (e.g. when it scanned more keys
            // than it emitted).
            let mut rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
            let mut optimization = resolved.optimization;
            let mut index_predicate_applied = resolved.index_predicate_applied;
            let mut index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
            let mut distinct_keys_deduped = resolved
                .distinct_keys_deduped_counter
                .as_ref()
                .map_or(0, |counter| counter.get());

            // Fallback: if an index-range limit pushdown under-fetched (the
            // residual predicate rejected too many rows to fill the page),
            // re-run the same route without the fetch limit.
            if Self::index_range_limited_residual_retry_required(
                &plan,
                cursor_boundary.as_ref(),
                &route_plan,
                rows_scanned,
                post_access_rows,
            ) {
                let mut fallback_route_plan = route_plan;
                fallback_route_plan.index_range_limit_spec = None;
                let mut fallback_resolved = Self::resolve_execution_key_stream(
                    &execution_inputs,
                    &fallback_route_plan,
                    IndexPredicateCompileMode::ConservativeSubset,
                )?;
                let (fallback_page, fallback_keys_scanned, fallback_post_access_rows) =
                    Self::materialize_key_stream_into_page(
                        &ctx,
                        &plan,
                        predicate_slots.as_ref(),
                        fallback_resolved.key_stream.as_mut(),
                        fallback_route_plan.scan_hints.load_scan_budget_hint,
                        fallback_route_plan.streaming_access_shape_safe(),
                        cursor_boundary.as_ref(),
                        direction,
                        continuation_signature,
                    )?;
                let fallback_rows_scanned = fallback_resolved
                    .rows_scanned_override
                    .unwrap_or(fallback_keys_scanned);
                let fallback_distinct_keys_deduped = fallback_resolved
                    .distinct_keys_deduped_counter
                    .as_ref()
                    .map_or(0, |counter| counter.get());

                // Accumulate scan-cost counters across both passes
                // (saturating), but replace the page itself: the fallback
                // pass supersedes the under-filled first attempt.
                rows_scanned = rows_scanned.saturating_add(fallback_rows_scanned);
                optimization = fallback_resolved.optimization;
                index_predicate_applied =
                    index_predicate_applied || fallback_resolved.index_predicate_applied;
                index_predicate_keys_rejected = index_predicate_keys_rejected
                    .saturating_add(fallback_resolved.index_predicate_keys_rejected);
                distinct_keys_deduped =
                    distinct_keys_deduped.saturating_add(fallback_distinct_keys_deduped);
                page = fallback_page;
                post_access_rows = fallback_post_access_rows;
            }

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        // NOTE: on error the trace (if any) is dropped with the closure result.
        result.map(|page| (page, execution_trace))
    }

    /// Decides whether an index-range-limited scan must be retried without
    /// the limit pushdown: returns `true` only when a limit spec was pushed
    /// down, a residual predicate exists, the pushed fetch budget is nonzero
    /// and was exhausted (`rows_scanned >= fetch`), an explicit page limit
    /// with a nonzero keep window is in effect, and fewer rows than the keep
    /// window survived post-access filtering.
    fn index_range_limited_residual_retry_required(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
        route_plan: &ExecutionRoutePlan,
        rows_scanned: usize,
        post_access_rows: usize,
    ) -> bool {
        let Some(limit_spec) = route_plan.index_range_limit_spec else {
            return false;
        };
        // No residual predicate means the limited scan cannot under-fill.
        if plan.predicate.is_none() {
            return false;
        }
        if limit_spec.fetch == 0 {
            return false;
        }
        let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
            return false;
        };
        let keep_count =
            compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
                .keep_count;
        if keep_count == 0 {
            return false;
        }
        // Scan stopped before exhausting its fetch budget => the range itself
        // ran dry; retrying without the limit would find nothing more.
        if rows_scanned < limit_spec.fetch {
            return false;
        }

        post_access_rows < keep_count
    }

    /// Records scan metrics and, when tracing, mirrors the path outcome into
    /// the trace. The `debug_assert_eq!` checks the trace and the metrics
    /// were fed the same saturated `rows_scanned` value.
    fn finalize_path_outcome(
        execution_trace: &mut Option<ExecutionTrace>,
        optimization: Option<ExecutionOptimization>,
        rows_scanned: usize,
        rows_returned: usize,
        index_predicate_applied: bool,
        index_predicate_keys_rejected: u64,
        distinct_keys_deduped: u64,
    ) {
        record_rows_scanned::<E>(rows_scanned);
        if let Some(execution_trace) = execution_trace.as_mut() {
            execution_trace.set_path_outcome(
                optimization,
                rows_scanned,
                rows_returned,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
            );
            debug_assert_eq!(
                execution_trace.keys_scanned,
                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
                "execution trace keys_scanned must match rows_scanned metrics input",
            );
        }
    }

    /// Validates the primary-key cursor boundary, but only for plans whose
    /// shape supports the PK order-stream fast path (shape check defined in
    /// a sibling module file). The decoded boundary is discarded — this call
    /// exists purely to surface decode errors early.
    fn validate_pk_fast_path_boundary_if_applicable(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
            return Ok(());
        }
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }
}