1mod index_range_limit;
2mod pk_stream;
3mod secondary_index;
4
5use crate::{
6 db::{
7 Context, Db,
8 executor::plan::{record_plan_metrics, record_rows_scanned, set_rows_from_len},
9 executor::{OrderedKeyStream, OrderedKeyStreamBox},
10 index::{IndexKey, RawIndexKey},
11 query::plan::{
12 AccessPlan, AccessPlanProjection, ContinuationSignature, ContinuationToken,
13 CursorBoundary, Direction, ExecutablePlan, IndexRangeCursorAnchor, LogicalPlan,
14 OrderDirection, PlannedCursor, decode_pk_cursor_boundary,
15 logical::PostAccessStats,
16 project_access_plan,
17 validate::{
18 PushdownApplicability, assess_secondary_order_pushdown_if_applicable_validated,
19 validate_executor_plan,
20 },
21 },
22 response::Response,
23 },
24 error::{ErrorClass, ErrorOrigin, InternalError},
25 obs::sink::{ExecKind, Span},
26 traits::{EntityKind, EntityValue},
27 types::Id,
28 value::Value,
29};
30use std::{marker::PhantomData, ops::Bound};
31
32#[derive(Debug)]
40pub(crate) struct CursorPage<E: EntityKind> {
41 pub(crate) items: Response<E>,
42
43 pub(crate) next_cursor: Option<Vec<u8>>,
44}
45
/// Top-level shape of the access plan that a load executed, as reported in an
/// [`ExecutionTrace`].
///
/// Each variant corresponds to one hook of the access-plan projection; for
/// composite plans only the outermost combinator (`Union`/`Intersection`) is
/// recorded.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExecutionAccessPathVariant {
    /// Lookup of a single key (`by_key`).
    ByKey,
    /// Lookup of an explicit set of keys (`by_keys`).
    ByKeys,
    /// Scan between a start and end key (`key_range`).
    KeyRange,
    /// Index scan constrained by a value prefix (`index_prefix`).
    IndexPrefix,
    /// Index scan constrained by a prefix plus lower/upper bounds (`index_range`).
    IndexRange,
    /// Unconstrained scan (`full_scan`).
    FullScan,
    /// Union of child access plans.
    Union,
    /// Intersection of child access plans.
    Intersection,
}
62
/// Kind of pushdown applied by a fast path, as reported in an [`ExecutionTrace`].
///
/// The trace's `pushdown_used` flag is `true` exactly when one of these is set.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExecutionPushdownType {
    /// Ordering pushed down onto a secondary index.
    SecondaryOrder,
    /// Page limit pushed down into an index-range scan.
    IndexRangeLimit,
}
73
/// Fast path taken by a load instead of the generic access-plan stream, as
/// reported in an [`ExecutionTrace`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExecutionFastPath {
    /// Primary-key ordered stream fast path.
    PrimaryKey,
    /// Secondary-index ordered stream fast path.
    SecondaryIndex,
    /// Index-range (limit pushdown) stream fast path.
    IndexRange,
}
85
86#[derive(Clone, Copy, Debug, Eq, PartialEq)]
94pub struct ExecutionTrace {
95 pub access_path_variant: ExecutionAccessPathVariant,
96 pub direction: OrderDirection,
97 pub pushdown_used: bool,
98 pub pushdown_type: Option<ExecutionPushdownType>,
99 pub fast_path_used: Option<ExecutionFastPath>,
100 pub keys_scanned: u64,
101 pub rows_returned: u64,
102 pub continuation_applied: bool,
103}
104
105impl ExecutionTrace {
106 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
107 Self {
108 access_path_variant: access_path_variant(access),
109 direction: execution_order_direction(direction),
110 pushdown_used: false,
111 pushdown_type: None,
112 fast_path_used: None,
113 keys_scanned: 0,
114 rows_returned: 0,
115 continuation_applied,
116 }
117 }
118
119 fn set_path_outcome(
120 &mut self,
121 fast_path_used: Option<ExecutionFastPath>,
122 pushdown_type: Option<ExecutionPushdownType>,
123 keys_scanned: usize,
124 rows_returned: usize,
125 ) {
126 self.fast_path_used = fast_path_used;
127 self.pushdown_type = pushdown_type;
128 self.pushdown_used = pushdown_type.is_some();
129 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
130 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
131 }
132}
133
134struct ExecutionAccessProjection;
135
136impl<K> AccessPlanProjection<K> for ExecutionAccessProjection {
137 type Output = ExecutionAccessPathVariant;
138
139 fn by_key(&mut self, _key: &K) -> Self::Output {
140 ExecutionAccessPathVariant::ByKey
141 }
142
143 fn by_keys(&mut self, _keys: &[K]) -> Self::Output {
144 ExecutionAccessPathVariant::ByKeys
145 }
146
147 fn key_range(&mut self, _start: &K, _end: &K) -> Self::Output {
148 ExecutionAccessPathVariant::KeyRange
149 }
150
151 fn index_prefix(
152 &mut self,
153 _index_name: &'static str,
154 _index_fields: &[&'static str],
155 _prefix_len: usize,
156 _values: &[Value],
157 ) -> Self::Output {
158 ExecutionAccessPathVariant::IndexPrefix
159 }
160
161 fn index_range(
162 &mut self,
163 _index_name: &'static str,
164 _index_fields: &[&'static str],
165 _prefix_len: usize,
166 _prefix: &[Value],
167 _lower: &Bound<Value>,
168 _upper: &Bound<Value>,
169 ) -> Self::Output {
170 ExecutionAccessPathVariant::IndexRange
171 }
172
173 fn full_scan(&mut self) -> Self::Output {
174 ExecutionAccessPathVariant::FullScan
175 }
176
177 fn union(&mut self, _children: Vec<Self::Output>) -> Self::Output {
178 ExecutionAccessPathVariant::Union
179 }
180
181 fn intersection(&mut self, _children: Vec<Self::Output>) -> Self::Output {
182 ExecutionAccessPathVariant::Intersection
183 }
184}
185
186fn access_path_variant<K>(access: &AccessPlan<K>) -> ExecutionAccessPathVariant {
187 let mut projection = ExecutionAccessProjection;
188 project_access_plan(access, &mut projection)
189}
190
191const fn execution_order_direction(direction: Direction) -> OrderDirection {
192 match direction {
193 Direction::Asc => OrderDirection::Asc,
194 Direction::Desc => OrderDirection::Desc,
195 }
196}
197
198struct FastPathKeyResult {
205 ordered_key_stream: OrderedKeyStreamBox,
206 rows_scanned: usize,
207 fast_path_used: ExecutionFastPath,
208 pushdown_type: Option<ExecutionPushdownType>,
209}
210
211#[derive(Clone)]
219pub(crate) struct LoadExecutor<E: EntityKind> {
220 db: Db<E::Canister>,
221 debug: bool,
222 _marker: PhantomData<E>,
223}
224
225impl<E> LoadExecutor<E>
226where
227 E: EntityKind + EntityValue,
228{
229 #[must_use]
230 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
231 Self {
232 db,
233 debug,
234 _marker: PhantomData,
235 }
236 }
237
238 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
239 self.execute_paged_with_cursor(plan, PlannedCursor::none())
240 .map(|page| page.items)
241 }
242
243 pub(in crate::db) fn execute_paged_with_cursor(
244 &self,
245 plan: ExecutablePlan<E>,
246 cursor: impl Into<PlannedCursor>,
247 ) -> Result<CursorPage<E>, InternalError> {
248 self.execute_paged_with_cursor_traced(plan, cursor)
249 .map(|(page, _)| page)
250 }
251
252 pub(in crate::db) fn execute_paged_with_cursor_traced(
253 &self,
254 plan: ExecutablePlan<E>,
255 cursor: impl Into<PlannedCursor>,
256 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
257 let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
258 let cursor_boundary = cursor.boundary().cloned();
259 let index_range_anchor = cursor.index_range_anchor().cloned();
260
261 if !plan.mode().is_load() {
262 return Err(InternalError::new(
263 ErrorClass::InvariantViolation,
264 ErrorOrigin::Query,
265 "executor invariant violated: load executor requires load plans",
266 ));
267 }
268
269 let direction = plan.direction();
270 let continuation_signature = plan.continuation_signature();
271 let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
272 let mut execution_trace = self
273 .debug
274 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
275
276 let result = (|| {
277 let mut span = Span::<E>::new(ExecKind::Load);
278 let plan = plan.into_inner();
279
280 validate_executor_plan::<E>(&plan)?;
281 let ctx = self.db.recovered_context::<E>()?;
282
283 record_plan_metrics(&plan.access);
284 let secondary_pushdown_applicability =
287 Self::assess_secondary_order_pushdown_applicability(&plan);
288
289 if let Some(page) = Self::try_execute_fast_paths(
290 &ctx,
291 &plan,
292 &secondary_pushdown_applicability,
293 cursor_boundary.as_ref(),
294 index_range_anchor.as_ref(),
295 direction,
296 continuation_signature,
297 &mut span,
298 &mut execution_trace,
299 )? {
300 return Ok(page);
301 }
302
303 let mut key_stream = ctx.ordered_key_stream_from_access_plan_with_index_range_anchor(
304 &plan.access,
305 index_range_anchor.as_ref(),
306 direction,
307 )?;
308 let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
309 &ctx,
310 &plan,
311 key_stream.as_mut(),
312 cursor_boundary.as_ref(),
313 direction,
314 continuation_signature,
315 )?;
316 Self::finalize_path_outcome(
317 &mut execution_trace,
318 None,
319 None,
320 keys_scanned,
321 post_access_rows,
322 );
323
324 set_rows_from_len(&mut span, page.items.0.len());
325 Ok(page)
326 })();
327
328 result.map(|page| (page, execution_trace))
329 }
330
331 fn finalize_path_outcome(
333 execution_trace: &mut Option<ExecutionTrace>,
334 fast_path_used: Option<ExecutionFastPath>,
335 pushdown_type: Option<ExecutionPushdownType>,
336 rows_scanned: usize,
337 rows_returned: usize,
338 ) {
339 record_rows_scanned::<E>(rows_scanned);
340 if let Some(execution_trace) = execution_trace.as_mut() {
341 execution_trace.set_path_outcome(
342 fast_path_used,
343 pushdown_type,
344 rows_scanned,
345 rows_returned,
346 );
347 debug_assert_eq!(
348 execution_trace.keys_scanned,
349 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
350 "execution trace keys_scanned must match rows_scanned metrics input",
351 );
352 }
353 }
354
355 fn materialize_key_stream_into_page(
357 ctx: &Context<'_, E>,
358 plan: &LogicalPlan<E::Key>,
359 key_stream: &mut dyn OrderedKeyStream,
360 cursor_boundary: Option<&CursorBoundary>,
361 direction: Direction,
362 continuation_signature: ContinuationSignature,
363 ) -> Result<(CursorPage<E>, usize, usize), InternalError> {
364 let data_rows = ctx.rows_from_ordered_key_stream(key_stream, plan.consistency)?;
365 let rows_scanned = data_rows.len();
366 let mut rows = Context::deserialize_rows(data_rows)?;
367 let page = Self::finalize_rows_into_page(
368 plan,
369 &mut rows,
370 cursor_boundary,
371 direction,
372 continuation_signature,
373 )?;
374 let post_access_rows = page.items.0.len();
375
376 Ok((page, rows_scanned, post_access_rows))
377 }
378
379 fn validate_pk_fast_path_boundary_if_applicable(
381 plan: &LogicalPlan<E::Key>,
382 cursor_boundary: Option<&CursorBoundary>,
383 ) -> Result<(), InternalError> {
384 if !Self::is_pk_order_stream_eligible(plan) {
385 return Ok(());
386 }
387 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
388
389 Ok(())
390 }
391
392 fn index_range_limit_pushdown_fetch(
393 plan: &LogicalPlan<E::Key>,
394 cursor_boundary: Option<&CursorBoundary>,
395 index_range_anchor: Option<&RawIndexKey>,
396 ) -> Option<usize> {
397 if !Self::is_index_range_limit_pushdown_shape_eligible(plan) {
398 return None;
399 }
400 if cursor_boundary.is_some() && index_range_anchor.is_none() {
401 return None;
402 }
403
404 let page = plan.page.as_ref()?;
405 let limit = page.limit?;
406 if limit == 0 {
407 return Some(0);
408 }
409
410 let offset = usize::try_from(page.offset).unwrap_or(usize::MAX);
411 let limit = usize::try_from(limit).unwrap_or(usize::MAX);
412 let page_end = offset.saturating_add(limit);
413 let needs_extra_row = true;
414
415 Some(page_end.saturating_add(usize::from(needs_extra_row)))
416 }
417
418 #[expect(
420 clippy::too_many_arguments,
421 reason = "fast-path dispatch keeps execution inputs explicit at one call site"
422 )]
423 fn try_execute_fast_paths(
424 ctx: &Context<'_, E>,
425 plan: &LogicalPlan<E::Key>,
426 secondary_pushdown_applicability: &PushdownApplicability,
427 cursor_boundary: Option<&CursorBoundary>,
428 index_range_anchor: Option<&RawIndexKey>,
429 direction: Direction,
430 continuation_signature: ContinuationSignature,
431 span: &mut Span<E>,
432 execution_trace: &mut Option<ExecutionTrace>,
433 ) -> Result<Option<CursorPage<E>>, InternalError> {
434 Self::validate_pk_fast_path_boundary_if_applicable(plan, cursor_boundary)?;
435
436 if let Some(mut fast) = Self::try_execute_pk_order_stream(ctx, plan)? {
437 let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
438 ctx,
439 plan,
440 fast.ordered_key_stream.as_mut(),
441 cursor_boundary,
442 direction,
443 continuation_signature,
444 )?;
445 Self::finalize_path_outcome(
446 execution_trace,
447 Some(fast.fast_path_used),
448 fast.pushdown_type,
449 fast.rows_scanned,
450 post_access_rows,
451 );
452 set_rows_from_len(span, page.items.0.len());
453 return Ok(Some(page));
454 }
455
456 if let Some(mut fast) = Self::try_execute_secondary_index_order_stream(
457 ctx,
458 plan,
459 secondary_pushdown_applicability,
460 )? {
461 let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
462 ctx,
463 plan,
464 fast.ordered_key_stream.as_mut(),
465 cursor_boundary,
466 direction,
467 continuation_signature,
468 )?;
469 Self::finalize_path_outcome(
470 execution_trace,
471 Some(fast.fast_path_used),
472 fast.pushdown_type,
473 fast.rows_scanned,
474 post_access_rows,
475 );
476 set_rows_from_len(span, page.items.0.len());
477 return Ok(Some(page));
478 }
479
480 let index_range_limit_fetch =
481 Self::index_range_limit_pushdown_fetch(plan, cursor_boundary, index_range_anchor);
482 if let Some(mut fast) = Self::try_execute_index_range_limit_pushdown_stream(
483 ctx,
484 plan,
485 index_range_anchor,
486 direction,
487 index_range_limit_fetch,
488 )? {
489 let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
490 ctx,
491 plan,
492 fast.ordered_key_stream.as_mut(),
493 cursor_boundary,
494 direction,
495 continuation_signature,
496 )?;
497 Self::finalize_path_outcome(
498 execution_trace,
499 Some(fast.fast_path_used),
500 fast.pushdown_type,
501 fast.rows_scanned,
502 post_access_rows,
503 );
504 set_rows_from_len(span, page.items.0.len());
505 return Ok(Some(page));
506 }
507
508 Ok(None)
509 }
510
511 fn finalize_rows_into_page(
513 plan: &LogicalPlan<E::Key>,
514 rows: &mut Vec<(Id<E>, E)>,
515 cursor_boundary: Option<&CursorBoundary>,
516 direction: Direction,
517 continuation_signature: ContinuationSignature,
518 ) -> Result<CursorPage<E>, InternalError> {
519 let stats = plan.apply_post_access_with_cursor::<E, _>(rows, cursor_boundary)?;
520 let next_cursor =
521 Self::build_next_cursor(plan, rows, &stats, direction, continuation_signature)?;
522 let items = Response(std::mem::take(rows));
523
524 Ok(CursorPage { items, next_cursor })
525 }
526
527 fn assess_secondary_order_pushdown_applicability(
530 plan: &LogicalPlan<E::Key>,
531 ) -> PushdownApplicability {
532 assess_secondary_order_pushdown_if_applicable_validated(E::MODEL, plan)
533 }
534
535 fn build_next_cursor(
536 plan: &LogicalPlan<E::Key>,
537 rows: &[(Id<E>, E)],
538 stats: &PostAccessStats,
539 direction: Direction,
540 signature: ContinuationSignature,
541 ) -> Result<Option<Vec<u8>>, InternalError> {
542 let Some(page) = plan.page.as_ref() else {
543 return Ok(None);
544 };
545 let Some(limit) = page.limit else {
546 return Ok(None);
547 };
548 if rows.is_empty() {
549 return Ok(None);
550 }
551
552 let page_end = (page.offset as usize).saturating_add(limit as usize);
554 if stats.rows_after_cursor <= page_end {
555 return Ok(None);
556 }
557
558 let Some((_, last_entity)) = rows.last() else {
559 return Ok(None);
560 };
561 Self::encode_next_cursor_for_last_entity(plan, last_entity, direction, signature).map(Some)
562 }
563
564 fn encode_next_cursor_for_last_entity(
566 plan: &LogicalPlan<E::Key>,
567 last_entity: &E,
568 direction: Direction,
569 signature: ContinuationSignature,
570 ) -> Result<Vec<u8>, InternalError> {
571 let boundary = plan.cursor_boundary_from_entity(last_entity)?;
572 let token = if plan.access.cursor_support().supports_index_range_anchor() {
573 let (index, _, _, _) =
574 plan.access.as_index_range_path().ok_or_else(|| {
575 InternalError::new(
576 ErrorClass::InvariantViolation,
577 ErrorOrigin::Query,
578 "executor invariant violated: index-range cursor support missing concrete index-range path",
579 )
580 })?;
581 let index_key = IndexKey::new(last_entity, index)?.ok_or_else(|| {
582 InternalError::new(
583 ErrorClass::InvariantViolation,
584 ErrorOrigin::Query,
585 "executor invariant violated: cursor row is not indexable for planned index-range access",
586 )
587 })?;
588
589 ContinuationToken::new_index_range_with_direction(
590 signature,
591 boundary,
592 IndexRangeCursorAnchor::new(index_key.to_raw()),
593 direction,
594 )
595 } else {
596 ContinuationToken::new_with_direction(signature, boundary, direction)
597 };
598 token.encode().map_err(|err| {
599 InternalError::new(
600 ErrorClass::Internal,
601 ErrorOrigin::Serialize,
602 format!("failed to encode continuation cursor: {err}"),
603 )
604 })
605 }
606}