Skip to main content

icydb_core/db/executor/load/
mod.rs

1mod index_range_limit;
2mod pk_stream;
3mod secondary_index;
4
5use crate::{
6    db::{
7        Context, Db,
8        executor::plan::{record_plan_metrics, record_rows_scanned, set_rows_from_len},
9        executor::{OrderedKeyStream, OrderedKeyStreamBox},
10        index::{IndexKey, RawIndexKey},
11        query::plan::{
12            AccessPlan, AccessPlanProjection, ContinuationSignature, ContinuationToken,
13            CursorBoundary, Direction, ExecutablePlan, IndexRangeCursorAnchor, LogicalPlan,
14            OrderDirection, PlannedCursor, decode_pk_cursor_boundary,
15            logical::PostAccessStats,
16            project_access_plan,
17            validate::{
18                PushdownApplicability, assess_secondary_order_pushdown_if_applicable_validated,
19                validate_executor_plan,
20            },
21        },
22        response::Response,
23    },
24    error::{ErrorClass, ErrorOrigin, InternalError},
25    obs::sink::{ExecKind, Span},
26    traits::{EntityKind, EntityValue},
27    types::Id,
28    value::Value,
29};
30use std::{marker::PhantomData, ops::Bound};
31
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Rows produced for this page after post-access processing.
    pub(crate) items: Response<E>,

    /// Encoded continuation token for the following page.
    /// `None` when the executor did not emit a continuation.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
