1mod index_range_limit;
2mod pk_stream;
3mod secondary_index;
4
5use crate::{
6 db::{
7 Context, Db,
8 executor::plan::{record_plan_metrics, record_rows_scanned, set_rows_from_len},
9 executor::{OrderedKeyStream, OrderedKeyStreamBox},
10 index::{IndexKey, RawIndexKey},
11 query::plan::{
12 AccessPlan, AccessPlanProjection, ContinuationSignature, ContinuationToken,
13 CursorBoundary, Direction, ExecutablePlan, IndexRangeCursorAnchor, LogicalPlan,
14 OrderDirection, PlannedCursor, decode_pk_cursor_boundary,
15 logical::PostAccessStats,
16 project_access_plan,
17 validate::{
18 PushdownApplicability, assess_secondary_order_pushdown_if_applicable_validated,
19 validate_executor_plan,
20 },
21 },
22 response::Response,
23 },
24 error::{ErrorClass, ErrorOrigin, InternalError},
25 obs::sink::{ExecKind, Span},
26 traits::{EntityKind, EntityValue},
27 types::Id,
28 value::Value,
29};
30use std::{marker::PhantomData, ops::Bound};
31
/// One page of load results together with the cursor needed to request the next page.
#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Rows that make up this page.
    pub(crate) items: Response<E>,

    /// Encoded continuation token for the following page, or `None` when the executor did
    /// not produce one.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
