//! Load execution module: `icydb_core/db/executor/load/mod.rs`.

1mod index_range_limit;
2mod pk_stream;
3mod secondary_index;
4
5use crate::{
6    db::{
7        Context, Db,
8        data::DataKey,
9        executor::plan::{record_plan_metrics, record_rows_scanned, set_rows_from_len},
10        index::{IndexKey, RawIndexKey},
11        query::plan::{
12            AccessPlan, AccessPlanProjection, ContinuationSignature, ContinuationToken,
13            CursorBoundary, Direction, ExecutablePlan, IndexRangeCursorAnchor, LogicalPlan,
14            OrderDirection, PlannedCursor, decode_pk_cursor_boundary,
15            logical::PostAccessStats,
16            project_access_plan,
17            validate::{
18                PushdownApplicability, assess_secondary_order_pushdown_if_applicable_validated,
19                validate_executor_plan,
20            },
21        },
22        response::Response,
23    },
24    error::{ErrorClass, ErrorOrigin, InternalError},
25    obs::sink::{ExecKind, Span},
26    traits::{EntityKind, EntityValue},
27    types::Id,
28    value::Value,
29};
30use std::{marker::PhantomData, ops::Bound};
31
32///
33/// CursorPage
34///
35/// Internal load page result with continuation cursor payload.
36/// Returned by paged executor entrypoints.
37///
38
39#[derive(Debug)]
40pub(crate) struct CursorPage<E: EntityKind> {
41    pub(crate) items: Response<E>,
42
43    pub(crate) next_cursor: Option<Vec<u8>>,
44}
45
46///
47/// ExecutionAccessPathVariant
48///
49/// Coarse access path shape used by the load execution trace surface.
50///
51#[derive(Clone, Copy, Debug, Eq, PartialEq)]
52pub enum ExecutionAccessPathVariant {
53    ByKey,
54    ByKeys,
55    KeyRange,
56    IndexPrefix,
57    IndexRange,
58    FullScan,
59    Union,
60    Intersection,
61}
62
63///
64/// ExecutionPushdownType
65///
66/// Pushdown optimization kind applied by load execution, if any.
67///
68#[derive(Clone, Copy, Debug, Eq, PartialEq)]
69pub enum ExecutionPushdownType {
70    SecondaryOrder,
71    IndexRangeLimit,
72}
73
74///
75/// ExecutionFastPath
76///
77/// Fast-path branch selected by load execution, if any.
78///
79#[derive(Clone, Copy, Debug, Eq, PartialEq)]
80pub enum ExecutionFastPath {
81    PrimaryKey,
82    SecondaryIndex,
83    IndexRange,
84}
85
86///
87/// ExecutionTrace
88///
89/// Structured, opt-in load execution introspection snapshot.
90/// Captures plan-shape and execution decisions without changing semantics.
91///
92
93#[derive(Clone, Copy, Debug, Eq, PartialEq)]
94pub struct ExecutionTrace {
95    pub access_path_variant: ExecutionAccessPathVariant,
96    pub direction: OrderDirection,
97    pub pushdown_used: bool,
98    pub pushdown_type: Option<ExecutionPushdownType>,
99    pub fast_path_used: Option<ExecutionFastPath>,
100    pub keys_scanned: u64,
101    pub rows_returned: u64,
102    pub continuation_applied: bool,
103}
104
105impl ExecutionTrace {
106    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
107        Self {
108            access_path_variant: access_path_variant(access),
109            direction: execution_order_direction(direction),
110            pushdown_used: false,
111            pushdown_type: None,
112            fast_path_used: None,
113            keys_scanned: 0,
114            rows_returned: 0,
115            continuation_applied,
116        }
117    }
118
119    fn set_path_outcome(
120        &mut self,
121        fast_path_used: Option<ExecutionFastPath>,
122        pushdown_type: Option<ExecutionPushdownType>,
123        keys_scanned: usize,
124        rows_returned: usize,
125    ) {
126        self.fast_path_used = fast_path_used;
127        self.pushdown_type = pushdown_type;
128        self.pushdown_used = pushdown_type.is_some();
129        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
130        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
131    }
132}
133
134struct ExecutionAccessProjection;
135
136impl<K> AccessPlanProjection<K> for ExecutionAccessProjection {
137    type Output = ExecutionAccessPathVariant;
138
139    fn by_key(&mut self, _key: &K) -> Self::Output {
140        ExecutionAccessPathVariant::ByKey
141    }
142
143    fn by_keys(&mut self, _keys: &[K]) -> Self::Output {
144        ExecutionAccessPathVariant::ByKeys
145    }
146
147    fn key_range(&mut self, _start: &K, _end: &K) -> Self::Output {
148        ExecutionAccessPathVariant::KeyRange
149    }
150
151    fn index_prefix(
152        &mut self,
153        _index_name: &'static str,
154        _index_fields: &[&'static str],
155        _prefix_len: usize,
156        _values: &[Value],
157    ) -> Self::Output {
158        ExecutionAccessPathVariant::IndexPrefix
159    }
160
161    fn index_range(
162        &mut self,
163        _index_name: &'static str,
164        _index_fields: &[&'static str],
165        _prefix_len: usize,
166        _prefix: &[Value],
167        _lower: &Bound<Value>,
168        _upper: &Bound<Value>,
169    ) -> Self::Output {
170        ExecutionAccessPathVariant::IndexRange
171    }
172
173    fn full_scan(&mut self) -> Self::Output {
174        ExecutionAccessPathVariant::FullScan
175    }
176
177    fn union(&mut self, _children: Vec<Self::Output>) -> Self::Output {
178        ExecutionAccessPathVariant::Union
179    }
180
181    fn intersection(&mut self, _children: Vec<Self::Output>) -> Self::Output {
182        ExecutionAccessPathVariant::Intersection
183    }
184}
185
186fn access_path_variant<K>(access: &AccessPlan<K>) -> ExecutionAccessPathVariant {
187    let mut projection = ExecutionAccessProjection;
188    project_access_plan(access, &mut projection)
189}
190
191const fn execution_order_direction(direction: Direction) -> OrderDirection {
192    match direction {
193        Direction::Asc => OrderDirection::Asc,
194        Direction::Desc => OrderDirection::Desc,
195    }
196}
197
198///
199/// FastPathKeyResult
200///
201/// Internal fast-path access result.
202/// Carries ordered keys plus observability metadata for shared execution phases.
203///
204struct FastPathKeyResult {
205    ordered_keys: Vec<DataKey>,
206    rows_scanned: usize,
207    fast_path_used: ExecutionFastPath,
208    pushdown_type: Option<ExecutionPushdownType>,
209}
210
211///
212/// LoadExecutor
213///
214/// Load-plan executor with canonical post-access semantics.
215/// Coordinates fast paths, trace hooks, and pagination cursors.
216///
217
218#[derive(Clone)]
219pub(crate) struct LoadExecutor<E: EntityKind> {
220    db: Db<E::Canister>,
221    debug: bool,
222    _marker: PhantomData<E>,
223}
224
225impl<E> LoadExecutor<E>
226where
227    E: EntityKind + EntityValue,
228{
229    #[must_use]
230    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
231        Self {
232            db,
233            debug,
234            _marker: PhantomData,
235        }
236    }
237
238    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
239        self.execute_paged_with_cursor(plan, PlannedCursor::none())
240            .map(|page| page.items)
241    }
242
243    pub(in crate::db) fn execute_paged_with_cursor(
244        &self,
245        plan: ExecutablePlan<E>,
246        cursor: impl Into<PlannedCursor>,
247    ) -> Result<CursorPage<E>, InternalError> {
248        self.execute_paged_with_cursor_traced(plan, cursor)
249            .map(|(page, _)| page)
250    }
251
    // Traced paged execution entrypoint.
    //
    // Revalidates the cursor against the plan, rejects non-load plans, runs
    // fast paths first, and falls back to the generic access-plan scan plus
    // canonical post-access phases. Returns the page together with the
    // optional `ExecutionTrace` (captured only when `self.debug` is set);
    // on error the trace is dropped with the failed execution.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the incoming cursor against this plan before extracting
        // its boundary / index-range anchor components.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        // This executor only handles load-mode plans.
        if !plan.mode().is_load() {
            return Err(InternalError::new(
                ErrorClass::InvariantViolation,
                ErrorOrigin::Query,
                "executor invariant violated: load executor requires load plans",
            ));
        }

        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        // A continuation was applied when either cursor component is present.
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        // Trace capture is opt-in via the executor's debug flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));

        // Run the body in a closure so `execution_trace` can be paired with
        // the page afterwards regardless of which path produced it.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);
            let plan = plan.into_inner();

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;

            record_plan_metrics(&plan.access);
            // Compute secondary ORDER BY pushdown eligibility once, then share the
            // derived decision across trace and fast-path gating.
            let secondary_pushdown_applicability =
                Self::assess_secondary_order_pushdown_applicability(&plan);

            // Fast paths handle their own materialization, trace, and span
            // accounting; a hit short-circuits the generic path below.
            if let Some(page) = Self::try_execute_fast_paths(
                &ctx,
                &plan,
                &secondary_pushdown_applicability,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                direction,
                continuation_signature,
                &mut span,
                &mut execution_trace,
            )? {
                return Ok(page);
            }

            // Generic path: scan rows via the access plan (honoring any
            // index-range anchor), then apply post-access phases.
            let data_rows = ctx.rows_from_access_plan_with_index_range_anchor(
                &plan.access,
                plan.consistency,
                index_range_anchor.as_ref(),
                direction,
            )?;
            let keys_scanned = data_rows.len();

            let mut rows = Context::deserialize_rows(data_rows)?;
            let page = Self::finalize_rows_into_page(
                &plan,
                &mut rows,
                cursor_boundary.as_ref(),
                direction,
                continuation_signature,
            )?;
            let post_access_rows = page.items.0.len();
            // Generic path: no fast path, no pushdown.
            Self::finalize_path_outcome(
                &mut execution_trace,
                None,
                None,
                keys_scanned,
                post_access_rows,
            );

            set_rows_from_len(&mut span, page.items.0.len());
            Ok(page)
        })();

        result.map(|page| (page, execution_trace))
    }
