//! icydb_core/db/executor/load — load execution module root (`mod.rs`).
1mod aggregate;
2mod aggregate_field;
3mod aggregate_guard;
4mod execute;
5mod index_range_limit;
6mod page;
7mod pk_stream;
8mod route;
9mod secondary_index;
10mod trace;
11
12use self::{
13    execute::{ExecutionInputs, IndexPredicateCompileMode},
14    route::ExecutionRoutePlan,
15    trace::{access_path_variant, execution_order_direction},
16};
17use crate::{
18    db::{
19        Db,
20        executor::{
21            AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
22            plan::{record_plan_metrics, record_rows_scanned},
23        },
24        query::plan::{
25            AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
26            PlannedCursor, SlotSelectionPolicy, compute_page_window, decode_pk_cursor_boundary,
27            derive_scan_direction, validate::validate_executor_plan,
28        },
29        response::Response,
30    },
31    error::InternalError,
32    obs::sink::{ExecKind, Span},
33    traits::{EntityKind, EntityValue},
34};
35use std::marker::PhantomData;
36
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Entities materialized for this page.
    pub(crate) items: Response<E>,

    /// Encoded continuation cursor for the next page, when one exists.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
50
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Single-key lookup.
    ByKey,
    /// Multi-key lookup.
    ByKeys,
    /// Primary-key range scan.
    KeyRange,
    /// Secondary-index prefix scan.
    IndexPrefix,
    /// Secondary-index range scan.
    IndexRange,
    /// Unrestricted table scan.
    FullScan,
    /// Union of multiple access paths.
    Union,
    /// Intersection of multiple access paths.
    Intersection,
}
67
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path.
    PrimaryKey,
    /// Requested order satisfied by a secondary index (order pushdown).
    SecondaryOrderPushdown,
    /// Bounded fetch pushed down into an index-range scan.
    IndexRangeLimitPushdown,
}
80
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Coarse shape of the planned access path.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Effective scan order for this execution.
    pub direction: OrderDirection,
    /// Optimization selected by the executor, if any path qualified.
    pub optimization: Option<ExecutionOptimization>,
    /// Keys pulled from the access stream (saturates at `u64::MAX`).
    pub keys_scanned: u64,
    /// Rows surviving the post-access phases (saturates at `u64::MAX`).
    pub rows_returned: u64,
    /// Whether a cursor boundary or index-range anchor was applied.
    pub continuation_applied: bool,
    /// Whether an index-level predicate filter ran during key streaming.
    pub index_predicate_applied: bool,
    /// Keys rejected by the index-level predicate filter.
    pub index_predicate_keys_rejected: u64,
    /// Duplicate keys removed by distinct-key deduplication.
    pub distinct_keys_deduped: u64,
}
100
101impl ExecutionTrace {
102    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
103        Self {
104            access_path_variant: access_path_variant(access),
105            direction: execution_order_direction(direction),
106            optimization: None,
107            keys_scanned: 0,
108            rows_returned: 0,
109            continuation_applied,
110            index_predicate_applied: false,
111            index_predicate_keys_rejected: 0,
112            distinct_keys_deduped: 0,
113        }
114    }
115
116    fn set_path_outcome(
117        &mut self,
118        optimization: Option<ExecutionOptimization>,
119        keys_scanned: usize,
120        rows_returned: usize,
121        index_predicate_applied: bool,
122        index_predicate_keys_rejected: u64,
123        distinct_keys_deduped: u64,
124    ) {
125        self.optimization = optimization;
126        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
127        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
128        self.index_predicate_applied = index_predicate_applied;
129        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
130        self.distinct_keys_deduped = distinct_keys_deduped;
131    }
132}
133
/// Derive the key-stream comparator for a load plan.
///
/// NOTE(review): by case analysis, both branches of the alignment guard
/// below currently resolve to `fallback_direction` (when the directions are
/// equal they coincide; otherwise the fallback wins). The order-spec-derived
/// direction is still computed so the future wiring has an explicit seam —
/// see the alignment comment inline.
fn key_stream_comparator_from_plan<K>(
    plan: &LogicalPlan<K>,
    fallback_direction: Direction,
) -> KeyOrderComparator {
    // Direction implied by the plan's order spec; falls back to the caller's
    // direction when no order is present.
    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
        derive_scan_direction(order, SlotSelectionPolicy::Last)
    });

    // Comparator and child-stream monotonicity must stay aligned until access-path
    // stream production can emit keys under an order-spec-derived comparator.
    let comparator_direction = if derived_direction == fallback_direction {
        derived_direction
    } else {
        fallback_direction
    };

    KeyOrderComparator::from_direction(comparator_direction)
}
152
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///
struct FastPathKeyResult {
    /// Keys in canonical scan order, ready for page materialization.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Rows touched while producing the stream (observability accounting).
    rows_scanned: usize,
    /// Fast-path optimization that produced this result.
    optimization: ExecutionOptimization,
}
164
///
/// IndexRangeLimitSpec
///
/// Canonical executor decision payload for index-range limit pushdown.
/// Encodes the bounded fetch size after all eligibility gates pass.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct IndexRangeLimitSpec {
    /// Maximum number of keys to fetch from the index-range scan.
    fetch: usize,
}
176
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Handle to the canister-scoped database this executor reads from.
    db: Db<E::Canister>,
    /// When true, executions capture an `ExecutionTrace` snapshot.
    debug: bool,
    /// Binds the executor to its entity type without storing a value.
    _marker: PhantomData<E>,
}
190
191impl<E> LoadExecutor<E>
192where
193    E: EntityKind + EntityValue,
194{
195    #[must_use]
196    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
197        Self {
198            db,
199            debug,
200            _marker: PhantomData,
201        }
202    }
203
204    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
205        self.execute_paged_with_cursor(plan, PlannedCursor::none())
206            .map(|page| page.items)
207    }
208
209    pub(in crate::db) fn execute_paged_with_cursor(
210        &self,
211        plan: ExecutablePlan<E>,
212        cursor: impl Into<PlannedCursor>,
213    ) -> Result<CursorPage<E>, InternalError> {
214        self.execute_paged_with_cursor_traced(plan, cursor)
215            .map(|(page, _)| page)
216    }
217
    /// Execute a load plan against an optional continuation cursor, returning
    /// the page together with an `ExecutionTrace` snapshot when `debug` is on.
    ///
    /// # Errors
    /// Fails on cursor revalidation, non-load plans, executor plan validation,
    /// and any error surfaced by key-stream resolution or materialization.
    #[expect(clippy::too_many_lines)]
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the caller-supplied cursor against this plan before use.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // A continuation is in effect when either cursor component is present.
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        // Trace capture is opt-in via the executor's debug flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let (plan, predicate_slots) = plan.into_parts();

        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            let route_plan = Self::build_execution_route_plan_for_load(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                None,
                direction,
            )?;

            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let mut resolved = Self::resolve_execution_key_stream(
                &execution_inputs,
                &route_plan,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let (mut page, keys_scanned, mut post_access_rows) =
                Self::materialize_key_stream_into_page(
                    &ctx,
                    &plan,
                    predicate_slots.as_ref(),
                    resolved.key_stream.as_mut(),
                    route_plan.scan_hints.load_scan_budget_hint,
                    route_plan.streaming_access_shape_safe(),
                    cursor_boundary.as_ref(),
                    direction,
                    continuation_signature,
                )?;
            let mut rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
            let mut optimization = resolved.optimization;
            let mut index_predicate_applied = resolved.index_predicate_applied;
            let mut index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
            let mut distinct_keys_deduped = resolved
                .distinct_keys_deduped_counter
                .as_ref()
                .map_or(0, |counter| counter.get());

            // A bounded index-range pushdown can under-fill the page when a
            // residual predicate rejects rows; retry once without the limit.
            if Self::index_range_limited_residual_retry_required(
                &plan,
                cursor_boundary.as_ref(),
                &route_plan,
                rows_scanned,
                post_access_rows,
            ) {
                let mut fallback_route_plan = route_plan;
                fallback_route_plan.index_range_limit_spec = None;
                let mut fallback_resolved = Self::resolve_execution_key_stream(
                    &execution_inputs,
                    &fallback_route_plan,
                    IndexPredicateCompileMode::ConservativeSubset,
                )?;
                let (fallback_page, fallback_keys_scanned, fallback_post_access_rows) =
                    Self::materialize_key_stream_into_page(
                        &ctx,
                        &plan,
                        predicate_slots.as_ref(),
                        fallback_resolved.key_stream.as_mut(),
                        fallback_route_plan.scan_hints.load_scan_budget_hint,
                        fallback_route_plan.streaming_access_shape_safe(),
                        cursor_boundary.as_ref(),
                        direction,
                        continuation_signature,
                    )?;
                let fallback_rows_scanned = fallback_resolved
                    .rows_scanned_override
                    .unwrap_or(fallback_keys_scanned);
                let fallback_distinct_keys_deduped = fallback_resolved
                    .distinct_keys_deduped_counter
                    .as_ref()
                    .map_or(0, |counter| counter.get());

                // Retry accounting keeps observability faithful to actual work.
                rows_scanned = rows_scanned.saturating_add(fallback_rows_scanned);
                optimization = fallback_resolved.optimization;
                index_predicate_applied =
                    index_predicate_applied || fallback_resolved.index_predicate_applied;
                index_predicate_keys_rejected = index_predicate_keys_rejected
                    .saturating_add(fallback_resolved.index_predicate_keys_rejected);
                distinct_keys_deduped =
                    distinct_keys_deduped.saturating_add(fallback_distinct_keys_deduped);
                page = fallback_page;
                post_access_rows = fallback_post_access_rows;
            }

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        // Pair the page with whatever trace was captured (None unless debug).
        result.map(|page| (page, execution_trace))
    }
