1mod index_range_limit;
2mod pk_stream;
3mod secondary_index;
4
5use crate::{
6 db::{
7 Context, Db,
8 data::DataKey,
9 executor::plan::{record_plan_metrics, record_rows_scanned, set_rows_from_len},
10 index::{IndexKey, RawIndexKey},
11 query::plan::{
12 AccessPlan, AccessPlanProjection, ContinuationSignature, ContinuationToken,
13 CursorBoundary, Direction, ExecutablePlan, IndexRangeCursorAnchor, LogicalPlan,
14 OrderDirection, PlannedCursor, decode_pk_cursor_boundary,
15 logical::PostAccessStats,
16 project_access_plan,
17 validate::{
18 PushdownApplicability, assess_secondary_order_pushdown_if_applicable_validated,
19 validate_executor_plan,
20 },
21 },
22 response::Response,
23 },
24 error::{ErrorClass, ErrorOrigin, InternalError},
25 obs::sink::{ExecKind, Span},
26 traits::{EntityKind, EntityValue},
27 types::Id,
28 value::Value,
29};
30use std::{marker::PhantomData, ops::Bound};
31
32#[derive(Debug)]
40pub(crate) struct CursorPage<E: EntityKind> {
41 pub(crate) items: Response<E>,
42
43 pub(crate) next_cursor: Option<Vec<u8>>,
44}
45
/// Top-level shape of the access plan used by a load execution.
///
/// Only the outermost node is reported: composite plans map to `Union` or
/// `Intersection` without describing their children.
#[derive(Clone, Copy, Debug)]
#[derive(Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Single primary-key lookup.
    ByKey,
    /// Lookup of an explicit list of primary keys.
    ByKeys,
    /// Primary-key range between a start and an end key.
    KeyRange,
    /// Secondary-index lookup constrained by a value prefix.
    IndexPrefix,
    /// Secondary-index lookup with a prefix plus lower/upper bounds.
    IndexRange,
    /// Unconstrained scan.
    FullScan,
    /// Union of child access plans.
    Union,
    /// Intersection of child access plans.
    Intersection,
}
62
/// Pushdown optimization recorded in `ExecutionTrace::pushdown_type`.
#[derive(Clone, Copy, Debug)]
#[derive(Eq, PartialEq)]
pub enum ExecutionPushdownType {
    /// Ordering was pushed down to a secondary index.
    SecondaryOrder,
    /// The page limit was pushed down into an index-range scan.
    IndexRangeLimit,
}
73
/// Fast path that produced the ordered keys for a page.
///
/// Recorded in `ExecutionTrace::fast_path_used`.
#[derive(Clone, Copy, Debug)]
#[derive(Eq, PartialEq)]
pub enum ExecutionFastPath {
    /// Primary-key ordered stream (presumably set by the pk-order stream path; confirm in `pk_stream`).
    PrimaryKey,
    /// Secondary-index ordered stream (presumably set by the `secondary_index` path).
    SecondaryIndex,
    /// Index-range limit pushdown stream (presumably set by the `index_range_limit` path).
    IndexRange,
}
85
86#[derive(Clone, Copy, Debug, Eq, PartialEq)]
94pub struct ExecutionTrace {
95 pub access_path_variant: ExecutionAccessPathVariant,
96 pub direction: OrderDirection,
97 pub pushdown_used: bool,
98 pub pushdown_type: Option<ExecutionPushdownType>,
99 pub fast_path_used: Option<ExecutionFastPath>,
100 pub keys_scanned: u64,
101 pub rows_returned: u64,
102 pub continuation_applied: bool,
103}
104
105impl ExecutionTrace {
106 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
107 Self {
108 access_path_variant: access_path_variant(access),
109 direction: execution_order_direction(direction),
110 pushdown_used: false,
111 pushdown_type: None,
112 fast_path_used: None,
113 keys_scanned: 0,
114 rows_returned: 0,
115 continuation_applied,
116 }
117 }
118
119 fn set_path_outcome(
120 &mut self,
121 fast_path_used: Option<ExecutionFastPath>,
122 pushdown_type: Option<ExecutionPushdownType>,
123 keys_scanned: usize,
124 rows_returned: usize,
125 ) {
126 self.fast_path_used = fast_path_used;
127 self.pushdown_type = pushdown_type;
128 self.pushdown_used = pushdown_type.is_some();
129 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
130 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
131 }
132}
133
/// Stateless projection that maps an `AccessPlan` to its top-level `ExecutionAccessPathVariant`.
///
/// Kept as a unit struct so it can be passed as a value to `project_access_plan`.
struct ExecutionAccessProjection;
135
136impl<K> AccessPlanProjection<K> for ExecutionAccessProjection {
137 type Output = ExecutionAccessPathVariant;
138
139 fn by_key(&mut self, _key: &K) -> Self::Output {
140 ExecutionAccessPathVariant::ByKey
141 }
142
143 fn by_keys(&mut self, _keys: &[K]) -> Self::Output {
144 ExecutionAccessPathVariant::ByKeys
145 }
146
147 fn key_range(&mut self, _start: &K, _end: &K) -> Self::Output {
148 ExecutionAccessPathVariant::KeyRange
149 }
150
151 fn index_prefix(
152 &mut self,
153 _index_name: &'static str,
154 _index_fields: &[&'static str],
155 _prefix_len: usize,
156 _values: &[Value],
157 ) -> Self::Output {
158 ExecutionAccessPathVariant::IndexPrefix
159 }
160
161 fn index_range(
162 &mut self,
163 _index_name: &'static str,
164 _index_fields: &[&'static str],
165 _prefix_len: usize,
166 _prefix: &[Value],
167 _lower: &Bound<Value>,
168 _upper: &Bound<Value>,
169 ) -> Self::Output {
170 ExecutionAccessPathVariant::IndexRange
171 }
172
173 fn full_scan(&mut self) -> Self::Output {
174 ExecutionAccessPathVariant::FullScan
175 }
176
177 fn union(&mut self, _children: Vec<Self::Output>) -> Self::Output {
178 ExecutionAccessPathVariant::Union
179 }
180
181 fn intersection(&mut self, _children: Vec<Self::Output>) -> Self::Output {
182 ExecutionAccessPathVariant::Intersection
183 }
184}
185
186fn access_path_variant<K>(access: &AccessPlan<K>) -> ExecutionAccessPathVariant {
187 let mut projection = ExecutionAccessProjection;
188 project_access_plan(access, &mut projection)
189}
190
191const fn execution_order_direction(direction: Direction) -> OrderDirection {
192 match direction {
193 Direction::Asc => OrderDirection::Asc,
194 Direction::Desc => OrderDirection::Desc,
195 }
196}
197
198struct FastPathKeyResult {
205 ordered_keys: Vec<DataKey>,
206 rows_scanned: usize,
207 fast_path_used: ExecutionFastPath,
208 pushdown_type: Option<ExecutionPushdownType>,
209}
210
211#[derive(Clone)]
219pub(crate) struct LoadExecutor<E: EntityKind> {
220 db: Db<E::Canister>,
221 debug: bool,
222 _marker: PhantomData<E>,
223}
224
225impl<E> LoadExecutor<E>
226where
227 E: EntityKind + EntityValue,
228{
229 #[must_use]
230 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
231 Self {
232 db,
233 debug,
234 _marker: PhantomData,
235 }
236 }
237
238 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
239 self.execute_paged_with_cursor(plan, PlannedCursor::none())
240 .map(|page| page.items)
241 }
242
243 pub(in crate::db) fn execute_paged_with_cursor(
244 &self,
245 plan: ExecutablePlan<E>,
246 cursor: impl Into<PlannedCursor>,
247 ) -> Result<CursorPage<E>, InternalError> {
248 self.execute_paged_with_cursor_traced(plan, cursor)
249 .map(|(page, _)| page)
250 }
251
252 pub(in crate::db) fn execute_paged_with_cursor_traced(
253 &self,
254 plan: ExecutablePlan<E>,
255 cursor: impl Into<PlannedCursor>,
256 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
257 let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
258 let cursor_boundary = cursor.boundary().cloned();
259 let index_range_anchor = cursor.index_range_anchor().cloned();
260
261 if !plan.mode().is_load() {
262 return Err(InternalError::new(
263 ErrorClass::InvariantViolation,
264 ErrorOrigin::Query,
265 "executor invariant violated: load executor requires load plans",
266 ));
267 }
268
269 let direction = plan.direction();
270 let continuation_signature = plan.continuation_signature();
271 let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
272 let mut execution_trace = self
273 .debug
274 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
275
276 let result = (|| {
277 let mut span = Span::<E>::new(ExecKind::Load);
278 let plan = plan.into_inner();
279
280 validate_executor_plan::<E>(&plan)?;
281 let ctx = self.db.recovered_context::<E>()?;
282
283 record_plan_metrics(&plan.access);
284 let secondary_pushdown_applicability =
287 Self::assess_secondary_order_pushdown_applicability(&plan);
288
289 if let Some(page) = Self::try_execute_fast_paths(
290 &ctx,
291 &plan,
292 &secondary_pushdown_applicability,
293 cursor_boundary.as_ref(),
294 index_range_anchor.as_ref(),
295 direction,
296 continuation_signature,
297 &mut span,
298 &mut execution_trace,
299 )? {
300 return Ok(page);
301 }
302
303 let data_rows = ctx.rows_from_access_plan_with_index_range_anchor(
304 &plan.access,
305 plan.consistency,
306 index_range_anchor.as_ref(),
307 direction,
308 )?;
309 let keys_scanned = data_rows.len();
310
311 let mut rows = Context::deserialize_rows(data_rows)?;
312 let page = Self::finalize_rows_into_page(
313 &plan,
314 &mut rows,
315 cursor_boundary.as_ref(),
316 direction,
317 continuation_signature,
318 )?;
319 let post_access_rows = page.items.0.len();
320 Self::finalize_path_outcome(
321 &mut execution_trace,
322 None,
323 None,
324 keys_scanned,
325 post_access_rows,
326 );
327
328 set_rows_from_len(&mut span, page.items.0.len());
329 Ok(page)
330 })();
331
332 result.map(|page| (page, execution_trace))
333 }
334
335 fn finalize_path_outcome(
337 execution_trace: &mut Option<ExecutionTrace>,
338 fast_path_used: Option<ExecutionFastPath>,
339 pushdown_type: Option<ExecutionPushdownType>,
340 rows_scanned: usize,
341 rows_returned: usize,
342 ) {
343 record_rows_scanned::<E>(rows_scanned);
344 if let Some(execution_trace) = execution_trace.as_mut() {
345 execution_trace.set_path_outcome(
346 fast_path_used,
347 pushdown_type,
348 rows_scanned,
349 rows_returned,
350 );
351 debug_assert_eq!(
352 execution_trace.keys_scanned,
353 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
354 "execution trace keys_scanned must match rows_scanned metrics input",
355 );
356 }
357 }
358
359 fn materialize_keys_into_page(
361 ctx: &Context<'_, E>,
362 plan: &LogicalPlan<E::Key>,
363 ordered_keys: &[DataKey],
364 cursor_boundary: Option<&CursorBoundary>,
365 direction: Direction,
366 continuation_signature: ContinuationSignature,
367 ) -> Result<(CursorPage<E>, usize), InternalError> {
368 let data_rows = ctx.rows_from_ordered_data_keys(ordered_keys, plan.consistency)?;
369 let mut rows = Context::deserialize_rows(data_rows)?;
370 let page = Self::finalize_rows_into_page(
371 plan,
372 &mut rows,
373 cursor_boundary,
374 direction,
375 continuation_signature,
376 )?;
377 let post_access_rows = page.items.0.len();
378
379 Ok((page, post_access_rows))
380 }
381
382 fn validate_pk_fast_path_boundary_if_applicable(
384 plan: &LogicalPlan<E::Key>,
385 cursor_boundary: Option<&CursorBoundary>,
386 ) -> Result<(), InternalError> {
387 if !Self::is_pk_order_stream_eligible(plan) {
388 return Ok(());
389 }
390 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
391
392 Ok(())
393 }
394
395 fn index_range_limit_pushdown_fetch(
396 plan: &LogicalPlan<E::Key>,
397 cursor_boundary: Option<&CursorBoundary>,
398 index_range_anchor: Option<&RawIndexKey>,
399 ) -> Option<usize> {
400 if !Self::is_index_range_limit_pushdown_shape_eligible(plan) {
401 return None;
402 }
403 if cursor_boundary.is_some() && index_range_anchor.is_none() {
404 return None;
405 }
406
407 let page = plan.page.as_ref()?;
408 let limit = page.limit?;
409 if limit == 0 {
410 return Some(0);
411 }
412
413 let offset = usize::try_from(page.offset).unwrap_or(usize::MAX);
414 let limit = usize::try_from(limit).unwrap_or(usize::MAX);
415 let page_end = offset.saturating_add(limit);
416 let needs_extra_row = true;
417
418 Some(page_end.saturating_add(usize::from(needs_extra_row)))
419 }
420
421 #[expect(
423 clippy::too_many_arguments,
424 reason = "fast-path dispatch keeps execution inputs explicit at one call site"
425 )]
426 fn try_execute_fast_paths(
427 ctx: &Context<'_, E>,
428 plan: &LogicalPlan<E::Key>,
429 secondary_pushdown_applicability: &PushdownApplicability,
430 cursor_boundary: Option<&CursorBoundary>,
431 index_range_anchor: Option<&RawIndexKey>,
432 direction: Direction,
433 continuation_signature: ContinuationSignature,
434 span: &mut Span<E>,
435 execution_trace: &mut Option<ExecutionTrace>,
436 ) -> Result<Option<CursorPage<E>>, InternalError> {
437 Self::validate_pk_fast_path_boundary_if_applicable(plan, cursor_boundary)?;
438
439 if let Some(fast) = Self::try_execute_pk_order_stream(ctx, plan)? {
440 let (page, post_access_rows) = Self::materialize_keys_into_page(
441 ctx,
442 plan,
443 &fast.ordered_keys,
444 cursor_boundary,
445 direction,
446 continuation_signature,
447 )?;
448 Self::finalize_path_outcome(
449 execution_trace,
450 Some(fast.fast_path_used),
451 fast.pushdown_type,
452 fast.rows_scanned,
453 post_access_rows,
454 );
455 set_rows_from_len(span, page.items.0.len());
456 return Ok(Some(page));
457 }
458
459 if let Some(fast) = Self::try_execute_secondary_index_order_stream(
460 ctx,
461 plan,
462 secondary_pushdown_applicability,
463 )? {
464 let (page, post_access_rows) = Self::materialize_keys_into_page(
465 ctx,
466 plan,
467 &fast.ordered_keys,
468 cursor_boundary,
469 direction,
470 continuation_signature,
471 )?;
472 Self::finalize_path_outcome(
473 execution_trace,
474 Some(fast.fast_path_used),
475 fast.pushdown_type,
476 fast.rows_scanned,
477 post_access_rows,
478 );
479 set_rows_from_len(span, page.items.0.len());
480 return Ok(Some(page));
481 }
482
483 let index_range_limit_fetch =
484 Self::index_range_limit_pushdown_fetch(plan, cursor_boundary, index_range_anchor);
485 if let Some(fast) = Self::try_execute_index_range_limit_pushdown_stream(
486 ctx,
487 plan,
488 index_range_anchor,
489 direction,
490 index_range_limit_fetch,
491 )? {
492 let (page, post_access_rows) = Self::materialize_keys_into_page(
493 ctx,
494 plan,
495 &fast.ordered_keys,
496 cursor_boundary,
497 direction,
498 continuation_signature,
499 )?;
500 Self::finalize_path_outcome(
501 execution_trace,
502 Some(fast.fast_path_used),
503 fast.pushdown_type,
504 fast.rows_scanned,
505 post_access_rows,
506 );
507 set_rows_from_len(span, page.items.0.len());
508 return Ok(Some(page));
509 }
510
511 Ok(None)
512 }
513
514 fn finalize_rows_into_page(
516 plan: &LogicalPlan<E::Key>,
517 rows: &mut Vec<(Id<E>, E)>,
518 cursor_boundary: Option<&CursorBoundary>,
519 direction: Direction,
520 continuation_signature: ContinuationSignature,
521 ) -> Result<CursorPage<E>, InternalError> {
522 let stats = plan.apply_post_access_with_cursor::<E, _>(rows, cursor_boundary)?;
523 let next_cursor =
524 Self::build_next_cursor(plan, rows, &stats, direction, continuation_signature)?;
525 let items = Response(std::mem::take(rows));
526
527 Ok(CursorPage { items, next_cursor })
528 }
529
530 fn assess_secondary_order_pushdown_applicability(
533 plan: &LogicalPlan<E::Key>,
534 ) -> PushdownApplicability {
535 assess_secondary_order_pushdown_if_applicable_validated(E::MODEL, plan)
536 }
537
538 fn build_next_cursor(
539 plan: &LogicalPlan<E::Key>,
540 rows: &[(Id<E>, E)],
541 stats: &PostAccessStats,
542 direction: Direction,
543 signature: ContinuationSignature,
544 ) -> Result<Option<Vec<u8>>, InternalError> {
545 let Some(page) = plan.page.as_ref() else {
546 return Ok(None);
547 };
548 let Some(limit) = page.limit else {
549 return Ok(None);
550 };
551 if rows.is_empty() {
552 return Ok(None);
553 }
554
555 let page_end = (page.offset as usize).saturating_add(limit as usize);
557 if stats.rows_after_cursor <= page_end {
558 return Ok(None);
559 }
560
561 let Some((_, last_entity)) = rows.last() else {
562 return Ok(None);
563 };
564 Self::encode_next_cursor_for_last_entity(plan, last_entity, direction, signature).map(Some)
565 }
566
567 fn encode_next_cursor_for_last_entity(
569 plan: &LogicalPlan<E::Key>,
570 last_entity: &E,
571 direction: Direction,
572 signature: ContinuationSignature,
573 ) -> Result<Vec<u8>, InternalError> {
574 let boundary = plan.cursor_boundary_from_entity(last_entity)?;
575 let token = if plan.access.cursor_support().supports_index_range_anchor() {
576 let (index, _, _, _) =
577 plan.access.as_index_range_path().ok_or_else(|| {
578 InternalError::new(
579 ErrorClass::InvariantViolation,
580 ErrorOrigin::Query,
581 "executor invariant violated: index-range cursor support missing concrete index-range path",
582 )
583 })?;
584 let index_key = IndexKey::new(last_entity, index)?.ok_or_else(|| {
585 InternalError::new(
586 ErrorClass::InvariantViolation,
587 ErrorOrigin::Query,
588 "executor invariant violated: cursor row is not indexable for planned index-range access",
589 )
590 })?;
591
592 ContinuationToken::new_index_range_with_direction(
593 signature,
594 boundary,
595 IndexRangeCursorAnchor::new(index_key.to_raw()),
596 direction,
597 )
598 } else {
599 ContinuationToken::new_with_direction(signature, boundary, direction)
600 };
601 token.encode().map_err(|err| {
602 InternalError::new(
603 ErrorClass::Internal,
604 ErrorOrigin::Serialize,
605 format!("failed to encode continuation cursor: {err}"),
606 )
607 })
608 }
609}