1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11use self::{
12 execute::{ExecutionInputs, IndexPredicateCompileMode},
13 trace::{access_path_variant, execution_order_direction},
14};
15use crate::{
16 db::{
17 Db,
18 executor::{
19 AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
20 plan::{record_plan_metrics, record_rows_scanned},
21 route::ExecutionRoutePlan,
22 },
23 query::plan::{
24 AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
25 PlannedCursor, SlotSelectionPolicy, compute_page_window, decode_pk_cursor_boundary,
26 derive_scan_direction, validate::validate_executor_plan,
27 },
28 response::Response,
29 },
30 error::InternalError,
31 obs::sink::{ExecKind, Span},
32 traits::{EntityKind, EntityValue},
33};
34use std::marker::PhantomData;
35
36#[derive(Debug)]
/// One page of load results plus the opaque continuation cursor needed to
/// fetch the next page.
#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Entities materialized for this page.
    pub(crate) items: Response<E>,

    /// Encoded cursor for the next page; `None` when no further page is
    /// available.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
49
/// Shape of the access path the executor took, reported through
/// [`ExecutionTrace::access_path_variant`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Single-key access.
    ByKey,
    /// Multi-key access.
    ByKeys,
    /// Key-range access.
    KeyRange,
    /// Index-prefix access.
    IndexPrefix,
    /// Index-range access.
    IndexRange,
    /// Full scan.
    FullScan,
    /// Union of access paths.
    Union,
    /// Intersection of access paths.
    Intersection,
}
67
/// Optimization the executor applied for a path, surfaced through
/// [`ExecutionTrace::optimization`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path.
    PrimaryKey,
    /// Ordering satisfied via a secondary index (order pushdown).
    SecondaryOrderPushdown,
    /// Row limit pushed down into an index-range scan.
    IndexRangeLimitPushdown,
}
80
/// Debug-mode summary of how a single load execution ran.
///
/// Created zeroed by [`ExecutionTrace::new`] and completed by
/// [`ExecutionTrace::set_path_outcome`] once the access path has finished.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Shape of the executed access path.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Scan direction of the execution.
    pub direction: OrderDirection,
    /// Optimization applied for the path, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Keys touched during the scan (saturated from `usize`).
    pub keys_scanned: u64,
    /// Rows ultimately returned (saturated from `usize`).
    pub rows_returned: u64,
    /// Whether a cursor boundary or index-range anchor continued a prior page.
    pub continuation_applied: bool,
    /// Whether an index-level predicate filter was applied during the scan.
    pub index_predicate_applied: bool,
    /// Keys rejected by the index-level predicate.
    pub index_predicate_keys_rejected: u64,
    /// Keys removed by distinct-key deduplication.
    pub distinct_keys_deduped: u64,
}
100
101impl ExecutionTrace {
102 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
103 Self {
104 access_path_variant: access_path_variant(access),
105 direction: execution_order_direction(direction),
106 optimization: None,
107 keys_scanned: 0,
108 rows_returned: 0,
109 continuation_applied,
110 index_predicate_applied: false,
111 index_predicate_keys_rejected: 0,
112 distinct_keys_deduped: 0,
113 }
114 }
115
116 fn set_path_outcome(
117 &mut self,
118 optimization: Option<ExecutionOptimization>,
119 keys_scanned: usize,
120 rows_returned: usize,
121 index_predicate_applied: bool,
122 index_predicate_keys_rejected: u64,
123 distinct_keys_deduped: u64,
124 ) {
125 self.optimization = optimization;
126 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
127 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
128 self.index_predicate_applied = index_predicate_applied;
129 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
130 self.distinct_keys_deduped = distinct_keys_deduped;
131 }
132}
133
134fn key_stream_comparator_from_plan<K>(
135 plan: &LogicalPlan<K>,
136 fallback_direction: Direction,
137) -> KeyOrderComparator {
138 let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
139 derive_scan_direction(order, SlotSelectionPolicy::Last)
140 });
141
142 let comparator_direction = if derived_direction == fallback_direction {
145 derived_direction
146 } else {
147 fallback_direction
148 };
149
150 KeyOrderComparator::from_direction(comparator_direction)
151}
152
/// Result of resolving keys through a fast path: the ordered key stream to
/// materialize, the scan cost of producing it, and which optimization
/// produced it.
struct FastPathKeyResult {
    /// Keys in final scan order, ready for materialization.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Rows touched while producing the stream — presumably feeds the
    /// rows-scanned metrics; confirm at the construction sites.
    rows_scanned: usize,
    /// Fast-path optimization that produced this stream.
    optimization: ExecutionOptimization,
}
165
/// Executes load-mode [`ExecutablePlan`]s against a database, optionally
/// collecting an [`ExecutionTrace`] for debugging.
#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Handle to the entity's canister-scoped database.
    db: Db<E::Canister>,
    /// When `true`, executions also produce an [`ExecutionTrace`].
    debug: bool,
    /// Ties the executor to the entity type without storing one.
    _marker: PhantomData<E>,
}
179
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    /// Creates an executor bound to `db`; `debug` enables collection of an
    /// [`ExecutionTrace`] alongside results.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    /// Executes `plan` without pagination: runs the cursor path with an
    /// empty cursor and discards the continuation, returning only the items.
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }

    /// Executes `plan` with pagination, dropping the optional debug trace.
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    /// Core load path: revalidates the cursor, resolves an ordered key
    /// stream for the planned access path, materializes it into a page, and
    /// — when the index-range limit pushdown under-filled the page — retries
    /// once without the limit pushdown, accumulating scan metrics across
    /// both attempts.
    ///
    /// Returns the page plus an [`ExecutionTrace`] when the executor was
    /// constructed with `debug = true`.
    ///
    /// # Errors
    /// Fails when the cursor does not revalidate against `plan`, when the
    /// plan is not a load plan, or when any downstream resolution or
    /// materialization step fails.
    #[expect(clippy::too_many_lines)]
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the caller-supplied cursor against this plan before
        // trusting any boundary/anchor it carries.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // A continuation is in effect if the cursor carried either a primary
        // boundary or an index-range anchor.
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let (plan, predicate_slots) = plan.into_parts();

        // Execution runs inside a closure so `?` can bail out early while
        // the trace captured above is still attached to the final result.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            let route_plan = Self::build_execution_route_plan_for_load(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                None,
                direction,
            )?;

            // First attempt: honor whatever pushdowns the route plan chose.
            let mut resolved = Self::resolve_execution_key_stream(
                &execution_inputs,
                &route_plan,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let (mut page, keys_scanned, mut post_access_rows) =
                Self::materialize_key_stream_into_page(
                    &ctx,
                    &plan,
                    predicate_slots.as_ref(),
                    resolved.key_stream.as_mut(),
                    route_plan.scan_hints.load_scan_budget_hint,
                    route_plan.streaming_access_shape_safe(),
                    cursor_boundary.as_ref(),
                    direction,
                    continuation_signature,
                )?;
            // Resolution may report a more accurate scan count than the
            // materializer observed; prefer its override when present.
            let mut rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
            let mut optimization = resolved.optimization;
            let mut index_predicate_applied = resolved.index_predicate_applied;
            let mut index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
            let mut distinct_keys_deduped = resolved
                .distinct_keys_deduped_counter
                .as_ref()
                .map_or(0, |counter| counter.get());

            // If the limited index-range scan exhausted its fetch budget yet
            // the residual predicate left the page short, rerun once without
            // the limit pushdown so the full range is considered.
            if Self::index_range_limited_residual_retry_required(
                &plan,
                cursor_boundary.as_ref(),
                &route_plan,
                rows_scanned,
                post_access_rows,
            ) {
                let mut fallback_route_plan = route_plan;
                fallback_route_plan.index_range_limit_spec = None;
                let mut fallback_resolved = Self::resolve_execution_key_stream(
                    &execution_inputs,
                    &fallback_route_plan,
                    IndexPredicateCompileMode::ConservativeSubset,
                )?;
                let (fallback_page, fallback_keys_scanned, fallback_post_access_rows) =
                    Self::materialize_key_stream_into_page(
                        &ctx,
                        &plan,
                        predicate_slots.as_ref(),
                        fallback_resolved.key_stream.as_mut(),
                        fallback_route_plan.scan_hints.load_scan_budget_hint,
                        fallback_route_plan.streaming_access_shape_safe(),
                        cursor_boundary.as_ref(),
                        direction,
                        continuation_signature,
                    )?;
                let fallback_rows_scanned = fallback_resolved
                    .rows_scanned_override
                    .unwrap_or(fallback_keys_scanned);
                let fallback_distinct_keys_deduped = fallback_resolved
                    .distinct_keys_deduped_counter
                    .as_ref()
                    .map_or(0, |counter| counter.get());

                // Scan/rejection counters accumulate across both attempts;
                // the page and the optimization label come from the
                // fallback run, which supersedes the first one.
                rows_scanned = rows_scanned.saturating_add(fallback_rows_scanned);
                optimization = fallback_resolved.optimization;
                index_predicate_applied =
                    index_predicate_applied || fallback_resolved.index_predicate_applied;
                index_predicate_keys_rejected = index_predicate_keys_rejected
                    .saturating_add(fallback_resolved.index_predicate_keys_rejected);
                distinct_keys_deduped =
                    distinct_keys_deduped.saturating_add(fallback_distinct_keys_deduped);
                page = fallback_page;
                post_access_rows = fallback_post_access_rows;
            }

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }

    /// Decides whether the index-range limit pushdown needs a second,
    /// un-limited pass: true when the limited scan exhausted its fetch
    /// budget (`rows_scanned >= fetch`) while the residual predicate left
    /// fewer post-access rows than the page's keep count.
    fn index_range_limited_residual_retry_required(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
        route_plan: &ExecutionRoutePlan,
        rows_scanned: usize,
        post_access_rows: usize,
    ) -> bool {
        // No limit pushdown was applied — nothing to retry.
        let Some(limit_spec) = route_plan.index_range_limit_spec else {
            return false;
        };
        // Without a residual predicate the limited scan cannot under-fill.
        if plan.predicate.is_none() {
            return false;
        }
        if limit_spec.fetch == 0 {
            return false;
        }
        // A retry only matters when the plan actually bounds the page size.
        let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
            return false;
        };
        let keep_count =
            compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
                .keep_count;
        if keep_count == 0 {
            return false;
        }
        // If the limited scan never hit its fetch budget there are no
        // further rows a retry could surface.
        if rows_scanned < limit_spec.fetch {
            return false;
        }

        post_access_rows < keep_count
    }

    /// Publishes per-path scan metrics and, when tracing is enabled, records
    /// the same numbers into the trace.
    fn finalize_path_outcome(
        execution_trace: &mut Option<ExecutionTrace>,
        optimization: Option<ExecutionOptimization>,
        rows_scanned: usize,
        rows_returned: usize,
        index_predicate_applied: bool,
        index_predicate_keys_rejected: u64,
        distinct_keys_deduped: u64,
    ) {
        record_rows_scanned::<E>(rows_scanned);
        if let Some(execution_trace) = execution_trace.as_mut() {
            execution_trace.set_path_outcome(
                optimization,
                rows_scanned,
                rows_returned,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
            );
            // Trace and metrics must report the same scan count.
            debug_assert_eq!(
                execution_trace.keys_scanned,
                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
                "execution trace keys_scanned must match rows_scanned metrics input",
            );
        }
    }

    /// When the primary-key ordered fast path applies to `plan`, eagerly
    /// decodes the cursor boundary so a malformed boundary is rejected
    /// before the fast path runs; otherwise a no-op.
    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
            return Ok(());
        }
        // Decode purely for validation; the decoded value is discarded.
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }
}