45
/// Coarse shape of the access plan, as reported in an [`ExecutionTrace`].
///
/// Each variant corresponds to one callback of the access-plan projection implemented in
/// this file; composite plans are reported by their outermost node only.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Access by a single key.
    ByKey,
    /// Access by an explicit list of keys.
    ByKeys,
    /// Access by a key range (start/end pair).
    KeyRange,
    /// Access through an index prefix.
    IndexPrefix,
    /// Access through an index range (prefix plus lower/upper bounds).
    IndexRange,
    /// Full scan.
    FullScan,
    /// Union of child access plans.
    Union,
    /// Intersection of child access plans.
    Intersection,
}
62
/// Kind of pushdown reported in [`ExecutionTrace::pushdown_type`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionPushdownType {
    /// Pushdown related to secondary-index ordering.
    // NOTE(review): exact semantics are defined by the fast-path modules, not visible here.
    SecondaryOrder,
    /// Pushdown related to limiting an index-range scan.
    // NOTE(review): exact semantics are defined by the fast-path modules, not visible here.
    IndexRangeLimit,
}
73
/// Fast path that produced the key stream, as reported in
/// [`ExecutionTrace::fast_path_used`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionFastPath {
    /// A primary-key based fast path.
    PrimaryKey,
    /// A secondary-index based fast path.
    SecondaryIndex,
    /// An index-range based fast path.
    IndexRange,
}
85
/// Diagnostic summary of one load execution.
///
/// The load executor only builds a trace when its `debug` flag is set.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Shape of the access plan that was executed.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Direction of the execution.
    pub direction: OrderDirection,
    /// `true` exactly when `pushdown_type` is `Some`.
    pub pushdown_used: bool,
    /// Pushdown that was applied, if any.
    pub pushdown_type: Option<ExecutionPushdownType>,
    /// Fast path that served the request, or `None` for the generic path.
    pub fast_path_used: Option<ExecutionFastPath>,
    /// Scanned-row count reported by the chosen path (saturated to `u64::MAX`).
    pub keys_scanned: u64,
    /// Number of rows in the returned page (saturated to `u64::MAX`).
    pub rows_returned: u64,
    /// Whether a cursor boundary or index-range anchor was supplied for this execution.
    pub continuation_applied: bool,
}
104
105impl ExecutionTrace {
106 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
107 Self {
108 access_path_variant: access_path_variant(access),
109 direction: execution_order_direction(direction),
110 pushdown_used: false,
111 pushdown_type: None,
112 fast_path_used: None,
113 keys_scanned: 0,
114 rows_returned: 0,
115 continuation_applied,
116 }
117 }
118
119 fn set_path_outcome(
120 &mut self,
121 fast_path_used: Option<ExecutionFastPath>,
122 pushdown_type: Option<ExecutionPushdownType>,
123 keys_scanned: usize,
124 rows_returned: usize,
125 ) {
126 self.fast_path_used = fast_path_used;
127 self.pushdown_type = pushdown_type;
128 self.pushdown_used = pushdown_type.is_some();
129 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
130 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
131 }
132}
133
134struct ExecutionAccessProjection;
135
136impl<K> AccessPlanProjection<K> for ExecutionAccessProjection {
137 type Output = ExecutionAccessPathVariant;
138
139 fn by_key(&mut self, _key: &K) -> Self::Output {
140 ExecutionAccessPathVariant::ByKey
141 }
142
143 fn by_keys(&mut self, _keys: &[K]) -> Self::Output {
144 ExecutionAccessPathVariant::ByKeys
145 }
146
147 fn key_range(&mut self, _start: &K, _end: &K) -> Self::Output {
148 ExecutionAccessPathVariant::KeyRange
149 }
150
151 fn index_prefix(
152 &mut self,
153 _index_name: &'static str,
154 _index_fields: &[&'static str],
155 _prefix_len: usize,
156 _values: &[Value],
157 ) -> Self::Output {
158 ExecutionAccessPathVariant::IndexPrefix
159 }
160
161 fn index_range(
162 &mut self,
163 _index_name: &'static str,
164 _index_fields: &[&'static str],
165 _prefix_len: usize,
166 _prefix: &[Value],
167 _lower: &Bound<Value>,
168 _upper: &Bound<Value>,
169 ) -> Self::Output {
170 ExecutionAccessPathVariant::IndexRange
171 }
172
173 fn full_scan(&mut self) -> Self::Output {
174 ExecutionAccessPathVariant::FullScan
175 }
176
177 fn union(&mut self, _children: Vec<Self::Output>) -> Self::Output {
178 ExecutionAccessPathVariant::Union
179 }
180
181 fn intersection(&mut self, _children: Vec<Self::Output>) -> Self::Output {
182 ExecutionAccessPathVariant::Intersection
183 }
184}
185
186fn access_path_variant<K>(access: &AccessPlan<K>) -> ExecutionAccessPathVariant {
187 let mut projection = ExecutionAccessProjection;
188 project_access_plan(access, &mut projection)
189}
190
191const fn execution_order_direction(direction: Direction) -> OrderDirection {
192 match direction {
193 Direction::Asc => OrderDirection::Asc,
194 Direction::Desc => OrderDirection::Desc,
195 }
196}
197
/// Result of a fast-path attempt: the key stream to materialize plus the bookkeeping the
/// executor reports for that path.
struct FastPathKeyResult {
    /// Ordered stream of keys produced by the fast path.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Scanned-row count reported by the fast path; forwarded to metrics and the trace.
    rows_scanned: usize,
    /// Which fast path produced this result.
    fast_path_used: ExecutionFastPath,
    /// Pushdown applied by the fast path, if any.
    pushdown_type: Option<ExecutionPushdownType>,
}
210
/// Executor for load (read) plans over entities of type `E`.
#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Database handle used to obtain the execution context.
    db: Db<E::Canister>,
    /// When `true`, executions also produce an `ExecutionTrace`.
    debug: bool,
    /// Ties the executor to `E` without storing a value of that type.
    _marker: PhantomData<E>,
}
224
225impl<E> LoadExecutor<E>
226where
227 E: EntityKind + EntityValue,
228{
229 #[must_use]
230 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
231 Self {
232 db,
233 debug,
234 _marker: PhantomData,
235 }
236 }
237
238 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
239 self.execute_paged_with_cursor(plan, PlannedCursor::none())
240 .map(|page| page.items)
241 }
242
243 pub(in crate::db) fn execute_paged_with_cursor(
244 &self,
245 plan: ExecutablePlan<E>,
246 cursor: impl Into<PlannedCursor>,
247 ) -> Result<CursorPage<E>, InternalError> {
248 self.execute_paged_with_cursor_traced(plan, cursor)
249 .map(|(page, _)| page)
250 }
251
/// Execute a load plan from `cursor`, returning the page and, when the executor was
/// created with `debug = true`, an [`ExecutionTrace`] describing how it was served.
///
/// The fast paths are attempted first; if none applies, keys are streamed from the
/// access plan and materialized through the generic path.
///
/// # Errors
///
/// Returns an error when the cursor fails revalidation against the plan, when the plan
/// is not a load plan, when plan validation or context recovery fails, or when any step
/// of key streaming / row materialization fails. No trace is returned on error.
pub(in crate::db) fn execute_paged_with_cursor_traced(
    &self,
    plan: ExecutablePlan<E>,
    cursor: impl Into<PlannedCursor>,
) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
    // The cursor is revalidated against this plan before anything else happens.
    let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
    let cursor_boundary = cursor.boundary().cloned();
    let index_range_anchor = cursor.index_range_anchor().cloned();

    if !plan.mode().is_load() {
        return Err(InternalError::query_invariant(
            "executor invariant violated: load executor requires load plans",
        ));
    }

    // These are read from the executable plan before `into_inner` consumes it below.
    let direction = plan.direction();
    let continuation_signature = plan.continuation_signature();
    let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
    // The trace only exists in debug mode; every later update goes through the `Option`.
    let mut execution_trace = self
        .debug
        .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));

    // The span is created inside this closure, so it is dropped when the closure returns
    // (on success or on any early `?` exit).
    let result = (|| {
        let mut span = Span::<E>::new(ExecKind::Load);
        let plan = plan.into_inner();

        validate_executor_plan::<E>(&plan)?;
        let ctx = self.db.recovered_context::<E>()?;

        record_plan_metrics(&plan.access);
        let secondary_pushdown_applicability =
            Self::assess_secondary_order_pushdown_applicability(&plan);

        // A fast path, when one applies, fully handles metrics, trace and span itself.
        if let Some(page) = Self::try_execute_fast_paths(
            &ctx,
            &plan,
            &secondary_pushdown_applicability,
            cursor_boundary.as_ref(),
            index_range_anchor.as_ref(),
            direction,
            continuation_signature,
            &mut span,
            &mut execution_trace,
        )? {
            return Ok(page);
        }

        // Generic path: stream keys straight from the access plan.
        let mut key_stream = ctx.ordered_key_stream_from_access_plan_with_index_range_anchor(
            &plan.access,
            index_range_anchor.as_ref(),
            direction,
        )?;
        let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
            &ctx,
            &plan,
            key_stream.as_mut(),
            cursor_boundary.as_ref(),
            direction,
            continuation_signature,
        )?;
        // No fast path and no pushdown were used on this route.
        Self::finalize_path_outcome(
            &mut execution_trace,
            None,
            None,
            keys_scanned,
            post_access_rows,
        );

        set_rows_from_len(&mut span, page.items.0.len());
        Ok(page)
    })();

    result.map(|page| (page, execution_trace))
}
328
329 fn finalize_path_outcome(
331 execution_trace: &mut Option<ExecutionTrace>,
332 fast_path_used: Option<ExecutionFastPath>,
333 pushdown_type: Option<ExecutionPushdownType>,
334 rows_scanned: usize,
335 rows_returned: usize,
336 ) {
337 record_rows_scanned::<E>(rows_scanned);
338 if let Some(execution_trace) = execution_trace.as_mut() {
339 execution_trace.set_path_outcome(
340 fast_path_used,
341 pushdown_type,
342 rows_scanned,
343 rows_returned,
344 );
345 debug_assert_eq!(
346 execution_trace.keys_scanned,
347 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
348 "execution trace keys_scanned must match rows_scanned metrics input",
349 );
350 }
351 }
352
353 fn materialize_key_stream_into_page(
355 ctx: &Context<'_, E>,
356 plan: &LogicalPlan<E::Key>,
357 key_stream: &mut dyn OrderedKeyStream,
358 cursor_boundary: Option<&CursorBoundary>,
359 direction: Direction,
360 continuation_signature: ContinuationSignature,
361 ) -> Result<(CursorPage<E>, usize, usize), InternalError> {
362 let data_rows = ctx.rows_from_ordered_key_stream(key_stream, plan.consistency)?;
363 let rows_scanned = data_rows.len();
364 let mut rows = Context::deserialize_rows(data_rows)?;
365 let page = Self::finalize_rows_into_page(
366 plan,
367 &mut rows,
368 cursor_boundary,
369 direction,
370 continuation_signature,
371 )?;
372 let post_access_rows = page.items.0.len();
373
374 Ok((page, rows_scanned, post_access_rows))
375 }
376
377 fn validate_pk_fast_path_boundary_if_applicable(
379 plan: &LogicalPlan<E::Key>,
380 cursor_boundary: Option<&CursorBoundary>,
381 ) -> Result<(), InternalError> {
382 if !Self::is_pk_order_stream_eligible(plan) {
383 return Ok(());
384 }
385 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
386
387 Ok(())
388 }
389
390 fn index_range_limit_pushdown_fetch(
391 plan: &LogicalPlan<E::Key>,
392 cursor_boundary: Option<&CursorBoundary>,
393 index_range_anchor: Option<&RawIndexKey>,
394 ) -> Option<usize> {
395 if !Self::is_index_range_limit_pushdown_shape_eligible(plan) {
396 return None;
397 }
398 if cursor_boundary.is_some() && index_range_anchor.is_none() {
399 return None;
400 }
401
402 let page = plan.page.as_ref()?;
403 let limit = page.limit?;
404 if limit == 0 {
405 return Some(0);
406 }
407
408 let offset = usize::try_from(page.offset).unwrap_or(usize::MAX);
409 let limit = usize::try_from(limit).unwrap_or(usize::MAX);
410 let page_end = offset.saturating_add(limit);
411 let needs_extra_row = true;
412
413 Some(page_end.saturating_add(usize::from(needs_extra_row)))
414 }
415
416 #[expect(
418 clippy::too_many_arguments,
419 reason = "fast-path dispatch keeps execution inputs explicit at one call site"
420 )]
421 fn try_execute_fast_paths(
422 ctx: &Context<'_, E>,
423 plan: &LogicalPlan<E::Key>,
424 secondary_pushdown_applicability: &PushdownApplicability,
425 cursor_boundary: Option<&CursorBoundary>,
426 index_range_anchor: Option<&RawIndexKey>,
427 direction: Direction,
428 continuation_signature: ContinuationSignature,
429 span: &mut Span<E>,
430 execution_trace: &mut Option<ExecutionTrace>,
431 ) -> Result<Option<CursorPage<E>>, InternalError> {
432 Self::validate_pk_fast_path_boundary_if_applicable(plan, cursor_boundary)?;
433
434 if let Some(mut fast) = Self::try_execute_pk_order_stream(ctx, plan)? {
435 let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
436 ctx,
437 plan,
438 fast.ordered_key_stream.as_mut(),
439 cursor_boundary,
440 direction,
441 continuation_signature,
442 )?;
443 Self::finalize_path_outcome(
444 execution_trace,
445 Some(fast.fast_path_used),
446 fast.pushdown_type,
447 fast.rows_scanned,
448 post_access_rows,
449 );
450 set_rows_from_len(span, page.items.0.len());
451 return Ok(Some(page));
452 }
453
454 if let Some(mut fast) = Self::try_execute_secondary_index_order_stream(
455 ctx,
456 plan,
457 secondary_pushdown_applicability,
458 )? {
459 let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
460 ctx,
461 plan,
462 fast.ordered_key_stream.as_mut(),
463 cursor_boundary,
464 direction,
465 continuation_signature,
466 )?;
467 Self::finalize_path_outcome(
468 execution_trace,
469 Some(fast.fast_path_used),
470 fast.pushdown_type,
471 fast.rows_scanned,
472 post_access_rows,
473 );
474 set_rows_from_len(span, page.items.0.len());
475 return Ok(Some(page));
476 }
477
478 let index_range_limit_fetch =
479 Self::index_range_limit_pushdown_fetch(plan, cursor_boundary, index_range_anchor);
480 if let Some(mut fast) = Self::try_execute_index_range_limit_pushdown_stream(
481 ctx,
482 plan,
483 index_range_anchor,
484 direction,
485 index_range_limit_fetch,
486 )? {
487 let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
488 ctx,
489 plan,
490 fast.ordered_key_stream.as_mut(),
491 cursor_boundary,
492 direction,
493 continuation_signature,
494 )?;
495 Self::finalize_path_outcome(
496 execution_trace,
497 Some(fast.fast_path_used),
498 fast.pushdown_type,
499 fast.rows_scanned,
500 post_access_rows,
501 );
502 set_rows_from_len(span, page.items.0.len());
503 return Ok(Some(page));
504 }
505
506 Ok(None)
507 }
508
509 fn finalize_rows_into_page(
511 plan: &LogicalPlan<E::Key>,
512 rows: &mut Vec<(Id<E>, E)>,
513 cursor_boundary: Option<&CursorBoundary>,
514 direction: Direction,
515 continuation_signature: ContinuationSignature,
516 ) -> Result<CursorPage<E>, InternalError> {
517 let stats = plan.apply_post_access_with_cursor::<E, _>(rows, cursor_boundary)?;
518 let next_cursor =
519 Self::build_next_cursor(plan, rows, &stats, direction, continuation_signature)?;
520 let items = Response(std::mem::take(rows));
521
522 Ok(CursorPage { items, next_cursor })
523 }
524
525 fn assess_secondary_order_pushdown_applicability(
528 plan: &LogicalPlan<E::Key>,
529 ) -> PushdownApplicability {
530 assess_secondary_order_pushdown_if_applicable_validated(E::MODEL, plan)
531 }
532
533 fn build_next_cursor(
534 plan: &LogicalPlan<E::Key>,
535 rows: &[(Id<E>, E)],
536 stats: &PostAccessStats,
537 direction: Direction,
538 signature: ContinuationSignature,
539 ) -> Result<Option<Vec<u8>>, InternalError> {
540 let Some(page) = plan.page.as_ref() else {
541 return Ok(None);
542 };
543 let Some(limit) = page.limit else {
544 return Ok(None);
545 };
546 if rows.is_empty() {
547 return Ok(None);
548 }
549
550 let page_end = (page.offset as usize).saturating_add(limit as usize);
552 if stats.rows_after_cursor <= page_end {
553 return Ok(None);
554 }
555
556 let Some((_, last_entity)) = rows.last() else {
557 return Ok(None);
558 };
559 Self::encode_next_cursor_for_last_entity(plan, last_entity, direction, signature).map(Some)
560 }
561
562 fn encode_next_cursor_for_last_entity(
564 plan: &LogicalPlan<E::Key>,
565 last_entity: &E,
566 direction: Direction,
567 signature: ContinuationSignature,
568 ) -> Result<Vec<u8>, InternalError> {
569 let boundary = plan.cursor_boundary_from_entity(last_entity)?;
570 let token = if plan.access.cursor_support().supports_index_range_anchor() {
571 let (index, _, _, _) =
572 plan.access.as_index_range_path().ok_or_else(|| {
573 InternalError::query_invariant(
574 "executor invariant violated: index-range cursor support missing concrete index-range path",
575 )
576 })?;
577 let index_key = IndexKey::new(last_entity, index)?.ok_or_else(|| {
578 InternalError::query_invariant(
579 "executor invariant violated: cursor row is not indexable for planned index-range access",
580 )
581 })?;
582
583 ContinuationToken::new_index_range_with_direction(
584 signature,
585 boundary,
586 IndexRangeCursorAnchor::new(index_key.to_raw()),
587 direction,
588 )
589 } else {
590 ContinuationToken::new_with_direction(signature, boundary, direction)
591 };
592 token.encode().map_err(|err| {
593 InternalError::new(
594 ErrorClass::Internal,
595 ErrorOrigin::Serialize,
596 format!("failed to encode continuation cursor: {err}"),
597 )
598 })
599 }
600}