// icydb_core/db/executor/load/mod.rs

mod aggregate;
mod execute;
mod fast_stream;
mod index_range_limit;
mod page;
mod pk_stream;
mod secondary_index;
mod terminal;
mod trace;

use self::{
    execute::{ExecutionInputs, IndexPredicateCompileMode},
    trace::{access_path_variant, execution_order_direction},
};
use crate::{
    db::{
        Db,
        executor::{
            AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
            aggregate::field::{
                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
            },
            plan::{record_plan_metrics, record_rows_scanned},
            route::ExecutionRoutePlan,
        },
        query::plan::{
            AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
            PlannedCursor, SlotSelectionPolicy, compute_page_window, decode_pk_cursor_boundary,
            derive_scan_direction, validate::validate_executor_plan,
        },
        response::Response,
    },
    error::InternalError,
    obs::sink::{ExecKind, Span},
    traits::{EntityKind, EntityValue},
};
use std::marker::PhantomData;
39
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Materialized rows for this page of entity kind `E`.
    pub(crate) items: Response<E>,

    /// Encoded cursor for requesting the next page; `None` when no
    /// continuation is offered.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
53
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Lookup of a single primary key.
    ByKey,
    /// Lookup of an explicit set of primary keys.
    ByKeys,
    /// Contiguous primary-key range scan.
    KeyRange,
    /// Secondary-index scan bounded by a key prefix.
    IndexPrefix,
    /// Secondary-index scan bounded by a key range.
    IndexRange,
    /// Unbounded scan over the whole entity store.
    FullScan,
    /// Union of multiple child access paths.
    Union,
    /// Intersection of multiple child access paths.
    Intersection,
}
71
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path was taken.
    PrimaryKey,
    /// Ordering was pushed down into a secondary-index scan.
    SecondaryOrderPushdown,
    /// Page limit was pushed down into an index-range scan.
    IndexRangeLimitPushdown,
}
84
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Coarse shape of the access path the plan executed.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Order direction the execution scanned in.
    pub direction: OrderDirection,
    /// Canonical optimization selected by execution, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Keys scanned by the access path (saturates at `u64::MAX` on conversion).
    pub keys_scanned: u64,
    /// Rows returned to the caller (saturates at `u64::MAX` on conversion).
    pub rows_returned: u64,
    /// True when a cursor boundary or index-range anchor resumed a prior page.
    pub continuation_applied: bool,
    /// True when an index-level predicate filtered keys during the scan.
    pub index_predicate_applied: bool,
    /// Keys rejected by the index-level predicate.
    pub index_predicate_keys_rejected: u64,
    /// Duplicate keys removed by distinct-key deduplication.
    pub distinct_keys_deduped: u64,
}
104
105impl ExecutionTrace {
106    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
107        Self {
108            access_path_variant: access_path_variant(access),
109            direction: execution_order_direction(direction),
110            optimization: None,
111            keys_scanned: 0,
112            rows_returned: 0,
113            continuation_applied,
114            index_predicate_applied: false,
115            index_predicate_keys_rejected: 0,
116            distinct_keys_deduped: 0,
117        }
118    }
119
120    fn set_path_outcome(
121        &mut self,
122        optimization: Option<ExecutionOptimization>,
123        keys_scanned: usize,
124        rows_returned: usize,
125        index_predicate_applied: bool,
126        index_predicate_keys_rejected: u64,
127        distinct_keys_deduped: u64,
128    ) {
129        self.optimization = optimization;
130        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
131        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
132        self.index_predicate_applied = index_predicate_applied;
133        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
134        self.distinct_keys_deduped = distinct_keys_deduped;
135    }
136}
137
// Derive the key-stream comparator for a plan's merge/ordering phase.
//
// NOTE(review): the guard below currently collapses every derived direction
// back to `fallback_direction` (both branches yield it), so the function
// always returns `KeyOrderComparator::from_direction(fallback_direction)`.
// The order-spec derivation is kept deliberately so the intended future
// behavior stays visible once access-path streams can honor it.
fn key_stream_comparator_from_plan<K>(
    plan: &LogicalPlan<K>,
    fallback_direction: Direction,
) -> KeyOrderComparator {
    // Direction the plan's order spec would imply, defaulting to the fallback
    // when the plan carries no explicit ordering.
    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
        derive_scan_direction(order, SlotSelectionPolicy::Last)
    });

    // Comparator and child-stream monotonicity must stay aligned until access-path
    // stream production can emit keys under an order-spec-derived comparator.
    let comparator_direction = if derived_direction == fallback_direction {
        derived_direction
    } else {
        fallback_direction
    };

    KeyOrderComparator::from_direction(comparator_direction)
}
156
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///

