// icydb_core/db/executor/load/mod.rs
1mod aggregate;
2mod aggregate_field;
3mod aggregate_guard;
4mod execute;
5mod fast_stream;
6mod index_range_limit;
7mod page;
8mod pk_stream;
9mod route;
10mod secondary_index;
11mod trace;
12
13use self::{
14    execute::{ExecutionInputs, IndexPredicateCompileMode},
15    route::ExecutionRoutePlan,
16    trace::{access_path_variant, execution_order_direction},
17};
18use crate::{
19    db::{
20        Db,
21        executor::{
22            AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
23            plan::{record_plan_metrics, record_rows_scanned},
24        },
25        query::plan::{
26            AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
27            PlannedCursor, SlotSelectionPolicy, compute_page_window, decode_pk_cursor_boundary,
28            derive_scan_direction, validate::validate_executor_plan,
29        },
30        response::Response,
31    },
32    error::InternalError,
33    obs::sink::{ExecKind, Span},
34    traits::{EntityKind, EntityValue},
35};
36use std::marker::PhantomData;
37
38///
39/// CursorPage
40///
41/// Internal load page result with continuation cursor payload.
42/// Returned by paged executor entrypoints.
43///
44
45#[derive(Debug)]
46pub(crate) struct CursorPage<E: EntityKind> {
47    pub(crate) items: Response<E>,
48
49    pub(crate) next_cursor: Option<Vec<u8>>,
50}
51
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Single primary-key lookup.
    ByKey,
    /// Batched primary-key lookups.
    ByKeys,
    /// Contiguous primary-key range scan.
    KeyRange,
    /// Secondary-index prefix access.
    IndexPrefix,
    /// Secondary-index range access.
    IndexRange,
    /// Unindexed scan over all rows.
    FullScan,
    /// Union of multiple child access paths.
    Union,
    /// Intersection of multiple child access paths.
    Intersection,
}
68
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path.
    PrimaryKey,
    /// Order satisfied directly by secondary-index traversal.
    SecondaryOrderPushdown,
    /// Bounded fetch pushed into the index-range scan.
    IndexRangeLimitPushdown,
}
81
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Coarse shape of the access path that was executed.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Effective scan order for the execution.
    pub direction: OrderDirection,
    /// Optimization chosen by the executor, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Keys pulled from the access path (saturating usize -> u64 conversion).
    pub keys_scanned: u64,
    /// Rows remaining after post-access processing (saturating conversion).
    pub rows_returned: u64,
    /// Whether a continuation cursor boundary or index-range anchor resumed
    /// this execution.
    pub continuation_applied: bool,
    /// Whether an index-level predicate was compiled and applied.
    pub index_predicate_applied: bool,
    /// Keys rejected by the index-level predicate.
    pub index_predicate_keys_rejected: u64,
    /// Duplicate keys removed during distinct-key streaming.
    pub distinct_keys_deduped: u64,
}
101
102impl ExecutionTrace {
103    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
104        Self {
105            access_path_variant: access_path_variant(access),
106            direction: execution_order_direction(direction),
107            optimization: None,
108            keys_scanned: 0,
109            rows_returned: 0,
110            continuation_applied,
111            index_predicate_applied: false,
112            index_predicate_keys_rejected: 0,
113            distinct_keys_deduped: 0,
114        }
115    }
116
117    fn set_path_outcome(
118        &mut self,
119        optimization: Option<ExecutionOptimization>,
120        keys_scanned: usize,
121        rows_returned: usize,
122        index_predicate_applied: bool,
123        index_predicate_keys_rejected: u64,
124        distinct_keys_deduped: u64,
125    ) {
126        self.optimization = optimization;
127        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
128        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
129        self.index_predicate_applied = index_predicate_applied;
130        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
131        self.distinct_keys_deduped = distinct_keys_deduped;
132    }
133}
134
135fn key_stream_comparator_from_plan<K>(
136    plan: &LogicalPlan<K>,
137    fallback_direction: Direction,
138) -> KeyOrderComparator {
139    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
140        derive_scan_direction(order, SlotSelectionPolicy::Last)
141    });
142
143    // Comparator and child-stream monotonicity must stay aligned until access-path
144    // stream production can emit keys under an order-spec-derived comparator.
145    let comparator_direction = if derived_direction == fallback_direction {
146        derived_direction
147    } else {
148        fallback_direction
149    };
150
151    KeyOrderComparator::from_direction(comparator_direction)
152}
153
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///
struct FastPathKeyResult {
    /// Key stream already in canonical order for page materialization.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Rows touched while producing the keys (observability accounting).
    rows_scanned: usize,
    /// Fast-path optimization this result was produced under.
    optimization: ExecutionOptimization,
}
165
///
/// IndexRangeLimitSpec
///
/// Canonical executor decision payload for index-range limit pushdown.
/// Encodes the bounded fetch size after all eligibility gates pass.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct IndexRangeLimitSpec {
    /// Bounded number of keys to fetch under the pushdown; a value of 0
    /// disables the residual-retry eligibility check.
    fetch: usize,
}
177
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Database handle scoped to the entity's canister.
    db: Db<E::Canister>,
    /// When true, executions capture an `ExecutionTrace` snapshot.
    debug: bool,
    /// Ties the executor to its entity type without storing a value.
    _marker: PhantomData<E>,
}
191
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    /// Construct an executor over `db`; `debug` opts in to execution tracing.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    /// Execute an unpaged load: run the paged path with no cursor and keep
    /// only the items, discarding the continuation cursor.
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }

    /// Execute a paged load, discarding the optional execution trace.
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    /// Execute a paged load and also return the opt-in execution trace
    /// (`Some` only when the executor was built with `debug == true`).
    ///
    /// Phases: revalidate the cursor against the plan, snapshot plan facts,
    /// route once, resolve one canonical key stream, materialize the page,
    /// optionally retry without index-range limit pushdown, then finalize
    /// metrics and trace.
    ///
    /// # Errors
    /// Fails when the plan is not a load plan, fails executor validation, or
    /// any access/materialization phase reports an `InternalError`.
    #[expect(clippy::too_many_lines)]
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Cursor must be revalidated against this plan before any of its
        // payload (boundary/anchor) is trusted.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        // Reject non-load plans before doing any work.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        // Snapshot plan facts before `into_parts` consumes the plan wrapper.
        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        // Trace is only allocated in debug mode; all trace writes below are
        // conditional on this Option.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let (plan, predicate_slots) = plan.into_parts();

        // Immediately-invoked closure scopes `?` so the trace captured above
        // can be paired with the successful page afterwards.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            // (`build_execution_route_plan_for_load` is defined elsewhere in
            // this module — presumably in `route`; see `ExecutionRoutePlan`.)
            let route_plan = Self::build_execution_route_plan_for_load(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                None,
                direction,
            )?;

            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let mut resolved = Self::resolve_execution_key_stream(
                &execution_inputs,
                &route_plan,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let (mut page, keys_scanned, mut post_access_rows) =
                Self::materialize_key_stream_into_page(
                    &ctx,
                    &plan,
                    predicate_slots.as_ref(),
                    resolved.key_stream.as_mut(),
                    route_plan.scan_hints.load_scan_budget_hint,
                    route_plan.streaming_access_shape_safe(),
                    cursor_boundary.as_ref(),
                    direction,
                    continuation_signature,
                )?;
            // Access paths may report their own scan count; otherwise the
            // materialized key count stands in for rows scanned.
            let mut rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
            let mut optimization = resolved.optimization;
            let mut index_predicate_applied = resolved.index_predicate_applied;
            let mut index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
            let mut distinct_keys_deduped = resolved
                .distinct_keys_deduped_counter
                .as_ref()
                .map_or(0, |counter| counter.get());

            // A limit-pushdown pass can under-fill the page when a residual
            // predicate rejects rows; if so, rerun once without the pushdown.
            if Self::index_range_limited_residual_retry_required(
                &plan,
                cursor_boundary.as_ref(),
                &route_plan,
                rows_scanned,
                post_access_rows,
            ) {
                let mut fallback_route_plan = route_plan;
                fallback_route_plan.index_range_limit_spec = None;
                let mut fallback_resolved = Self::resolve_execution_key_stream(
                    &execution_inputs,
                    &fallback_route_plan,
                    IndexPredicateCompileMode::ConservativeSubset,
                )?;
                let (fallback_page, fallback_keys_scanned, fallback_post_access_rows) =
                    Self::materialize_key_stream_into_page(
                        &ctx,
                        &plan,
                        predicate_slots.as_ref(),
                        fallback_resolved.key_stream.as_mut(),
                        fallback_route_plan.scan_hints.load_scan_budget_hint,
                        fallback_route_plan.streaming_access_shape_safe(),
                        cursor_boundary.as_ref(),
                        direction,
                        continuation_signature,
                    )?;
                let fallback_rows_scanned = fallback_resolved
                    .rows_scanned_override
                    .unwrap_or(fallback_keys_scanned);
                let fallback_distinct_keys_deduped = fallback_resolved
                    .distinct_keys_deduped_counter
                    .as_ref()
                    .map_or(0, |counter| counter.get());

                // Retry accounting keeps observability faithful to actual work:
                // scan/reject/dedup counters accumulate across both passes,
                // while the page and optimization reflect the fallback pass.
                rows_scanned = rows_scanned.saturating_add(fallback_rows_scanned);
                optimization = fallback_resolved.optimization;
                index_predicate_applied =
                    index_predicate_applied || fallback_resolved.index_predicate_applied;
                index_predicate_keys_rejected = index_predicate_keys_rejected
                    .saturating_add(fallback_resolved.index_predicate_keys_rejected);
                distinct_keys_deduped =
                    distinct_keys_deduped.saturating_add(fallback_distinct_keys_deduped);
                page = fallback_page;
                post_access_rows = fallback_post_access_rows;
            }

            // `finalize_execution` is defined outside this view; presumably it
            // records span/trace outcome and yields the final page.
            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }

    // Retry index-range limit pushdown when a bounded residual-filter pass may
    // have under-filled the requested page window.
    //
    // All gates must pass: pushdown was active with a non-zero fetch bound, a
    // residual predicate exists, an explicit page limit yields a non-zero keep
    // window, the scan actually hit the fetch bound (i.e. the range was not
    // exhausted), and fewer rows than the window survived post-access.
    fn index_range_limited_residual_retry_required(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
        route_plan: &ExecutionRoutePlan,
        rows_scanned: usize,
        post_access_rows: usize,
    ) -> bool {
        let Some(limit_spec) = route_plan.index_range_limit_spec else {
            return false;
        };
        if plan.predicate.is_none() {
            return false;
        }
        if limit_spec.fetch == 0 {
            return false;
        }
        let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
            return false;
        };
        let keep_count =
            compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
                .keep_count;
        if keep_count == 0 {
            return false;
        }
        // Scan stopped short of the bound: the range is exhausted, so a
        // retry could not surface more rows.
        if rows_scanned < limit_spec.fetch {
            return false;
        }

        post_access_rows < keep_count
    }

    // Record shared observability outcome for any execution path.
    // Emits the rows-scanned metric unconditionally, then (debug only) copies
    // the same numbers into the execution trace.
    fn finalize_path_outcome(
        execution_trace: &mut Option<ExecutionTrace>,
        optimization: Option<ExecutionOptimization>,
        rows_scanned: usize,
        rows_returned: usize,
        index_predicate_applied: bool,
        index_predicate_keys_rejected: u64,
        distinct_keys_deduped: u64,
    ) {
        record_rows_scanned::<E>(rows_scanned);
        if let Some(execution_trace) = execution_trace.as_mut() {
            execution_trace.set_path_outcome(
                optimization,
                rows_scanned,
                rows_returned,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
            );
            // Trace and metrics must agree on the (saturating) conversion.
            debug_assert_eq!(
                execution_trace.keys_scanned,
                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
                "execution trace keys_scanned must match rows_scanned metrics input",
            );
        }
    }

    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
    // Decodes purely for its error side effect; the decoded value is discarded.
    fn validate_pk_fast_path_boundary_if_applicable(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        // `pk_order_stream_fast_path_shape_supported` is defined outside this
        // view; only PK fast-path-shaped plans need boundary decoding here.
        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
            return Ok(());
        }
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }
}