45
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
/// Each variant corresponds to one `AccessPlanProjection` hook implemented by
/// `ExecutionAccessProjection`.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Produced by the `by_key` projection hook.
    ByKey,
    /// Produced by the `by_keys` projection hook.
    ByKeys,
    /// Produced by the `key_range` projection hook.
    KeyRange,
    /// Produced by the `index_prefix` projection hook.
    IndexPrefix,
    /// Produced by the `index_range` projection hook.
    IndexRange,
    /// Produced by the `full_scan` projection hook.
    FullScan,
    /// Produced by the `union` projection hook (child shapes are discarded).
    Union,
    /// Produced by the `intersection` projection hook (child shapes are discarded).
    Intersection,
}
62
///
/// ExecutionPushdownType
///
/// Pushdown optimization kind applied by load execution, if any.
/// Values are reported by fast-path results and copied into `ExecutionTrace`.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionPushdownType {
    /// Secondary-index ORDER BY pushdown.
    /// NOTE(review): assigned by the fast-path modules, not in this file; confirm there.
    SecondaryOrder,
    /// Index-range LIMIT pushdown.
    /// NOTE(review): assigned by the fast-path modules, not in this file; confirm there.
    IndexRangeLimit,
}
73
///
/// ExecutionFastPath
///
/// Fast-path branch selected by load execution, if any.
/// The value is carried by `FastPathKeyResult::fast_path_used`.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionFastPath {
    /// Presumably the primary-key ordered stream fast path; confirm in `pk_stream`.
    PrimaryKey,
    /// Presumably the secondary-index ordered stream fast path; confirm in `secondary_index`.
    SecondaryIndex,
    /// Presumably the index-range limit pushdown fast path; confirm in `index_range_limit`.
    IndexRange,
}
85
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Coarse shape of the plan's access path, projected at trace creation.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Plan direction mapped onto `OrderDirection`.
    pub direction: OrderDirection,
    /// Always equal to `pushdown_type.is_some()`.
    pub pushdown_used: bool,
    /// Pushdown kind reported by the executed path, if any.
    pub pushdown_type: Option<ExecutionPushdownType>,
    /// Fast path that served the request; `None` for the general path.
    pub fast_path_used: Option<ExecutionFastPath>,
    /// Scan count reported for the executed path (saturates at `u64::MAX`).
    pub keys_scanned: u64,
    /// Number of rows in the returned page (saturates at `u64::MAX`).
    pub rows_returned: u64,
    /// True when the request carried a cursor boundary or an index-range anchor.
    pub continuation_applied: bool,
}
104
105impl ExecutionTrace {
106    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
107        Self {
108            access_path_variant: access_path_variant(access),
109            direction: execution_order_direction(direction),
110            pushdown_used: false,
111            pushdown_type: None,
112            fast_path_used: None,
113            keys_scanned: 0,
114            rows_returned: 0,
115            continuation_applied,
116        }
117    }
118
119    fn set_path_outcome(
120        &mut self,
121        fast_path_used: Option<ExecutionFastPath>,
122        pushdown_type: Option<ExecutionPushdownType>,
123        keys_scanned: usize,
124        rows_returned: usize,
125    ) {
126        self.fast_path_used = fast_path_used;
127        self.pushdown_type = pushdown_type;
128        self.pushdown_used = pushdown_type.is_some();
129        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
130        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
131    }
132}
133
134struct ExecutionAccessProjection;
135
136impl<K> AccessPlanProjection<K> for ExecutionAccessProjection {
137    type Output = ExecutionAccessPathVariant;
138
139    fn by_key(&mut self, _key: &K) -> Self::Output {
140        ExecutionAccessPathVariant::ByKey
141    }
142
143    fn by_keys(&mut self, _keys: &[K]) -> Self::Output {
144        ExecutionAccessPathVariant::ByKeys
145    }
146
147    fn key_range(&mut self, _start: &K, _end: &K) -> Self::Output {
148        ExecutionAccessPathVariant::KeyRange
149    }
150
151    fn index_prefix(
152        &mut self,
153        _index_name: &'static str,
154        _index_fields: &[&'static str],
155        _prefix_len: usize,
156        _values: &[Value],
157    ) -> Self::Output {
158        ExecutionAccessPathVariant::IndexPrefix
159    }
160
161    fn index_range(
162        &mut self,
163        _index_name: &'static str,
164        _index_fields: &[&'static str],
165        _prefix_len: usize,
166        _prefix: &[Value],
167        _lower: &Bound<Value>,
168        _upper: &Bound<Value>,
169    ) -> Self::Output {
170        ExecutionAccessPathVariant::IndexRange
171    }
172
173    fn full_scan(&mut self) -> Self::Output {
174        ExecutionAccessPathVariant::FullScan
175    }
176
177    fn union(&mut self, _children: Vec<Self::Output>) -> Self::Output {
178        ExecutionAccessPathVariant::Union
179    }
180
181    fn intersection(&mut self, _children: Vec<Self::Output>) -> Self::Output {
182        ExecutionAccessPathVariant::Intersection
183    }
184}
185
186fn access_path_variant<K>(access: &AccessPlan<K>) -> ExecutionAccessPathVariant {
187    let mut projection = ExecutionAccessProjection;
188    project_access_plan(access, &mut projection)
189}
190
191const fn execution_order_direction(direction: Direction) -> OrderDirection {
192    match direction {
193        Direction::Asc => OrderDirection::Asc,
194        Direction::Desc => OrderDirection::Desc,
195    }
196}
197
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///
struct FastPathKeyResult {
    // Ordered keys to materialize through the shared post-access phases.
    ordered_key_stream: OrderedKeyStreamBox,
    // Scan count forwarded to metrics and the execution trace for this path.
    rows_scanned: usize,
    // Which fast path produced this result.
    fast_path_used: ExecutionFastPath,
    // Pushdown kind applied while producing the stream, if any.
    pushdown_type: Option<ExecutionPushdownType>,
}
210
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    // Database handle used to obtain the recovered execution context.
    db: Db<E::Canister>,
    // When true, traced entrypoints build and return an `ExecutionTrace`.
    debug: bool,
    // Ties the executor to entity type `E` without storing a value of it.
    _marker: PhantomData<E>,
}
224
225impl<E> LoadExecutor<E>
226where
227    E: EntityKind + EntityValue,
228{
229    #[must_use]
230    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
231        Self {
232            db,
233            debug,
234            _marker: PhantomData,
235        }
236    }
237
238    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
239        self.execute_paged_with_cursor(plan, PlannedCursor::none())
240            .map(|page| page.items)
241    }
242
243    pub(in crate::db) fn execute_paged_with_cursor(
244        &self,
245        plan: ExecutablePlan<E>,
246        cursor: impl Into<PlannedCursor>,
247    ) -> Result<CursorPage<E>, InternalError> {
248        self.execute_paged_with_cursor_traced(plan, cursor)
249            .map(|(page, _)| page)
250    }
251
    /// Execute a load plan from `cursor`, returning the page plus an optional trace.
    ///
    /// The trace is `Some` only when the executor was built with `debug = true`.
    ///
    /// # Errors
    ///
    /// Returns an error when cursor revalidation fails, when the plan is not a
    /// load plan, when executor plan validation fails, or when any access or
    /// post-access phase fails.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Re-check the supplied cursor against this plan before using any of its parts.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        if !plan.mode().is_load() {
            return Err(InternalError::query_invariant(
                "executor invariant violated: load executor requires load plans",
            ));
        }

        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        // Either cursor component counts as a continuation for trace purposes.
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        // Tracing is opt-in: the trace only exists when `debug` is set.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));

        // The closure scopes the span and the mutable trace borrow so the trace
        // can be moved into the returned tuple afterwards.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);
            let plan = plan.into_inner();

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;

            record_plan_metrics(&plan.access);
            // Compute secondary ORDER BY pushdown eligibility once, then share the
            // derived decision across trace and fast-path gating.
            let secondary_pushdown_applicability =
                Self::assess_secondary_order_pushdown_applicability(&plan);

            // Fast paths record their own trace outcome and span row count.
            if let Some(page) = Self::try_execute_fast_paths(
                &ctx,
                &plan,
                &secondary_pushdown_applicability,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                direction,
                continuation_signature,
                &mut span,
                &mut execution_trace,
            )? {
                return Ok(page);
            }

            // General path: no fast path applied, so stream keys straight from the access plan.
            let mut key_stream = ctx.ordered_key_stream_from_access_plan_with_index_range_anchor(
                &plan.access,
                index_range_anchor.as_ref(),
                direction,
            )?;
            let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
                &ctx,
                &plan,
                key_stream.as_mut(),
                cursor_boundary.as_ref(),
                direction,
                continuation_signature,
            )?;
            // No fast path and no pushdown on the general path.
            Self::finalize_path_outcome(
                &mut execution_trace,
                None,
                None,
                keys_scanned,
                post_access_rows,
            );

            set_rows_from_len(&mut span, page.items.0.len());
            Ok(page)
        })();

        result.map(|page| (page, execution_trace))
    }