360
361    // Retry index-range limit pushdown when a bounded residual-filter pass may
362    // have under-filled the requested page window.
363    fn index_range_limited_residual_retry_required(
364        plan: &LogicalPlan<E::Key>,
365        cursor_boundary: Option<&CursorBoundary>,
366        route_plan: &ExecutionRoutePlan,
367        rows_scanned: usize,
368        post_access_rows: usize,
369    ) -> bool {
370        let Some(limit_spec) = route_plan.index_range_limit_spec else {
371            return false;
372        };
373        if plan.predicate.is_none() {
374            return false;
375        }
376        if limit_spec.fetch == 0 {
377            return false;
378        }
379        let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
380            return false;
381        };
382        let keep_count =
383            compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
384                .keep_count;
385        if keep_count == 0 {
386            return false;
387        }
388        if rows_scanned < limit_spec.fetch {
389            return false;
390        }
391
392        post_access_rows < keep_count
393    }
394
395    // Record shared observability outcome for any execution path.
396    fn finalize_path_outcome(
397        execution_trace: &mut Option<ExecutionTrace>,
398        optimization: Option<ExecutionOptimization>,
399        rows_scanned: usize,
400        rows_returned: usize,
401        index_predicate_applied: bool,
402        index_predicate_keys_rejected: u64,
403        distinct_keys_deduped: u64,
404    ) {
405        record_rows_scanned::<E>(rows_scanned);
406        if let Some(execution_trace) = execution_trace.as_mut() {
407            execution_trace.set_path_outcome(
408                optimization,
409                rows_scanned,
410                rows_returned,
411                index_predicate_applied,
412                index_predicate_keys_rejected,
413                distinct_keys_deduped,
414            );
415            debug_assert_eq!(
416                execution_trace.keys_scanned,
417                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
418                "execution trace keys_scanned must match rows_scanned metrics input",
419            );
420        }
421    }
422
423    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
424    fn validate_pk_fast_path_boundary_if_applicable(
425        plan: &LogicalPlan<E::Key>,
426        cursor_boundary: Option<&CursorBoundary>,
427    ) -> Result<(), InternalError> {
428        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
429            return Ok(());
430        }
431        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
432
433        Ok(())
434    }
435}