struct FastPathKeyResult {
    /// Key stream already in final scan order, ready for shared materialization.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Rows touched while producing the stream, for observability accounting.
    rows_scanned: usize,
    /// Which canonical optimization produced this fast path.
    optimization: ExecutionOptimization,
}
169
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Canister-scoped database handle the executor reads from.
    db: Db<E::Canister>,
    /// When true, paged entrypoints also capture an `ExecutionTrace` snapshot.
    debug: bool,
    /// Binds the executor to entity kind `E` without storing a value of it.
    _marker: PhantomData<E>,
}
183
184impl<E> LoadExecutor<E>
185where
186    E: EntityKind + EntityValue,
187{
    /// Construct a load executor bound to `db`.
    ///
    /// `debug` enables opt-in execution tracing: when set, the traced paged
    /// entrypoint also produces an `ExecutionTrace` snapshot.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }
196
197    // Resolve one orderable aggregate target field into a stable slot with
198    // canonical field-error taxonomy mapping.
199    fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
200        resolve_orderable_aggregate_target_slot::<E>(target_field)
201            .map_err(AggregateFieldValueError::into_internal_error)
202    }
203
204    // Resolve one aggregate target field into a stable slot with canonical
205    // field-error taxonomy mapping.
206    fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
207        resolve_any_aggregate_target_slot::<E>(target_field)
208            .map_err(AggregateFieldValueError::into_internal_error)
209    }
210
211    // Resolve one numeric aggregate target field into a stable slot with
212    // canonical field-error taxonomy mapping.
213    fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
214        resolve_numeric_aggregate_target_slot::<E>(target_field)
215            .map_err(AggregateFieldValueError::into_internal_error)
216    }
217
218    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
219        self.execute_paged_with_cursor(plan, PlannedCursor::none())
220            .map(|page| page.items)
221    }
222
223    pub(in crate::db) fn execute_paged_with_cursor(
224        &self,
225        plan: ExecutablePlan<E>,
226        cursor: impl Into<PlannedCursor>,
227    ) -> Result<CursorPage<E>, InternalError> {
228        self.execute_paged_with_cursor_traced(plan, cursor)
229            .map(|(page, _)| page)
230    }
231
    /// Execute a paged load with an optional continuation cursor, returning
    /// the page plus an `ExecutionTrace` snapshot when `self.debug` is set.
    ///
    /// # Errors
    /// Fails when the cursor does not revalidate against the plan, when the
    /// plan is not a load plan, or when any execution phase fails.
    #[expect(clippy::too_many_lines)]
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the caller's cursor against this plan before any
        // execution work, then detach its boundary/anchor payloads.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        // This executor only services load plans; anything else is an
        // internal invariant violation.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        // Snapshot plan-derived inputs before `plan` is consumed by
        // `into_parts` below.
        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        // Tracing is opt-in via the executor's debug flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let (plan, predicate_slots) = plan.into_parts();

        // Immediately-invoked closure so the trace can be attached to the
        // successful result after all mutable borrows end.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            let route_plan = Self::build_execution_route_plan_for_load(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                None,
                direction,
            )?;

            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let mut resolved = Self::resolve_execution_key_stream(
                &execution_inputs,
                &route_plan,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let (mut page, keys_scanned, mut post_access_rows) =
                Self::materialize_key_stream_into_page(
                    &ctx,
                    &plan,
                    predicate_slots.as_ref(),
                    resolved.key_stream.as_mut(),
                    route_plan.scan_hints.load_scan_budget_hint,
                    route_plan.streaming_access_shape_safe(),
                    cursor_boundary.as_ref(),
                    direction,
                    continuation_signature,
                )?;
            // Observability counters from the first pass; may be augmented by
            // the retry pass below.
            let mut rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
            let mut optimization = resolved.optimization;
            let mut index_predicate_applied = resolved.index_predicate_applied;
            let mut index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
            let mut distinct_keys_deduped = resolved
                .distinct_keys_deduped_counter
                .as_ref()
                .map_or(0, |counter| counter.get());

            // If the limit-pushdown pass may have under-filled the page, rerun
            // the same plan without the index-range limit spec.
            if Self::index_range_limited_residual_retry_required(
                &plan,
                cursor_boundary.as_ref(),
                &route_plan,
                rows_scanned,
                post_access_rows,
            ) {
                let mut fallback_route_plan = route_plan;
                fallback_route_plan.index_range_limit_spec = None;
                let mut fallback_resolved = Self::resolve_execution_key_stream(
                    &execution_inputs,
                    &fallback_route_plan,
                    IndexPredicateCompileMode::ConservativeSubset,
                )?;
                let (fallback_page, fallback_keys_scanned, fallback_post_access_rows) =
                    Self::materialize_key_stream_into_page(
                        &ctx,
                        &plan,
                        predicate_slots.as_ref(),
                        fallback_resolved.key_stream.as_mut(),
                        fallback_route_plan.scan_hints.load_scan_budget_hint,
                        fallback_route_plan.streaming_access_shape_safe(),
                        cursor_boundary.as_ref(),
                        direction,
                        continuation_signature,
                    )?;
                let fallback_rows_scanned = fallback_resolved
                    .rows_scanned_override
                    .unwrap_or(fallback_keys_scanned);
                let fallback_distinct_keys_deduped = fallback_resolved
                    .distinct_keys_deduped_counter
                    .as_ref()
                    .map_or(0, |counter| counter.get());

                // Retry accounting keeps observability faithful to actual work.
                rows_scanned = rows_scanned.saturating_add(fallback_rows_scanned);
                optimization = fallback_resolved.optimization;
                index_predicate_applied =
                    index_predicate_applied || fallback_resolved.index_predicate_applied;
                index_predicate_keys_rejected = index_predicate_keys_rejected
                    .saturating_add(fallback_resolved.index_predicate_keys_rejected);
                distinct_keys_deduped =
                    distinct_keys_deduped.saturating_add(fallback_distinct_keys_deduped);
                // The fallback pass replaces the first pass's page wholesale.
                page = fallback_page;
                post_access_rows = fallback_post_access_rows;
            }

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        // Attach the trace only on success; errors drop it with the page.
        result.map(|page| (page, execution_trace))
    }
