Skip to main content

icydb_core/db/executor/load/
mod.rs

1mod execute;
2mod index_range_limit;
3mod page;
4mod pk_stream;
5mod route;
6mod secondary_index;
7
8use crate::{
9    db::{
10        Db,
11        executor::KeyOrderComparator,
12        executor::OrderedKeyStreamBox,
13        executor::plan::{record_plan_metrics, record_rows_scanned},
14        index::RawIndexKey,
15        query::plan::{
16            AccessPlan, AccessPlanProjection, CursorBoundary, Direction, ExecutablePlan,
17            LogicalPlan, OrderDirection, PlannedCursor, SlotSelectionPolicy, compute_page_window,
18            decode_pk_cursor_boundary, derive_scan_direction, project_access_plan,
19            validate::validate_executor_plan,
20        },
21        response::Response,
22    },
23    error::InternalError,
24    obs::sink::{ExecKind, Span},
25    traits::{EntityKind, EntityValue},
26    value::Value,
27};
28use std::{marker::PhantomData, ops::Bound};
29
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Entities materialized for this page.
    pub(crate) items: Response<E>,

    /// Opaque continuation payload used to resume at the next page;
    /// `None` when there is no continuation.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
43
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Single-key point lookup.
    ByKey,
    /// Multi-key batch lookup.
    ByKeys,
    /// Contiguous key-range scan (start..end).
    KeyRange,
    /// Secondary-index prefix match.
    IndexPrefix,
    /// Secondary-index range scan with value bounds.
    IndexRange,
    /// Unindexed scan over all rows.
    FullScan,
    /// Union of child access paths.
    Union,
    /// Intersection of child access paths.
    Intersection,
}
60
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path.
    PrimaryKey,
    /// Ordering satisfied via a secondary index (order pushdown).
    SecondaryOrderPushdown,
    /// Bounded fetch pushed down into an index-range scan.
    IndexRangeLimitPushdown,
}
73
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Coarse shape of the plan's access path.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Effective scan direction for this execution.
    pub direction: OrderDirection,
    /// Optimization applied by the executed path; `None` until an outcome
    /// is recorded (or when no fast path applied).
    pub optimization: Option<ExecutionOptimization>,
    /// Keys scanned by the executed path (saturates at `u64::MAX`).
    pub keys_scanned: u64,
    /// Rows returned by the executed path (saturates at `u64::MAX`).
    pub rows_returned: u64,
    /// Whether a continuation (cursor boundary or index-range anchor) was applied.
    pub continuation_applied: bool,
}
90
91impl ExecutionTrace {
92    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
93        Self {
94            access_path_variant: access_path_variant(access),
95            direction: execution_order_direction(direction),
96            optimization: None,
97            keys_scanned: 0,
98            rows_returned: 0,
99            continuation_applied,
100        }
101    }
102
103    fn set_path_outcome(
104        &mut self,
105        optimization: Option<ExecutionOptimization>,
106        keys_scanned: usize,
107        rows_returned: usize,
108    ) {
109        self.optimization = optimization;
110        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
111        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
112    }
113}
114
115struct ExecutionAccessProjection;
116
117impl<K> AccessPlanProjection<K> for ExecutionAccessProjection {
118    type Output = ExecutionAccessPathVariant;
119
120    fn by_key(&mut self, _key: &K) -> Self::Output {
121        ExecutionAccessPathVariant::ByKey
122    }
123
124    fn by_keys(&mut self, _keys: &[K]) -> Self::Output {
125        ExecutionAccessPathVariant::ByKeys
126    }
127
128    fn key_range(&mut self, _start: &K, _end: &K) -> Self::Output {
129        ExecutionAccessPathVariant::KeyRange
130    }
131
132    fn index_prefix(
133        &mut self,
134        _index_name: &'static str,
135        _index_fields: &[&'static str],
136        _prefix_len: usize,
137        _values: &[Value],
138    ) -> Self::Output {
139        ExecutionAccessPathVariant::IndexPrefix
140    }
141
142    fn index_range(
143        &mut self,
144        _index_name: &'static str,
145        _index_fields: &[&'static str],
146        _prefix_len: usize,
147        _prefix: &[Value],
148        _lower: &Bound<Value>,
149        _upper: &Bound<Value>,
150    ) -> Self::Output {
151        ExecutionAccessPathVariant::IndexRange
152    }
153
154    fn full_scan(&mut self) -> Self::Output {
155        ExecutionAccessPathVariant::FullScan
156    }
157
158    fn union(&mut self, _children: Vec<Self::Output>) -> Self::Output {
159        ExecutionAccessPathVariant::Union
160    }
161
162    fn intersection(&mut self, _children: Vec<Self::Output>) -> Self::Output {
163        ExecutionAccessPathVariant::Intersection
164    }
165}
166
167fn access_path_variant<K>(access: &AccessPlan<K>) -> ExecutionAccessPathVariant {
168    let mut projection = ExecutionAccessProjection;
169    project_access_plan(access, &mut projection)
170}
171
172const fn execution_order_direction(direction: Direction) -> OrderDirection {
173    match direction {
174        Direction::Asc => OrderDirection::Asc,
175        Direction::Desc => OrderDirection::Desc,
176    }
177}
178
179fn key_stream_comparator_from_plan<K>(
180    plan: &LogicalPlan<K>,
181    fallback_direction: Direction,
182) -> KeyOrderComparator {
183    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
184        derive_scan_direction(order, SlotSelectionPolicy::Last)
185    });
186
187    // Comparator and child-stream monotonicity must stay aligned until access-path
188    // stream production can emit keys under an order-spec-derived comparator.
189    let comparator_direction = if derived_direction == fallback_direction {
190        derived_direction
191    } else {
192        fallback_direction
193    };
194
195    KeyOrderComparator::from_direction(comparator_direction)
196}
197
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///
struct FastPathKeyResult {
    /// Ordered key stream produced by the fast path, consumed by the
    /// shared post-access phases.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Rows touched while producing the stream (observability input).
    rows_scanned: usize,
    /// Which fast-path optimization produced this result.
    optimization: ExecutionOptimization,
}
209
///
/// IndexRangeLimitSpec
///
/// Canonical executor decision payload for index-range limit pushdown.
/// Encodes the bounded fetch size after all eligibility gates pass.
///

