// icydb_core/db/executor/load/mod.rs
1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod trace;
9
10use self::{
11    execute::{ExecutionInputs, IndexPredicateCompileMode},
12    trace::{access_path_variant, execution_order_direction},
13};
14use crate::{
15    db::{
16        Db,
17        executor::{
18            AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
19            plan::{record_plan_metrics, record_rows_scanned},
20            route::ExecutionRoutePlan,
21        },
22        query::plan::{
23            AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
24            PlannedCursor, SlotSelectionPolicy, compute_page_window, decode_pk_cursor_boundary,
25            derive_scan_direction, validate::validate_executor_plan,
26        },
27        response::Response,
28    },
29    error::InternalError,
30    obs::sink::{ExecKind, Span},
31    traits::{EntityKind, EntityValue},
32};
33use std::marker::PhantomData;
34
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    // Materialized rows for this page of the load execution.
    pub(crate) items: Response<E>,

    // Opaque encoded continuation cursor; presumably `None` when no further
    // page is available — confirm against the finalization path.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
48
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
/// Derived from the planner's `AccessPlan` via `trace::access_path_variant`.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    // Point lookup by a single primary key.
    ByKey,
    // Lookup over an explicit set of primary keys.
    ByKeys,
    // Contiguous primary-key range scan.
    KeyRange,
    // Secondary-index prefix scan.
    IndexPrefix,
    // Secondary-index range scan.
    IndexRange,
    // Unindexed scan over the whole entity store.
    FullScan,
    // Union of multiple child access paths.
    Union,
    // Intersection of multiple child access paths.
    Intersection,
}
66
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
/// Surfaced to callers through `ExecutionTrace::optimization`.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    // Primary-key fast path.
    PrimaryKey,
    // Requested order satisfied directly by a secondary index.
    SecondaryOrderPushdown,
    // Page limit pushed into the index-range scan; may trigger a
    // residual-filter retry when the bounded fetch under-fills the page.
    IndexRangeLimitPushdown,
}
79
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
/// Constructed only when the executor runs in debug mode.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    // Coarse shape of the plan's access path.
    pub access_path_variant: ExecutionAccessPathVariant,
    // Effective scan order for this execution.
    pub direction: OrderDirection,
    // Fast-path optimization taken by execution, if any.
    pub optimization: Option<ExecutionOptimization>,
    // Keys pulled from the access stream; saturates at `u64::MAX` on conversion.
    pub keys_scanned: u64,
    // Rows returned by execution; saturates at `u64::MAX` on conversion.
    pub rows_returned: u64,
    // Whether a cursor boundary or index-range anchor continued a prior page.
    pub continuation_applied: bool,
    // Whether an index-side predicate was applied during key streaming.
    pub index_predicate_applied: bool,
    // Keys rejected by the index-side predicate.
    pub index_predicate_keys_rejected: u64,
    // Duplicate keys removed while deduplicating the key stream.
    pub distinct_keys_deduped: u64,
}
99
100impl ExecutionTrace {
101    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
102        Self {
103            access_path_variant: access_path_variant(access),
104            direction: execution_order_direction(direction),
105            optimization: None,
106            keys_scanned: 0,
107            rows_returned: 0,
108            continuation_applied,
109            index_predicate_applied: false,
110            index_predicate_keys_rejected: 0,
111            distinct_keys_deduped: 0,
112        }
113    }
114
115    fn set_path_outcome(
116        &mut self,
117        optimization: Option<ExecutionOptimization>,
118        keys_scanned: usize,
119        rows_returned: usize,
120        index_predicate_applied: bool,
121        index_predicate_keys_rejected: u64,
122        distinct_keys_deduped: u64,
123    ) {
124        self.optimization = optimization;
125        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
126        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
127        self.index_predicate_applied = index_predicate_applied;
128        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
129        self.distinct_keys_deduped = distinct_keys_deduped;
130    }
131}
132
133fn key_stream_comparator_from_plan<K>(
134    plan: &LogicalPlan<K>,
135    fallback_direction: Direction,
136) -> KeyOrderComparator {
137    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
138        derive_scan_direction(order, SlotSelectionPolicy::Last)
139    });
140
141    // Comparator and child-stream monotonicity must stay aligned until access-path
142    // stream production can emit keys under an order-spec-derived comparator.
143    let comparator_direction = if derived_direction == fallback_direction {
144        derived_direction
145    } else {
146        fallback_direction
147    };
148
149    KeyOrderComparator::from_direction(comparator_direction)
150}
151
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///

