1mod aggregate;
2mod aggregate_field;
3mod aggregate_guard;
4mod execute;
5mod fast_stream;
6mod index_range_limit;
7mod page;
8mod pk_stream;
9mod route;
10mod secondary_index;
11mod trace;
12
13use self::{
14 execute::{ExecutionInputs, IndexPredicateCompileMode},
15 route::ExecutionRoutePlan,
16 trace::{access_path_variant, execution_order_direction},
17};
18use crate::{
19 db::{
20 Db,
21 executor::{
22 AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
23 plan::{record_plan_metrics, record_rows_scanned},
24 },
25 query::plan::{
26 AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
27 PlannedCursor, SlotSelectionPolicy, compute_page_window, decode_pk_cursor_boundary,
28 derive_scan_direction, validate::validate_executor_plan,
29 },
30 response::Response,
31 },
32 error::InternalError,
33 obs::sink::{ExecKind, Span},
34 traits::{EntityKind, EntityValue},
35};
36use std::marker::PhantomData;
37
/// One page of a paged load: the materialized items plus an opaque
/// continuation cursor for requesting the following page.
#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Entities returned for this page.
    pub(crate) items: Response<E>,

    /// Encoded continuation cursor; presumably `None` once the result set is
    /// exhausted — confirm against the code that populates it.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
