Skip to main content

icydb_core/db/executor/load/
mod.rs

1mod index_range_limit;
2mod pk_stream;
3mod secondary_index;
4
5use crate::{
6    db::{
7        Context, Db,
8        executor::plan::{record_plan_metrics, record_rows_scanned, set_rows_from_len},
9        executor::{OrderedKeyStream, OrderedKeyStreamBox},
10        index::{IndexKey, RawIndexKey},
11        query::plan::{
12            AccessPlan, AccessPlanProjection, ContinuationSignature, ContinuationToken,
13            CursorBoundary, Direction, ExecutablePlan, IndexRangeCursorAnchor, LogicalPlan,
14            OrderDirection, PlannedCursor, decode_pk_cursor_boundary,
15            logical::PostAccessStats,
16            project_access_plan,
17            validate::{
18                PushdownApplicability, assess_secondary_order_pushdown_if_applicable_validated,
19                validate_executor_plan,
20            },
21        },
22        response::Response,
23    },
24    error::{ErrorClass, ErrorOrigin, InternalError},
25    obs::sink::{ExecKind, Span},
26    traits::{EntityKind, EntityValue},
27    types::Id,
28    value::Value,
29};
30use std::{marker::PhantomData, ops::Bound};
31
32///
33/// CursorPage
34///
35/// Internal load page result with continuation cursor payload.
36/// Returned by paged executor entrypoints.
37///
38
39#[derive(Debug)]
40pub(crate) struct CursorPage<E: EntityKind> {
41    pub(crate) items: Response<E>,
42
43    pub(crate) next_cursor: Option<Vec<u8>>,
44}
45
46///
47/// ExecutionAccessPathVariant
48///
49/// Coarse access path shape used by the load execution trace surface.
50///
51#[derive(Clone, Copy, Debug, Eq, PartialEq)]
52pub enum ExecutionAccessPathVariant {
53    ByKey,
54    ByKeys,
55    KeyRange,
56    IndexPrefix,
57    IndexRange,
58    FullScan,
59    Union,
60    Intersection,
61}
62
63///
64/// ExecutionPushdownType
65///
66/// Pushdown optimization kind applied by load execution, if any.
67///
68#[derive(Clone, Copy, Debug, Eq, PartialEq)]
69pub enum ExecutionPushdownType {
70    SecondaryOrder,
71    IndexRangeLimit,
72}
73
74///
75/// ExecutionFastPath
76///
77/// Fast-path branch selected by load execution, if any.
78///
79#[derive(Clone, Copy, Debug, Eq, PartialEq)]
80pub enum ExecutionFastPath {
81    PrimaryKey,
82    SecondaryIndex,
83    IndexRange,
84}
85
86///
87/// ExecutionTrace
88///
89/// Structured, opt-in load execution introspection snapshot.
90/// Captures plan-shape and execution decisions without changing semantics.
91///
92
93#[derive(Clone, Copy, Debug, Eq, PartialEq)]
94pub struct ExecutionTrace {
95    pub access_path_variant: ExecutionAccessPathVariant,
96    pub direction: OrderDirection,
97    pub pushdown_used: bool,
98    pub pushdown_type: Option<ExecutionPushdownType>,
99    pub fast_path_used: Option<ExecutionFastPath>,
100    pub keys_scanned: u64,
101    pub rows_returned: u64,
102    pub continuation_applied: bool,
103}
104
105impl ExecutionTrace {
106    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
107        Self {
108            access_path_variant: access_path_variant(access),
109            direction: execution_order_direction(direction),
110            pushdown_used: false,
111            pushdown_type: None,
112            fast_path_used: None,
113            keys_scanned: 0,
114            rows_returned: 0,
115            continuation_applied,
116        }
117    }
118
119    fn set_path_outcome(
120        &mut self,
121        fast_path_used: Option<ExecutionFastPath>,
122        pushdown_type: Option<ExecutionPushdownType>,
123        keys_scanned: usize,
124        rows_returned: usize,
125    ) {
126        self.fast_path_used = fast_path_used;
127        self.pushdown_type = pushdown_type;
128        self.pushdown_used = pushdown_type.is_some();
129        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
130        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
131    }
132}
133
// Stateless projection mapping each access-plan node to its coarse trace variant.
struct ExecutionAccessProjection;