328
329    // Record shared observability outcome for any execution path.
330    fn finalize_path_outcome(
331        execution_trace: &mut Option<ExecutionTrace>,
332        fast_path_used: Option<ExecutionFastPath>,
333        pushdown_type: Option<ExecutionPushdownType>,
334        rows_scanned: usize,
335        rows_returned: usize,
336    ) {
337        record_rows_scanned::<E>(rows_scanned);
338        if let Some(execution_trace) = execution_trace.as_mut() {
339            execution_trace.set_path_outcome(
340                fast_path_used,
341                pushdown_type,
342                rows_scanned,
343                rows_returned,
344            );
345            debug_assert_eq!(
346                execution_trace.keys_scanned,
347                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
348                "execution trace keys_scanned must match rows_scanned metrics input",
349            );
350        }
351    }
352
353    // Run the shared load phases for an already-produced ordered key stream.
354    fn materialize_key_stream_into_page(
355        ctx: &Context<'_, E>,
356        plan: &LogicalPlan<E::Key>,
357        key_stream: &mut dyn OrderedKeyStream,
358        cursor_boundary: Option<&CursorBoundary>,
359        direction: Direction,
360        continuation_signature: ContinuationSignature,
361    ) -> Result<(CursorPage<E>, usize, usize), InternalError> {
362        let data_rows = ctx.rows_from_ordered_key_stream(key_stream, plan.consistency)?;
363        let rows_scanned = data_rows.len();
364        let mut rows = Context::deserialize_rows(data_rows)?;
365        let page = Self::finalize_rows_into_page(
366            plan,
367            &mut rows,
368            cursor_boundary,
369            direction,
370            continuation_signature,
371        )?;
372        let post_access_rows = page.items.0.len();
373
374        Ok((page, rows_scanned, post_access_rows))
375    }
376
377    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
378    fn validate_pk_fast_path_boundary_if_applicable(
379        plan: &LogicalPlan<E::Key>,
380        cursor_boundary: Option<&CursorBoundary>,
381    ) -> Result<(), InternalError> {
382        if !Self::is_pk_order_stream_eligible(plan) {
383            return Ok(());
384        }
385        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
386
387        Ok(())
388    }
389
390    fn index_range_limit_pushdown_fetch(
391        plan: &LogicalPlan<E::Key>,
392        cursor_boundary: Option<&CursorBoundary>,
393        index_range_anchor: Option<&RawIndexKey>,
394    ) -> Option<usize> {
395        if !Self::is_index_range_limit_pushdown_shape_eligible(plan) {
396            return None;
397        }
398        if cursor_boundary.is_some() && index_range_anchor.is_none() {
399            return None;
400        }
401
402        let page = plan.page.as_ref()?;
403        let limit = page.limit?;
404        if limit == 0 {
405            return Some(0);
406        }
407
408        let offset = usize::try_from(page.offset).unwrap_or(usize::MAX);
409        let limit = usize::try_from(limit).unwrap_or(usize::MAX);
410        let page_end = offset.saturating_add(limit);
411        let needs_extra_row = true;
412
413        Some(page_end.saturating_add(usize::from(needs_extra_row)))
414    }
415
416    // Try each fast-path strategy in canonical order and return the first hit.
417    #[expect(
418        clippy::too_many_arguments,
419        reason = "fast-path dispatch keeps execution inputs explicit at one call site"
420    )]
421    fn try_execute_fast_paths(
422        ctx: &Context<'_, E>,
423        plan: &LogicalPlan<E::Key>,
424        secondary_pushdown_applicability: &PushdownApplicability,
425        cursor_boundary: Option<&CursorBoundary>,
426        index_range_anchor: Option<&RawIndexKey>,
427        direction: Direction,
428        continuation_signature: ContinuationSignature,
429        span: &mut Span<E>,
430        execution_trace: &mut Option<ExecutionTrace>,
431    ) -> Result<Option<CursorPage<E>>, InternalError> {
432        Self::validate_pk_fast_path_boundary_if_applicable(plan, cursor_boundary)?;
433
434        if let Some(mut fast) = Self::try_execute_pk_order_stream(ctx, plan)? {
435            let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
436                ctx,
437                plan,
438                fast.ordered_key_stream.as_mut(),
439                cursor_boundary,
440                direction,
441                continuation_signature,
442            )?;
443            Self::finalize_path_outcome(
444                execution_trace,
445                Some(fast.fast_path_used),
446                fast.pushdown_type,
447                fast.rows_scanned,
448                post_access_rows,
449            );
450            set_rows_from_len(span, page.items.0.len());
451            return Ok(Some(page));
452        }
453
454        if let Some(mut fast) = Self::try_execute_secondary_index_order_stream(
455            ctx,
456            plan,
457            secondary_pushdown_applicability,
458        )? {
459            let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
460                ctx,
461                plan,
462                fast.ordered_key_stream.as_mut(),
463                cursor_boundary,
464                direction,
465                continuation_signature,
466            )?;
467            Self::finalize_path_outcome(
468                execution_trace,
469                Some(fast.fast_path_used),
470                fast.pushdown_type,
471                fast.rows_scanned,
472                post_access_rows,
473            );
474            set_rows_from_len(span, page.items.0.len());
475            return Ok(Some(page));
476        }
477
478        let index_range_limit_fetch =
479            Self::index_range_limit_pushdown_fetch(plan, cursor_boundary, index_range_anchor);
480        if let Some(mut fast) = Self::try_execute_index_range_limit_pushdown_stream(
481            ctx,
482            plan,
483            index_range_anchor,
484            direction,
485            index_range_limit_fetch,
486        )? {
487            let (page, _, post_access_rows) = Self::materialize_key_stream_into_page(
488                ctx,
489                plan,
490                fast.ordered_key_stream.as_mut(),
491                cursor_boundary,
492                direction,
493                continuation_signature,
494            )?;
495            Self::finalize_path_outcome(
496                execution_trace,
497                Some(fast.fast_path_used),
498                fast.pushdown_type,
499                fast.rows_scanned,
500                post_access_rows,
501            );
502            set_rows_from_len(span, page.items.0.len());
503            return Ok(Some(page));
504        }
505
506        Ok(None)
507    }
508
509    // Apply canonical post-access phases to scanned rows and assemble the cursor page.
510    fn finalize_rows_into_page(
511        plan: &LogicalPlan<E::Key>,
512        rows: &mut Vec<(Id<E>, E)>,
513        cursor_boundary: Option<&CursorBoundary>,
514        direction: Direction,
515        continuation_signature: ContinuationSignature,
516    ) -> Result<CursorPage<E>, InternalError> {
517        let stats = plan.apply_post_access_with_cursor::<E, _>(rows, cursor_boundary)?;
518        let next_cursor =
519            Self::build_next_cursor(plan, rows, &stats, direction, continuation_signature)?;
520        let items = Response(std::mem::take(rows));
521
522        Ok(CursorPage { items, next_cursor })
523    }
524
    // Assess secondary-index ORDER BY pushdown once for this execution and
    // map matrix outcomes to executor decisions.
    //
    // Delegates to the validated planner assessment using the entity model
    // `E::MODEL`; the result is shared by the caller rather than recomputed.
    fn assess_secondary_order_pushdown_applicability(
        plan: &LogicalPlan<E::Key>,
    ) -> PushdownApplicability {
        assess_secondary_order_pushdown_if_applicable_validated(E::MODEL, plan)
    }