51
/// Coarse classification of the access path a load execution took; derived
/// from the plan's `AccessPlan` via `trace::access_path_variant` and surfaced
/// through [`ExecutionTrace`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
68
/// Fast-path optimization recorded in an [`ExecutionTrace`] when the executor
/// applied one; the trace holds `None` when no fast path was taken.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
81
/// Diagnostic summary of a single load execution, collected only when the
/// executor runs with `debug` enabled.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Shape of the access path that was executed.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Effective scan order of the execution.
    pub direction: OrderDirection,
    /// Fast-path optimization applied, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Keys scanned while resolving the key stream (saturates at `u64::MAX`).
    pub keys_scanned: u64,
    /// Rows ultimately returned for the page (saturates at `u64::MAX`).
    pub rows_returned: u64,
    /// True when a cursor boundary or index-range anchor continued a prior page.
    pub continuation_applied: bool,
    /// Whether a compiled index predicate was applied during the scan.
    pub index_predicate_applied: bool,
    /// Keys rejected by the index predicate.
    pub index_predicate_keys_rejected: u64,
    /// Count of deduplicated keys, as reported by the resolver's
    /// `distinct_keys_deduped_counter`.
    pub distinct_keys_deduped: u64,
}
101
102impl ExecutionTrace {
103 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
104 Self {
105 access_path_variant: access_path_variant(access),
106 direction: execution_order_direction(direction),
107 optimization: None,
108 keys_scanned: 0,
109 rows_returned: 0,
110 continuation_applied,
111 index_predicate_applied: false,
112 index_predicate_keys_rejected: 0,
113 distinct_keys_deduped: 0,
114 }
115 }
116
117 fn set_path_outcome(
118 &mut self,
119 optimization: Option<ExecutionOptimization>,
120 keys_scanned: usize,
121 rows_returned: usize,
122 index_predicate_applied: bool,
123 index_predicate_keys_rejected: u64,
124 distinct_keys_deduped: u64,
125 ) {
126 self.optimization = optimization;
127 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
128 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
129 self.index_predicate_applied = index_predicate_applied;
130 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
131 self.distinct_keys_deduped = distinct_keys_deduped;
132 }
133}
134
135fn key_stream_comparator_from_plan<K>(
136 plan: &LogicalPlan<K>,
137 fallback_direction: Direction,
138) -> KeyOrderComparator {
139 let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
140 derive_scan_direction(order, SlotSelectionPolicy::Last)
141 });
142
143 let comparator_direction = if derived_direction == fallback_direction {
146 derived_direction
147 } else {
148 fallback_direction
149 };
150
151 KeyOrderComparator::from_direction(comparator_direction)
152}
153
/// Outcome of a fast-path key resolution: the already-ordered key stream, the
/// number of rows scanned to produce it, and which optimization produced it.
struct FastPathKeyResult {
    /// Key stream already in the requested order.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Rows scanned while building the stream.
    rows_scanned: usize,
    /// The fast path that produced this result.
    optimization: ExecutionOptimization,
}
165
/// Limit-pushdown parameters for an index-range scan. `fetch` is the key
/// budget pushed into the scan; a value of `0` disables the residual-retry
/// heuristic (see `index_range_limited_residual_retry_required`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct IndexRangeLimitSpec {
    // Number of keys the pushed-down scan is asked to fetch.
    fetch: usize,
}
177
/// Executes load-mode plans against a database handle for one entity type.
/// When `debug` is set, each execution additionally collects an
/// [`ExecutionTrace`].
#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Canister-scoped database handle this executor reads from.
    db: Db<E::Canister>,
    /// Enables per-execution trace collection.
    debug: bool,
    /// Ties the executor to its entity type without storing one.
    _marker: PhantomData<E>,
}
191
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    /// Creates an executor over `db`; `debug` enables per-execution tracing.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    /// Executes a load plan without a cursor and returns only the items of
    /// the resulting page (the continuation cursor is discarded).
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }

    /// Executes a load plan with an optional continuation cursor, returning
    /// the resulting page (the execution trace, if any, is dropped).
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    /// Full load pipeline: revalidates the cursor, checks the plan mode,
    /// routes the access path, resolves a key stream, materializes it into a
    /// page, and — when a limit-pushed-down index-range scan could not fill
    /// the page — retries once without the pushdown. Returns the page plus an
    /// [`ExecutionTrace`] when the executor was built with `debug`.
    ///
    /// # Errors
    /// Fails when the cursor does not revalidate against the plan, when the
    /// plan is not a load plan, or when any downstream stage (plan
    /// validation, context recovery, stream resolution, materialization)
    /// fails.
    #[expect(clippy::too_many_lines)]
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the caller-supplied cursor against this plan before
        // extracting its components.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        // This executor only handles load-mode plans.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // A continuation is in effect if either cursor component is present.
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let (plan, predicate_slots) = plan.into_parts();

        // Immediately-invoked closure so `?` short-circuits the fallible
        // pipeline while `execution_trace` (mutably borrowed inside) can
        // still be paired with the page afterwards.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            let route_plan = Self::build_execution_route_plan_for_load(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                None,
                direction,
            )?;

            // First attempt: honor the routed plan, including any index-range
            // limit pushdown.
            let mut resolved = Self::resolve_execution_key_stream(
                &execution_inputs,
                &route_plan,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let (mut page, keys_scanned, mut post_access_rows) =
                Self::materialize_key_stream_into_page(
                    &ctx,
                    &plan,
                    predicate_slots.as_ref(),
                    resolved.key_stream.as_mut(),
                    route_plan.scan_hints.load_scan_budget_hint,
                    route_plan.streaming_access_shape_safe(),
                    cursor_boundary.as_ref(),
                    direction,
                    continuation_signature,
                )?;
            // A resolver may report a more accurate scan count than the
            // number of keys that reached materialization.
            let mut rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
            let mut optimization = resolved.optimization;
            let mut index_predicate_applied = resolved.index_predicate_applied;
            let mut index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
            let mut distinct_keys_deduped = resolved
                .distinct_keys_deduped_counter
                .as_ref()
                .map_or(0, |counter| counter.get());

            // If the limited scan exhausted its budget but post-access
            // filtering left the page short, rerun once without the limit
            // pushdown, take the fallback page, and accumulate the scan
            // counters from both attempts.
            if Self::index_range_limited_residual_retry_required(
                &plan,
                cursor_boundary.as_ref(),
                &route_plan,
                rows_scanned,
                post_access_rows,
            ) {
                let mut fallback_route_plan = route_plan;
                fallback_route_plan.index_range_limit_spec = None;
                let mut fallback_resolved = Self::resolve_execution_key_stream(
                    &execution_inputs,
                    &fallback_route_plan,
                    IndexPredicateCompileMode::ConservativeSubset,
                )?;
                let (fallback_page, fallback_keys_scanned, fallback_post_access_rows) =
                    Self::materialize_key_stream_into_page(
                        &ctx,
                        &plan,
                        predicate_slots.as_ref(),
                        fallback_resolved.key_stream.as_mut(),
                        fallback_route_plan.scan_hints.load_scan_budget_hint,
                        fallback_route_plan.streaming_access_shape_safe(),
                        cursor_boundary.as_ref(),
                        direction,
                        continuation_signature,
                    )?;
                let fallback_rows_scanned = fallback_resolved
                    .rows_scanned_override
                    .unwrap_or(fallback_keys_scanned);
                let fallback_distinct_keys_deduped = fallback_resolved
                    .distinct_keys_deduped_counter
                    .as_ref()
                    .map_or(0, |counter| counter.get());

                // Counters sum across both attempts; the page and
                // optimization come from the fallback run.
                rows_scanned = rows_scanned.saturating_add(fallback_rows_scanned);
                optimization = fallback_resolved.optimization;
                index_predicate_applied =
                    index_predicate_applied || fallback_resolved.index_predicate_applied;
                index_predicate_keys_rejected = index_predicate_keys_rejected
                    .saturating_add(fallback_resolved.index_predicate_keys_rejected);
                distinct_keys_deduped =
                    distinct_keys_deduped.saturating_add(fallback_distinct_keys_deduped);
                page = fallback_page;
                post_access_rows = fallback_post_access_rows;
            }

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }

    /// Decides whether a limited index-range scan must be retried without the
    /// limit pushdown. All of the following must hold: a pushdown with a
    /// non-zero fetch budget was routed, a residual predicate exists, a page
    /// limit yields a non-empty keep window, the scan consumed its full
    /// budget, and the surviving rows still do not fill the window.
    fn index_range_limited_residual_retry_required(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
        route_plan: &ExecutionRoutePlan,
        rows_scanned: usize,
        post_access_rows: usize,
    ) -> bool {
        let Some(limit_spec) = route_plan.index_range_limit_spec else {
            return false;
        };
        // Without a residual predicate the limited scan cannot under-fill.
        if plan.predicate.is_none() {
            return false;
        }
        if limit_spec.fetch == 0 {
            return false;
        }
        let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
            return false;
        };
        let keep_count =
            compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
                .keep_count;
        if keep_count == 0 {
            return false;
        }
        // A scan that stopped before its budget exhausted the range itself,
        // so retrying cannot surface more rows.
        if rows_scanned < limit_spec.fetch {
            return false;
        }

        post_access_rows < keep_count
    }

    /// Records row-scan metrics and, when tracing is enabled, copies the path
    /// outcome into the trace.
    fn finalize_path_outcome(
        execution_trace: &mut Option<ExecutionTrace>,
        optimization: Option<ExecutionOptimization>,
        rows_scanned: usize,
        rows_returned: usize,
        index_predicate_applied: bool,
        index_predicate_keys_rejected: u64,
        distinct_keys_deduped: u64,
    ) {
        record_rows_scanned::<E>(rows_scanned);
        if let Some(execution_trace) = execution_trace.as_mut() {
            execution_trace.set_path_outcome(
                optimization,
                rows_scanned,
                rows_returned,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
            );
            // The trace and the recorded metrics must agree on the
            // scanned-key count.
            debug_assert_eq!(
                execution_trace.keys_scanned,
                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
                "execution trace keys_scanned must match rows_scanned metrics input",
            );
        }
    }

    /// When the plan qualifies for the primary-key order-stream fast path,
    /// decodes the cursor boundary purely to validate it (the decoded value
    /// is discarded); otherwise a no-op.
    fn validate_pk_fast_path_boundary_if_applicable(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
            return Ok(());
        }
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }
}