334
335    // Record shared observability outcome for any execution path.
336    fn finalize_path_outcome(
337        execution_trace: &mut Option<ExecutionTrace>,
338        fast_path_used: Option<ExecutionFastPath>,
339        pushdown_type: Option<ExecutionPushdownType>,
340        rows_scanned: usize,
341        rows_returned: usize,
342    ) {
343        record_rows_scanned::<E>(rows_scanned);
344        if let Some(execution_trace) = execution_trace.as_mut() {
345            execution_trace.set_path_outcome(
346                fast_path_used,
347                pushdown_type,
348                rows_scanned,
349                rows_returned,
350            );
351            debug_assert_eq!(
352                execution_trace.keys_scanned,
353                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
354                "execution trace keys_scanned must match rows_scanned metrics input",
355            );
356        }
357    }
358
359    // Run the shared load phases for an already-produced ordered key list.
360    fn materialize_keys_into_page(
361        ctx: &Context<'_, E>,
362        plan: &LogicalPlan<E::Key>,
363        ordered_keys: &[DataKey],
364        cursor_boundary: Option<&CursorBoundary>,
365        direction: Direction,
366        continuation_signature: ContinuationSignature,
367    ) -> Result<(CursorPage<E>, usize), InternalError> {
368        let data_rows = ctx.rows_from_ordered_data_keys(ordered_keys, plan.consistency)?;
369        let mut rows = Context::deserialize_rows(data_rows)?;
370        let page = Self::finalize_rows_into_page(
371            plan,
372            &mut rows,
373            cursor_boundary,
374            direction,
375            continuation_signature,
376        )?;
377        let post_access_rows = page.items.0.len();
378
379        Ok((page, post_access_rows))
380    }
381
382    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
383    fn validate_pk_fast_path_boundary_if_applicable(
384        plan: &LogicalPlan<E::Key>,
385        cursor_boundary: Option<&CursorBoundary>,
386    ) -> Result<(), InternalError> {
387        if !Self::is_pk_order_stream_eligible(plan) {
388            return Ok(());
389        }
390        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
391
392        Ok(())
393    }
394
395    fn index_range_limit_pushdown_fetch(
396        plan: &LogicalPlan<E::Key>,
397        cursor_boundary: Option<&CursorBoundary>,
398        index_range_anchor: Option<&RawIndexKey>,
399    ) -> Option<usize> {
400        if !Self::is_index_range_limit_pushdown_shape_eligible(plan) {
401            return None;
402        }
403        if cursor_boundary.is_some() && index_range_anchor.is_none() {
404            return None;
405        }
406
407        let page = plan.page.as_ref()?;
408        let limit = page.limit?;
409        if limit == 0 {
410            return Some(0);
411        }
412
413        let offset = usize::try_from(page.offset).unwrap_or(usize::MAX);
414        let limit = usize::try_from(limit).unwrap_or(usize::MAX);
415        let page_end = offset.saturating_add(limit);
416        let needs_extra_row = true;
417
418        Some(page_end.saturating_add(usize::from(needs_extra_row)))
419    }
420
421    // Try each fast-path strategy in canonical order and return the first hit.
422    #[expect(
423        clippy::too_many_arguments,
424        reason = "fast-path dispatch keeps execution inputs explicit at one call site"
425    )]
426    fn try_execute_fast_paths(
427        ctx: &Context<'_, E>,
428        plan: &LogicalPlan<E::Key>,
429        secondary_pushdown_applicability: &PushdownApplicability,
430        cursor_boundary: Option<&CursorBoundary>,
431        index_range_anchor: Option<&RawIndexKey>,
432        direction: Direction,
433        continuation_signature: ContinuationSignature,
434        span: &mut Span<E>,
435        execution_trace: &mut Option<ExecutionTrace>,
436    ) -> Result<Option<CursorPage<E>>, InternalError> {
437        Self::validate_pk_fast_path_boundary_if_applicable(plan, cursor_boundary)?;
438
439        if let Some(fast) = Self::try_execute_pk_order_stream(ctx, plan)? {
440            let (page, post_access_rows) = Self::materialize_keys_into_page(
441                ctx,
442                plan,
443                &fast.ordered_keys,
444                cursor_boundary,
445                direction,
446                continuation_signature,
447            )?;
448            Self::finalize_path_outcome(
449                execution_trace,
450                Some(fast.fast_path_used),
451                fast.pushdown_type,
452                fast.rows_scanned,
453                post_access_rows,
454            );
455            set_rows_from_len(span, page.items.0.len());
456            return Ok(Some(page));
457        }
458
459        if let Some(fast) = Self::try_execute_secondary_index_order_stream(
460            ctx,
461            plan,
462            secondary_pushdown_applicability,
463        )? {
464            let (page, post_access_rows) = Self::materialize_keys_into_page(
465                ctx,
466                plan,
467                &fast.ordered_keys,
468                cursor_boundary,
469                direction,
470                continuation_signature,
471            )?;
472            Self::finalize_path_outcome(
473                execution_trace,
474                Some(fast.fast_path_used),
475                fast.pushdown_type,
476                fast.rows_scanned,
477                post_access_rows,
478            );
479            set_rows_from_len(span, page.items.0.len());
480            return Ok(Some(page));
481        }
482
483        let index_range_limit_fetch =
484            Self::index_range_limit_pushdown_fetch(plan, cursor_boundary, index_range_anchor);
485        if let Some(fast) = Self::try_execute_index_range_limit_pushdown_stream(
486            ctx,
487            plan,
488            index_range_anchor,
489            direction,
490            index_range_limit_fetch,
491        )? {
492            let (page, post_access_rows) = Self::materialize_keys_into_page(
493                ctx,
494                plan,
495                &fast.ordered_keys,
496                cursor_boundary,
497                direction,
498                continuation_signature,
499            )?;
500            Self::finalize_path_outcome(
501                execution_trace,
502                Some(fast.fast_path_used),
503                fast.pushdown_type,
504                fast.rows_scanned,
505                post_access_rows,
506            );
507            set_rows_from_len(span, page.items.0.len());
508            return Ok(Some(page));
509        }
510
511        Ok(None)
512    }
513
514    // Apply canonical post-access phases to scanned rows and assemble the cursor page.
515    fn finalize_rows_into_page(
516        plan: &LogicalPlan<E::Key>,
517        rows: &mut Vec<(Id<E>, E)>,
518        cursor_boundary: Option<&CursorBoundary>,
519        direction: Direction,
520        continuation_signature: ContinuationSignature,
521    ) -> Result<CursorPage<E>, InternalError> {
522        let stats = plan.apply_post_access_with_cursor::<E, _>(rows, cursor_boundary)?;
523        let next_cursor =
524            Self::build_next_cursor(plan, rows, &stats, direction, continuation_signature)?;
525        let items = Response(std::mem::take(rows));
526
527        Ok(CursorPage { items, next_cursor })
528    }
529
530    // Assess secondary-index ORDER BY pushdown once for this execution and
531    // map matrix outcomes to executor decisions.
532    fn assess_secondary_order_pushdown_applicability(
533        plan: &LogicalPlan<E::Key>,
534    ) -> PushdownApplicability {
535        assess_secondary_order_pushdown_if_applicable_validated(E::MODEL, plan)
536    }
537
538    fn build_next_cursor(
539        plan: &LogicalPlan<E::Key>,
540        rows: &[(Id<E>, E)],
541        stats: &PostAccessStats,
542        direction: Direction,
543        signature: ContinuationSignature,
544    ) -> Result<Option<Vec<u8>>, InternalError> {
545        let Some(page) = plan.page.as_ref() else {
546            return Ok(None);
547        };
548        let Some(limit) = page.limit else {
549            return Ok(None);
550        };
551        if rows.is_empty() {
552            return Ok(None);
553        }
554
555        // NOTE: post-access execution materializes full in-memory rows for Phase 1.
556        let page_end = (page.offset as usize).saturating_add(limit as usize);
557        if stats.rows_after_cursor <= page_end {
558            return Ok(None);
559        }
560
561        let Some((_, last_entity)) = rows.last() else {
562            return Ok(None);
563        };
564        Self::encode_next_cursor_for_last_entity(plan, last_entity, direction, signature).map(Some)
565    }
566
    // Encode the continuation token from the last returned entity.
    //
    // When the access plan's cursor support declares an index-range anchor,
    // the token additionally carries the last row's raw index key so the next
    // page can resume the range scan; either anchor-support case failing to
    // line up with the concrete plan/row is an invariant violation.
    fn encode_next_cursor_for_last_entity(
        plan: &LogicalPlan<E::Key>,
        last_entity: &E,
        direction: Direction,
        signature: ContinuationSignature,
    ) -> Result<Vec<u8>, InternalError> {
        // Order/PK boundary derived from the last row of this page.
        let boundary = plan.cursor_boundary_from_entity(last_entity)?;
        let token = if plan.access.cursor_support().supports_index_range_anchor() {
            // Anchor support implies a concrete index-range access path.
            let (index, _, _, _) =
                plan.access.as_index_range_path().ok_or_else(|| {
                    InternalError::new(
                        ErrorClass::InvariantViolation,
                        ErrorOrigin::Query,
                        "executor invariant violated: index-range cursor support missing concrete index-range path",
                    )
                })?;
            // The returned row must itself be indexable under that index.
            let index_key = IndexKey::new(last_entity, index)?.ok_or_else(|| {
                InternalError::new(
                    ErrorClass::InvariantViolation,
                    ErrorOrigin::Query,
                    "executor invariant violated: cursor row is not indexable for planned index-range access",
                )
            })?;

            ContinuationToken::new_index_range_with_direction(
                signature,
                boundary,
                IndexRangeCursorAnchor::new(index_key.to_raw()),
                direction,
            )
        } else {
            ContinuationToken::new_with_direction(signature, boundary, direction)
        };
        // Encoding failures are internal serialization errors, not user errors.
        token.encode().map_err(|err| {
            InternalError::new(
                ErrorClass::Internal,
                ErrorOrigin::Serialize,
                format!("failed to encode continuation cursor: {err}"),
            )
        })
    }
609}