struct FastPathKeyResult {
    // Key stream already emitted in the required order.
    ordered_key_stream: OrderedKeyStreamBox,
    // Rows touched while producing the stream (observability accounting).
    rows_scanned: usize,
    // Fast-path optimization that produced this result.
    optimization: ExecutionOptimization,
}
164
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    // Handle to the canister-scoped database this executor reads from.
    db: Db<E::Canister>,
    // When true, executions capture an `ExecutionTrace` snapshot.
    debug: bool,
    // Binds the executor to its entity type without storing a value.
    _marker: PhantomData<E>,
}
178
179impl<E> LoadExecutor<E>
180where
181    E: EntityKind + EntityValue,
182{
183    #[must_use]
184    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
185        Self {
186            db,
187            debug,
188            _marker: PhantomData,
189        }
190    }
191
192    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
193        self.execute_paged_with_cursor(plan, PlannedCursor::none())
194            .map(|page| page.items)
195    }
196
197    pub(in crate::db) fn execute_paged_with_cursor(
198        &self,
199        plan: ExecutablePlan<E>,
200        cursor: impl Into<PlannedCursor>,
201    ) -> Result<CursorPage<E>, InternalError> {
202        self.execute_paged_with_cursor_traced(plan, cursor)
203            .map(|(page, _)| page)
204    }
205
    /// Execute a load plan with cursor continuation, returning the page plus an
    /// optional `ExecutionTrace` snapshot (captured only when `debug` is set).
    ///
    /// Pipeline: revalidate cursor → validate plan → route once → resolve one
    /// canonical key stream → materialize the page → optionally retry without
    /// index-range limit pushdown → finalize metrics/trace.
    #[expect(clippy::too_many_lines)]
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        // Capture plan-derived inputs before the plan is torn into parts below.
        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let (plan, predicate_slots) = plan.into_parts();

        // Immediately-invoked closure groups the fallible steps so the trace
        // can be attached to a successful result after the fact.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            let route_plan = Self::build_execution_route_plan_for_load(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                None,
                direction,
            )?;

            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let mut resolved = Self::resolve_execution_key_stream(
                &execution_inputs,
                &route_plan,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let (mut page, keys_scanned, mut post_access_rows) =
                Self::materialize_key_stream_into_page(
                    &ctx,
                    &plan,
                    predicate_slots.as_ref(),
                    resolved.key_stream.as_mut(),
                    route_plan.scan_hints.load_scan_budget_hint,
                    route_plan.streaming_access_shape_safe(),
                    cursor_boundary.as_ref(),
                    direction,
                    continuation_signature,
                )?;
            let mut rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
            let mut optimization = resolved.optimization;
            let mut index_predicate_applied = resolved.index_predicate_applied;
            let mut index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
            let mut distinct_keys_deduped = resolved
                .distinct_keys_deduped_counter
                .as_ref()
                .map_or(0, |counter| counter.get());

            if Self::index_range_limited_residual_retry_required(
                &plan,
                cursor_boundary.as_ref(),
                &route_plan,
                rows_scanned,
                post_access_rows,
            ) {
                // Re-run without the limit pushdown: the bounded fetch may have
                // stopped before enough predicate-surviving rows were found.
                let mut fallback_route_plan = route_plan;
                fallback_route_plan.index_range_limit_spec = None;
                let mut fallback_resolved = Self::resolve_execution_key_stream(
                    &execution_inputs,
                    &fallback_route_plan,
                    IndexPredicateCompileMode::ConservativeSubset,
                )?;
                let (fallback_page, fallback_keys_scanned, fallback_post_access_rows) =
                    Self::materialize_key_stream_into_page(
                        &ctx,
                        &plan,
                        predicate_slots.as_ref(),
                        fallback_resolved.key_stream.as_mut(),
                        fallback_route_plan.scan_hints.load_scan_budget_hint,
                        fallback_route_plan.streaming_access_shape_safe(),
                        cursor_boundary.as_ref(),
                        direction,
                        continuation_signature,
                    )?;
                let fallback_rows_scanned = fallback_resolved
                    .rows_scanned_override
                    .unwrap_or(fallback_keys_scanned);
                let fallback_distinct_keys_deduped = fallback_resolved
                    .distinct_keys_deduped_counter
                    .as_ref()
                    .map_or(0, |counter| counter.get());

                // Retry accounting keeps observability faithful to actual work:
                // scan/rejection/dedup counters accumulate across both passes,
                // while the page itself is replaced by the fallback result.
                rows_scanned = rows_scanned.saturating_add(fallback_rows_scanned);
                optimization = fallback_resolved.optimization;
                index_predicate_applied =
                    index_predicate_applied || fallback_resolved.index_predicate_applied;
                index_predicate_keys_rejected = index_predicate_keys_rejected
                    .saturating_add(fallback_resolved.index_predicate_keys_rejected);
                distinct_keys_deduped =
                    distinct_keys_deduped.saturating_add(fallback_distinct_keys_deduped);
                page = fallback_page;
                post_access_rows = fallback_post_access_rows;
            }

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        // On success, pair the page with the (possibly absent) trace snapshot;
        // on error the trace is discarded with the result.
        result.map(|page| (page, execution_trace))
    }