impl<K> AccessPlanProjection<K> for ExecutionAccessProjection {
    type Output = ExecutionAccessPathVariant;

    // All arguments are ignored: only the node's shape matters for tracing.

    fn by_key(&mut self, _key: &K) -> Self::Output {
        ExecutionAccessPathVariant::ByKey
    }

    fn by_keys(&mut self, _keys: &[K]) -> Self::Output {
        ExecutionAccessPathVariant::ByKeys
    }

    fn key_range(&mut self, _start: &K, _end: &K) -> Self::Output {
        ExecutionAccessPathVariant::KeyRange
    }

    fn index_prefix(
        &mut self,
        _index_name: &'static str,
        _index_fields: &[&'static str],
        _prefix_len: usize,
        _values: &[Value],
    ) -> Self::Output {
        ExecutionAccessPathVariant::IndexPrefix
    }

    fn index_range(
        &mut self,
        _index_name: &'static str,
        _index_fields: &[&'static str],
        _prefix_len: usize,
        _prefix: &[Value],
        _lower: &Bound<Value>,
        _upper: &Bound<Value>,
    ) -> Self::Output {
        ExecutionAccessPathVariant::IndexRange
    }

    fn full_scan(&mut self) -> Self::Output {
        ExecutionAccessPathVariant::FullScan
    }

    // Composite nodes report only the combinator; child variants are discarded.

    fn union(&mut self, _children: Vec<Self::Output>) -> Self::Output {
        ExecutionAccessPathVariant::Union
    }

    fn intersection(&mut self, _children: Vec<Self::Output>) -> Self::Output {
        ExecutionAccessPathVariant::Intersection
    }
}
185
186fn access_path_variant<K>(access: &AccessPlan<K>) -> ExecutionAccessPathVariant {
187    let mut projection = ExecutionAccessProjection;
188    project_access_plan(access, &mut projection)
189}
190
191const fn execution_order_direction(direction: Direction) -> OrderDirection {
192    match direction {
193        Direction::Asc => OrderDirection::Asc,
194        Direction::Desc => OrderDirection::Desc,
195    }
196}
197
198///
199/// FastPathKeyResult
200///
201/// Internal fast-path access result.
202/// Carries ordered keys plus observability metadata for shared execution phases.
203///
204struct FastPathKeyResult {
205    ordered_key_stream: OrderedKeyStreamBox,
206    rows_scanned: usize,
207    fast_path_used: ExecutionFastPath,
208    pushdown_type: Option<ExecutionPushdownType>,
209}
210
211///
212/// LoadExecutor
213///
214/// Load-plan executor with canonical post-access semantics.
215/// Coordinates fast paths, trace hooks, and pagination cursors.
216///
217
218#[derive(Clone)]
219pub(crate) struct LoadExecutor<E: EntityKind> {
220    db: Db<E::Canister>,
221    debug: bool,
222    _marker: PhantomData<E>,
223}
224
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    // Construct an executor; `debug` enables `ExecutionTrace` capture.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    // Execute an unpaged load: run the paged path with no cursor and
    // discard the continuation payload.
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }

    // Paged execution without trace capture; thin wrapper over the traced path.
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    // Main entrypoint: validate the cursor against the plan, try the fast
    // paths, and otherwise fall through to the generic ordered-key-stream
    // pipeline. Returns the page plus an optional trace (debug mode only).
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidation binds the incoming cursor to this specific plan shape.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        // This executor only handles load plans; anything else is a planner bug.
        if !plan.mode().is_load() {
            return Err(InternalError::new(
                ErrorClass::InvariantViolation,
                ErrorOrigin::Query,
                "executor invariant violated: load executor requires load plans",
            ));
        }

        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        // A continuation is "applied" if either cursor component is present.
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));

        // Immediately-invoked closure scopes the `?` operator so the trace
        // captured above can be paired with the success value afterwards.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);
            let plan = plan.into_inner();

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;

            record_plan_metrics(&plan.access);
            // Compute secondary ORDER BY pushdown eligibility once, then share the
            // derived decision across trace and fast-path gating.
            let secondary_pushdown_applicability =
                Self::assess_secondary_order_pushdown_applicability(&plan);

            // Fast paths short-circuit the generic pipeline when one applies.
            if let Some(page) = Self::try_execute_fast_paths(
                &ctx,
                &plan,
                &secondary_pushdown_applicability,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                direction,
                continuation_signature,
                &mut span,
                &mut execution_trace,
            )? {
                return Ok(page);
            }

            // Generic path: derive an ordered key stream from the access plan
            // (honoring any index-range anchor) and run the shared phases.
            let mut key_stream = ctx.ordered_key_stream_from_access_plan_with_index_range_anchor(
                &plan.access,
                index_range_anchor.as_ref(),
                direction,
            )?;
            let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
                &ctx,
                &plan,
                key_stream.as_mut(),
                cursor_boundary.as_ref(),
                direction,
                continuation_signature,
            )?;
            // Generic path has no fast path and no pushdown.
            Self::finalize_path_outcome(
                &mut execution_trace,
                None,
                None,
                keys_scanned,
                post_access_rows,
            );

            set_rows_from_len(&mut span, page.items.0.len());
            Ok(page)
        })();

        // NOTE(review): on `Err` the trace is dropped with the error — traces
        // are only surfaced for successful executions.
        result.map(|page| (page, execution_trace))
    }

    // Record shared observability outcome for any execution path.
    // Feeds metrics unconditionally; updates the trace only when present.
    fn finalize_path_outcome(
        execution_trace: &mut Option<ExecutionTrace>,
        fast_path_used: Option<ExecutionFastPath>,
        pushdown_type: Option<ExecutionPushdownType>,
        rows_scanned: usize,
        rows_returned: usize,
    ) {
        record_rows_scanned::<E>(rows_scanned);
        if let Some(execution_trace) = execution_trace.as_mut() {
            execution_trace.set_path_outcome(
                fast_path_used,
                pushdown_type,
                rows_scanned,
                rows_returned,
            );
            // Trace counters and metric inputs must stay in lockstep.
            debug_assert_eq!(
                execution_trace.keys_scanned,
                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
                "execution trace keys_scanned must match rows_scanned metrics input",
            );
        }
    }

    // Run the shared load phases for an already-produced ordered key stream:
    // fetch rows, deserialize, apply post-access phases, build the page.
    // Returns (page, rows scanned, rows surviving post-access).
    fn materialize_key_stream_into_page(
        ctx: &Context<'_, E>,
        plan: &LogicalPlan<E::Key>,
        key_stream: &mut dyn OrderedKeyStream,
        cursor_boundary: Option<&CursorBoundary>,
        direction: Direction,
        continuation_signature: ContinuationSignature,
    ) -> Result<(CursorPage<E>, usize, usize), InternalError> {
        let data_rows = ctx.rows_from_ordered_key_stream(key_stream, plan.consistency)?;
        // Scanned count is taken before post-access filtering can shrink it.
        let rows_scanned = data_rows.len();
        let mut rows = Context::deserialize_rows(data_rows)?;
        let page = Self::finalize_rows_into_page(
            plan,
            &mut rows,
            cursor_boundary,
            direction,
            continuation_signature,
        )?;
        let post_access_rows = page.items.0.len();

        Ok((page, rows_scanned, post_access_rows))
    }

    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
    // Decodes purely for validation; the decoded value is discarded.
    fn validate_pk_fast_path_boundary_if_applicable(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        if !Self::is_pk_order_stream_eligible(plan) {
            return Ok(());
        }
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }

    // Compute the fetch budget for the index-range limit pushdown, or `None`
    // when the pushdown does not apply to this plan/cursor combination.
    fn index_range_limit_pushdown_fetch(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
        index_range_anchor: Option<&RawIndexKey>,
    ) -> Option<usize> {
        if !Self::is_index_range_limit_pushdown_shape_eligible(plan) {
            return None;
        }
        // A boundary-only cursor (no anchor) cannot seek within the index
        // range, so the pushdown is disabled for that combination.
        if cursor_boundary.is_some() && index_range_anchor.is_none() {
            return None;
        }

        let page = plan.page.as_ref()?;
        let limit = page.limit?;
        if limit == 0 {
            return Some(0);
        }

        let offset = usize::try_from(page.offset).unwrap_or(usize::MAX);
        let limit = usize::try_from(limit).unwrap_or(usize::MAX);
        let page_end = offset.saturating_add(limit);
        // Always fetch one row past the page end so the continuation check in
        // `build_next_cursor` can tell whether more rows exist.
        // NOTE(review): this flag is currently always true; kept as a named
        // constant to document intent.
        let needs_extra_row = true;

        Some(page_end.saturating_add(usize::from(needs_extra_row)))
    }

    // Try each fast-path strategy in canonical order and return the first hit.
    // Order: PK stream, secondary-index order stream, index-range limit pushdown.
    #[expect(
        clippy::too_many_arguments,
        reason = "fast-path dispatch keeps execution inputs explicit at one call site"
    )]
    fn try_execute_fast_paths(
        ctx: &Context<'_, E>,
        plan: &LogicalPlan<E::Key>,
        secondary_pushdown_applicability: &PushdownApplicability,
        cursor_boundary: Option<&CursorBoundary>,
        index_range_anchor: Option<&RawIndexKey>,
        direction: Direction,
        continuation_signature: ContinuationSignature,
        span: &mut Span<E>,
        execution_trace: &mut Option<ExecutionTrace>,
    ) -> Result<Option<CursorPage<E>>, InternalError> {
        // Validate the PK boundary first so its error classification wins
        // even if a later branch would have been taken.
        Self::validate_pk_fast_path_boundary_if_applicable(plan, cursor_boundary)?;

        if let Some(mut fast) = Self::try_execute_pk_order_stream(ctx, plan)? {
            let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
                ctx,
                plan,
                fast.ordered_key_stream.as_mut(),
                cursor_boundary,
                direction,
                continuation_signature,
            )?;
            // Fast paths report their own scan count, not the materializer's.
            Self::finalize_path_outcome(
                execution_trace,
                Some(fast.fast_path_used),
                fast.pushdown_type,
                fast.rows_scanned,
                post_access_rows,
            );
            set_rows_from_len(span, page.items.0.len());
            return Ok(Some(page));
        }

        if let Some(mut fast) = Self::try_execute_secondary_index_order_stream(
            ctx,
            plan,
            secondary_pushdown_applicability,
        )? {
            let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
                ctx,
                plan,
                fast.ordered_key_stream.as_mut(),
                cursor_boundary,
                direction,
                continuation_signature,
            )?;
            Self::finalize_path_outcome(
                execution_trace,
                Some(fast.fast_path_used),
                fast.pushdown_type,
                fast.rows_scanned,
                post_access_rows,
            );
            set_rows_from_len(span, page.items.0.len());
            return Ok(Some(page));
        }

        let index_range_limit_fetch =
            Self::index_range_limit_pushdown_fetch(plan, cursor_boundary, index_range_anchor);
        if let Some(mut fast) = Self::try_execute_index_range_limit_pushdown_stream(
            ctx,
            plan,
            index_range_anchor,
            direction,
            index_range_limit_fetch,
        )? {
            let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
                ctx,
                plan,
                fast.ordered_key_stream.as_mut(),
                cursor_boundary,
                direction,
                continuation_signature,
            )?;
            Self::finalize_path_outcome(
                execution_trace,
                Some(fast.fast_path_used),
                fast.pushdown_type,
                fast.rows_scanned,
                post_access_rows,
            );
            set_rows_from_len(span, page.items.0.len());
            return Ok(Some(page));
        }

        // No fast path applies; caller falls back to the generic pipeline.
        Ok(None)
    }

    // Apply canonical post-access phases to scanned rows and assemble the cursor page.
    // `rows` is consumed (taken) into the returned page.
    fn finalize_rows_into_page(
        plan: &LogicalPlan<E::Key>,
        rows: &mut Vec<(Id<E>, E)>,
        cursor_boundary: Option<&CursorBoundary>,
        direction: Direction,
        continuation_signature: ContinuationSignature,
    ) -> Result<CursorPage<E>, InternalError> {
        let stats = plan.apply_post_access_with_cursor::<E, _>(rows, cursor_boundary)?;
        // Cursor must be built from the post-access rows/stats, before `take`.
        let next_cursor =
            Self::build_next_cursor(plan, rows, &stats, direction, continuation_signature)?;
        let items = Response(std::mem::take(rows));

        Ok(CursorPage { items, next_cursor })
    }

    // Assess secondary-index ORDER BY pushdown once for this execution and
    // map matrix outcomes to executor decisions.
    fn assess_secondary_order_pushdown_applicability(
        plan: &LogicalPlan<E::Key>,
    ) -> PushdownApplicability {
        assess_secondary_order_pushdown_if_applicable_validated(E::MODEL, plan)
    }

    // Decide whether a continuation cursor is needed and encode it.
    // Returns `Ok(None)` for unpaged plans, unlimited pages, empty results,
    // or when no rows remain beyond the current page end.
    fn build_next_cursor(
        plan: &LogicalPlan<E::Key>,
        rows: &[(Id<E>, E)],
        stats: &PostAccessStats,
        direction: Direction,
        signature: ContinuationSignature,
    ) -> Result<Option<Vec<u8>>, InternalError> {
        let Some(page) = plan.page.as_ref() else {
            return Ok(None);
        };
        let Some(limit) = page.limit else {
            return Ok(None);
        };
        if rows.is_empty() {
            return Ok(None);
        }

        // NOTE: post-access execution materializes full in-memory rows for Phase 1.
        let page_end = (page.offset as usize).saturating_add(limit as usize);
        // More pages exist only if rows past the cursor extend beyond this page.
        if stats.rows_after_cursor <= page_end {
            return Ok(None);
        }

        let Some((_, last_entity)) = rows.last() else {
            return Ok(None);
        };
        Self::encode_next_cursor_for_last_entity(plan, last_entity, direction, signature).map(Some)
    }

    // Encode the continuation token from the last returned entity.
    // Index-range-capable plans additionally embed a raw index-key anchor so
    // the next page can seek within the same index range.
    fn encode_next_cursor_for_last_entity(
        plan: &LogicalPlan<E::Key>,
        last_entity: &E,
        direction: Direction,
        signature: ContinuationSignature,
    ) -> Result<Vec<u8>, InternalError> {
        let boundary = plan.cursor_boundary_from_entity(last_entity)?;
        let token = if plan.access.cursor_support().supports_index_range_anchor() {
            // Anchor support implies a concrete index-range path must exist.
            let (index, _, _, _) =
                plan.access.as_index_range_path().ok_or_else(|| {
                    InternalError::new(
                        ErrorClass::InvariantViolation,
                        ErrorOrigin::Query,
                        "executor invariant violated: index-range cursor support missing concrete index-range path",
                    )
                })?;
            // The last returned row must itself be indexable under that index.
            let index_key = IndexKey::new(last_entity, index)?.ok_or_else(|| {
                InternalError::new(
                    ErrorClass::InvariantViolation,
                    ErrorOrigin::Query,
                    "executor invariant violated: cursor row is not indexable for planned index-range access",
                )
            })?;

            ContinuationToken::new_index_range_with_direction(
                signature,
                boundary,
                IndexRangeCursorAnchor::new(index_key.to_raw()),
                direction,
            )
        } else {
            ContinuationToken::new_with_direction(signature, boundary, direction)
        };
        // Encoding failure is an internal serialization error, not a user error.
        token.encode().map_err(|err| {
            InternalError::new(
                ErrorClass::Internal,
                ErrorOrigin::Serialize,
                format!("failed to encode continuation cursor: {err}"),
            )
        })
    }
}