532
533    fn build_next_cursor(
534        plan: &LogicalPlan<E::Key>,
535        rows: &[(Id<E>, E)],
536        stats: &PostAccessStats,
537        direction: Direction,
538        signature: ContinuationSignature,
539    ) -> Result<Option<Vec<u8>>, InternalError> {
540        let Some(page) = plan.page.as_ref() else {
541            return Ok(None);
542        };
543        let Some(limit) = page.limit else {
544            return Ok(None);
545        };
546        if rows.is_empty() {
547            return Ok(None);
548        }
549
550        // NOTE: post-access execution materializes full in-memory rows for Phase 1.
551        let page_end = (page.offset as usize).saturating_add(limit as usize);
552        if stats.rows_after_cursor <= page_end {
553            return Ok(None);
554        }
555
556        let Some((_, last_entity)) = rows.last() else {
557            return Ok(None);
558        };
559        Self::encode_next_cursor_for_last_entity(plan, last_entity, direction, signature).map(Some)
560    }
561
562    // Encode the continuation token from the last returned entity.
563    fn encode_next_cursor_for_last_entity(
564        plan: &LogicalPlan<E::Key>,
565        last_entity: &E,
566        direction: Direction,
567        signature: ContinuationSignature,
568    ) -> Result<Vec<u8>, InternalError> {
569        let boundary = plan.cursor_boundary_from_entity(last_entity)?;
570        let token = if plan.access.cursor_support().supports_index_range_anchor() {
571            let (index, _, _, _) =
572                plan.access.as_index_range_path().ok_or_else(|| {
573                    InternalError::query_invariant(
574                        "executor invariant violated: index-range cursor support missing concrete index-range path",
575                    )
576                })?;
577            let index_key = IndexKey::new(last_entity, index)?.ok_or_else(|| {
578                InternalError::query_invariant(
579                    "executor invariant violated: cursor row is not indexable for planned index-range access",
580                )
581            })?;
582
583            ContinuationToken::new_index_range_with_direction(
584                signature,
585                boundary,
586                IndexRangeCursorAnchor::new(index_key.to_raw()),
587                direction,
588            )
589        } else {
590            ContinuationToken::new_with_direction(signature, boundary, direction)
591        };
592        token.encode().map_err(|err| {
593            InternalError::new(
594                ErrorClass::Internal,
595                ErrorOrigin::Serialize,
596                format!("failed to encode continuation cursor: {err}"),
597            )
598        })
599    }
600}