374
375    // Retry index-range limit pushdown when a bounded residual-filter pass may
376    // have under-filled the requested page window.
377    fn index_range_limited_residual_retry_required(
378        plan: &LogicalPlan<E::Key>,
379        cursor_boundary: Option<&CursorBoundary>,
380        route_plan: &ExecutionRoutePlan,
381        rows_scanned: usize,
382        post_access_rows: usize,
383    ) -> bool {
384        let Some(limit_spec) = route_plan.index_range_limit_spec else {
385            return false;
386        };
387        if plan.predicate.is_none() {
388            return false;
389        }
390        if limit_spec.fetch == 0 {
391            return false;
392        }
393        let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
394            return false;
395        };
396        let keep_count =
397            compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
398                .keep_count;
399        if keep_count == 0 {
400            return false;
401        }
402        if rows_scanned < limit_spec.fetch {
403            return false;
404        }
405
406        post_access_rows < keep_count
407    }
408
409    // Record shared observability outcome for any execution path.
410    fn finalize_path_outcome(
411        execution_trace: &mut Option<ExecutionTrace>,
412        optimization: Option<ExecutionOptimization>,
413        rows_scanned: usize,
414        rows_returned: usize,
415        index_predicate_applied: bool,
416        index_predicate_keys_rejected: u64,
417        distinct_keys_deduped: u64,
418    ) {
419        record_rows_scanned::<E>(rows_scanned);
420        if let Some(execution_trace) = execution_trace.as_mut() {
421            execution_trace.set_path_outcome(
422                optimization,
423                rows_scanned,
424                rows_returned,
425                index_predicate_applied,
426                index_predicate_keys_rejected,
427                distinct_keys_deduped,
428            );
429            debug_assert_eq!(
430                execution_trace.keys_scanned,
431                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
432                "execution trace keys_scanned must match rows_scanned metrics input",
433            );
434        }
435    }
436
437    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
438    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
439        plan: &LogicalPlan<E::Key>,
440        cursor_boundary: Option<&CursorBoundary>,
441    ) -> Result<(), InternalError> {
442        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
443            return Ok(());
444        }
445        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
446
447        Ok(())
448    }
449}