348
349    // Retry index-range limit pushdown when a bounded residual-filter pass may
350    // have under-filled the requested page window.
351    fn index_range_limited_residual_retry_required(
352        plan: &LogicalPlan<E::Key>,
353        cursor_boundary: Option<&CursorBoundary>,
354        route_plan: &ExecutionRoutePlan,
355        rows_scanned: usize,
356        post_access_rows: usize,
357    ) -> bool {
358        let Some(limit_spec) = route_plan.index_range_limit_spec else {
359            return false;
360        };
361        if plan.predicate.is_none() {
362            return false;
363        }
364        if limit_spec.fetch == 0 {
365            return false;
366        }
367        let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
368            return false;
369        };
370        let keep_count =
371            compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
372                .keep_count;
373        if keep_count == 0 {
374            return false;
375        }
376        if rows_scanned < limit_spec.fetch {
377            return false;
378        }
379
380        post_access_rows < keep_count
381    }
382
383    // Record shared observability outcome for any execution path.
384    fn finalize_path_outcome(
385        execution_trace: &mut Option<ExecutionTrace>,
386        optimization: Option<ExecutionOptimization>,
387        rows_scanned: usize,
388        rows_returned: usize,
389        index_predicate_applied: bool,
390        index_predicate_keys_rejected: u64,
391        distinct_keys_deduped: u64,
392    ) {
393        record_rows_scanned::<E>(rows_scanned);
394        if let Some(execution_trace) = execution_trace.as_mut() {
395            execution_trace.set_path_outcome(
396                optimization,
397                rows_scanned,
398                rows_returned,
399                index_predicate_applied,
400                index_predicate_keys_rejected,
401                distinct_keys_deduped,
402            );
403            debug_assert_eq!(
404                execution_trace.keys_scanned,
405                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
406                "execution trace keys_scanned must match rows_scanned metrics input",
407            );
408        }
409    }
410
411    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
412    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
413        plan: &LogicalPlan<E::Key>,
414        cursor_boundary: Option<&CursorBoundary>,
415    ) -> Result<(), InternalError> {
416        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
417            return Ok(());
418        }
419        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
420
421        Ok(())
422    }
423}