struct IndexRangeLimitSpec {
    /// Bounded number of entries to fetch, derived from the plan's offset
    /// and limit via `compute_page_window`.
    fetch: usize,
}
220
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Canister-scoped database handle.
    db: Db<E::Canister>,
    /// When set, traced entrypoints capture an `ExecutionTrace`.
    debug: bool,
    /// Binds the executor to its entity type without storing one.
    _marker: PhantomData<E>,
}
234
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    /// Construct an executor bound to a canister database handle.
    /// `debug` opts in to `ExecutionTrace` capture on traced entrypoints.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    /// Execute a load plan without pagination: runs the paged path with an
    /// empty cursor and discards the continuation payload.
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }

    /// Execute one page of a load plan, resuming from `cursor`.
    /// Drops the optional trace produced by the traced variant.
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    /// Execute one page of a load plan and, when `debug` is set, return an
    /// `ExecutionTrace` describing the access-path shape and path outcome.
    ///
    /// Order of operations:
    /// 1. revalidate the caller-supplied cursor against the plan,
    /// 2. reject non-load plans,
    /// 3. attempt the pre-planned fast path,
    /// 4. otherwise run the fallback execution path.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate before extracting boundary/anchor so invalid cursors fail early.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        if !plan.mode().is_load() {
            return Err(InternalError::query_invariant(
                "executor invariant violated: load executor requires load plans",
            ));
        }

        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        // Either continuation form marks this execution as resumed.
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        // Trace capture is opt-in via the executor's debug flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));

        // The closure lets both paths mutate `execution_trace` while the final
        // map pairs the (possibly updated) trace with the page on success.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);
            let plan = plan.into_inner();

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;

            record_plan_metrics(&plan.access);
            // Plan fast-path routing decisions once, then execute in canonical order.
            let fast_path_plan = Self::build_fast_path_plan(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
            )?;

            if let Some(page) = Self::try_execute_fast_path_plan(
                &ctx,
                &plan,
                &fast_path_plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                direction,
                continuation_signature,
                &mut span,
                &mut execution_trace,
            )? {
                return Ok(page);
            }

            let page = Self::execute_fallback_path(
                &ctx,
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                direction,
                continuation_signature,
                &mut span,
                &mut execution_trace,
            )?;
            Ok(page)
        })();

        result.map(|page| (page, execution_trace))
    }

    // Record shared observability outcome for any execution path.
    // Emits the rows-scanned metric unconditionally; mirrors the same counts
    // into the trace only when tracing is enabled.
    fn finalize_path_outcome(
        execution_trace: &mut Option<ExecutionTrace>,
        optimization: Option<ExecutionOptimization>,
        rows_scanned: usize,
        rows_returned: usize,
    ) {
        record_rows_scanned::<E>(rows_scanned);
        if let Some(execution_trace) = execution_trace.as_mut() {
            execution_trace.set_path_outcome(optimization, rows_scanned, rows_returned);
            // Trace counters and metric input must stay in lockstep.
            debug_assert_eq!(
                execution_trace.keys_scanned,
                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
                "execution trace keys_scanned must match rows_scanned metrics input",
            );
        }
    }

    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
    // Decodes (and discards) the boundary so decode failures surface with the
    // PK fast-path's error class when the plan is PK-order-stream eligible.
    fn validate_pk_fast_path_boundary_if_applicable(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        if !Self::is_pk_order_stream_eligible(plan) {
            return Ok(());
        }
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }

    // Decide whether index-range limit pushdown applies and, if so, with what
    // bounded fetch size.
    //
    // Returns `None` when the plan shape is ineligible, when a cursor boundary
    // arrives without an index-range anchor, or when the plan carries no page
    // limit. A zero limit short-circuits to a zero fetch without consulting
    // the page-window helper.
    fn assess_index_range_limit_pushdown(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
        index_range_anchor: Option<&RawIndexKey>,
    ) -> Option<IndexRangeLimitSpec> {
        if !Self::is_index_range_limit_pushdown_shape_eligible(plan) {
            return None;
        }
        // Require an index-range anchor whenever a cursor boundary is present.
        if cursor_boundary.is_some() && index_range_anchor.is_none() {
            return None;
        }

        let page = plan.page.as_ref()?;
        let limit = page.limit?;
        if limit == 0 {
            return Some(IndexRangeLimitSpec { fetch: 0 });
        }

        // Fetch window covers offset plus limit, as computed by the shared helper.
        let fetch = compute_page_window(page.offset, limit, true).fetch_count;

        Some(IndexRangeLimitSpec { fetch })
    }
}