// icydb_core/db/executor/load/mod.rs
1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11use self::{
12    execute::{ExecutionInputs, IndexPredicateCompileMode},
13    trace::{access_path_variant, execution_order_direction},
14};
15use crate::{
16    db::{
17        Db,
18        executor::{
19            AccessStreamBindings, ExecutablePlan, KeyOrderComparator, OrderedKeyStreamBox,
20            PlannedCursor,
21            aggregate::field::{
22                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
23                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
24            },
25            compile_predicate_slots, compute_page_window, decode_pk_cursor_boundary,
26            plan::{record_plan_metrics, record_rows_scanned},
27            route::{ExecutionRoutePlan, RouteOrderSlotPolicy, derive_scan_direction},
28        },
29        query::policy,
30        query::{
31            contracts::cursor::{ContinuationToken, CursorBoundary},
32            plan::{
33                AccessPlan, AccessPlannedQuery, Direction, OrderDirection,
34                validate::validate_executor_plan,
35            },
36        },
37        response::Response,
38    },
39    error::InternalError,
40    obs::sink::{ExecKind, Span},
41    traits::{EntityKind, EntityValue},
42};
43use std::marker::PhantomData;
44
45///
46/// CursorPage
47///
48/// Internal load page result with continuation cursor payload.
49/// Returned by paged executor entrypoints.
50///
51
52#[derive(Debug)]
53pub(crate) struct CursorPage<E: EntityKind> {
54    pub(crate) items: Response<E>,
55
56    pub(crate) next_cursor: Option<ContinuationToken>,
57}
58
59///
60/// ExecutionAccessPathVariant
61///
62/// Coarse access path shape used by the load execution trace surface.
63///
64
65#[derive(Clone, Copy, Debug, Eq, PartialEq)]
66pub enum ExecutionAccessPathVariant {
67    ByKey,
68    ByKeys,
69    KeyRange,
70    IndexPrefix,
71    IndexRange,
72    FullScan,
73    Union,
74    Intersection,
75}
76
77///
78/// ExecutionOptimization
79///
80/// Canonical load optimization selected by execution, if any.
81///
82
83#[derive(Clone, Copy, Debug, Eq, PartialEq)]
84pub enum ExecutionOptimization {
85    PrimaryKey,
86    SecondaryOrderPushdown,
87    IndexRangeLimitPushdown,
88}
89
90///
91/// ExecutionTrace
92///
93/// Structured, opt-in load execution introspection snapshot.
94/// Captures plan-shape and execution decisions without changing semantics.
95///
96
97#[derive(Clone, Copy, Debug, Eq, PartialEq)]
98pub struct ExecutionTrace {
99    pub access_path_variant: ExecutionAccessPathVariant,
100    pub direction: OrderDirection,
101    pub optimization: Option<ExecutionOptimization>,
102    pub keys_scanned: u64,
103    pub rows_returned: u64,
104    pub continuation_applied: bool,
105    pub index_predicate_applied: bool,
106    pub index_predicate_keys_rejected: u64,
107    pub distinct_keys_deduped: u64,
108}
109
110impl ExecutionTrace {
111    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
112        Self {
113            access_path_variant: access_path_variant(access),
114            direction: execution_order_direction(direction),
115            optimization: None,
116            keys_scanned: 0,
117            rows_returned: 0,
118            continuation_applied,
119            index_predicate_applied: false,
120            index_predicate_keys_rejected: 0,
121            distinct_keys_deduped: 0,
122        }
123    }
124
125    fn set_path_outcome(
126        &mut self,
127        optimization: Option<ExecutionOptimization>,
128        keys_scanned: usize,
129        rows_returned: usize,
130        index_predicate_applied: bool,
131        index_predicate_keys_rejected: u64,
132        distinct_keys_deduped: u64,
133    ) {
134        self.optimization = optimization;
135        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
136        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
137        self.index_predicate_applied = index_predicate_applied;
138        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
139        self.distinct_keys_deduped = distinct_keys_deduped;
140    }
141}
142
143fn key_stream_comparator_from_plan<K>(
144    plan: &AccessPlannedQuery<K>,
145    fallback_direction: Direction,
146) -> KeyOrderComparator {
147    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
148        derive_scan_direction(order, RouteOrderSlotPolicy::Last)
149    });
150
151    // Comparator and child-stream monotonicity must stay aligned until access-path
152    // stream production can emit keys under an order-spec-derived comparator.
153    let comparator_direction = if derived_direction == fallback_direction {
154        derived_direction
155    } else {
156        fallback_direction
157    };
158
159    KeyOrderComparator::from_direction(comparator_direction)
160}
161
162///
163/// FastPathKeyResult
164///
165/// Internal fast-path access result.
166/// Carries ordered keys plus observability metadata for shared execution phases.
167///
168
169struct FastPathKeyResult {
170    ordered_key_stream: OrderedKeyStreamBox,
171    rows_scanned: usize,
172    optimization: ExecutionOptimization,
173}
174
175///
176/// LoadExecutor
177///
178/// Load-plan executor with canonical post-access semantics.
179/// Coordinates fast paths, trace hooks, and pagination cursors.
180///
181
182#[derive(Clone)]
183pub(crate) struct LoadExecutor<E: EntityKind> {
184    db: Db<E::Canister>,
185    debug: bool,
186    _marker: PhantomData<E>,
187}
188
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    // Bind an executor to one canister database handle; `debug` opts into
    // execution-trace capture in the traced entrypoint.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    // Resolve one orderable aggregate target field into a stable slot with
    // canonical field-error taxonomy mapping.
    fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
        resolve_orderable_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    // Resolve one aggregate target field into a stable slot with canonical
    // field-error taxonomy mapping.
    fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
        resolve_any_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    // Resolve one numeric aggregate target field into a stable slot with
    // canonical field-error taxonomy mapping.
    fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
        resolve_numeric_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    // Execute a load plan with no continuation cursor, returning only the page
    // items and discarding cursor metadata.
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }

    // Paged execution entrypoint that discards the opt-in execution trace
    // produced by the traced variant.
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    // Full paged execution entrypoint.
    //
    // Phases, in order: revalidate the cursor against the plan, reject
    // non-load plans, build the execution route plan, resolve one canonical
    // key stream, materialize it into a page, optionally retry once without
    // index-range limit pushdown (see `index_range_limited_residual_retry_required`),
    // then finalize shared observability. Returns the page plus the trace
    // snapshot when `debug` is set (`None` otherwise).
    #[expect(clippy::too_many_lines)]
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor
            .index_range_anchor()
            .map(|anchor| anchor.last_raw_key().clone());

        // Hard invariant: this executor only handles load-mode plans.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }
        debug_assert!(
            policy::validate_plan_shape(plan.as_inner()).is_ok(),
            "load executor received a plan shape that bypassed planning validation",
        );

        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_anchor.as_ref(),
            None,
        )?;
        // Any continuation mode other than `Initial` means a cursor shaped
        // this execution.
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            plan.as_inner()
                .effective_page_offset(cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        // Trace capture is opt-in via the executor's `debug` flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let predicate_slots = compile_predicate_slots::<E>(&plan);

        // Fallible phases run inside a closure so `?` can be used while the
        // trace captured above stays available for the final pairing below.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let mut resolved = Self::resolve_execution_key_stream(
                &execution_inputs,
                &route_plan,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let (mut page, keys_scanned, mut post_access_rows) =
                Self::materialize_key_stream_into_page(
                    &ctx,
                    &plan,
                    predicate_slots.as_ref(),
                    resolved.key_stream.as_mut(),
                    route_plan.scan_hints.load_scan_budget_hint,
                    route_plan.streaming_access_shape_safe(),
                    cursor_boundary.as_ref(),
                    direction,
                    continuation_signature,
                )?;
            // A resolver override (when present) supersedes the materializer's
            // key count for rows-scanned accounting.
            let mut rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
            let mut optimization = resolved.optimization;
            let mut index_predicate_applied = resolved.index_predicate_applied;
            let mut index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
            let mut distinct_keys_deduped = resolved
                .distinct_keys_deduped_counter
                .as_ref()
                .map_or(0, |counter| counter.get());

            // One-shot fallback: rerun without index-range limit pushdown when
            // the bounded pass may have under-filled the requested window.
            if Self::index_range_limited_residual_retry_required(
                &plan,
                cursor_boundary.as_ref(),
                &route_plan,
                rows_scanned,
                post_access_rows,
            ) {
                let mut fallback_route_plan = route_plan;
                fallback_route_plan.index_range_limit_spec = None;
                let mut fallback_resolved = Self::resolve_execution_key_stream(
                    &execution_inputs,
                    &fallback_route_plan,
                    IndexPredicateCompileMode::ConservativeSubset,
                )?;
                let (fallback_page, fallback_keys_scanned, fallback_post_access_rows) =
                    Self::materialize_key_stream_into_page(
                        &ctx,
                        &plan,
                        predicate_slots.as_ref(),
                        fallback_resolved.key_stream.as_mut(),
                        fallback_route_plan.scan_hints.load_scan_budget_hint,
                        fallback_route_plan.streaming_access_shape_safe(),
                        cursor_boundary.as_ref(),
                        direction,
                        continuation_signature,
                    )?;
                let fallback_rows_scanned = fallback_resolved
                    .rows_scanned_override
                    .unwrap_or(fallback_keys_scanned);
                let fallback_distinct_keys_deduped = fallback_resolved
                    .distinct_keys_deduped_counter
                    .as_ref()
                    .map_or(0, |counter| counter.get());

                // Retry accounting keeps observability faithful to actual work.
                rows_scanned = rows_scanned.saturating_add(fallback_rows_scanned);
                optimization = fallback_resolved.optimization;
                index_predicate_applied =
                    index_predicate_applied || fallback_resolved.index_predicate_applied;
                index_predicate_keys_rejected = index_predicate_keys_rejected
                    .saturating_add(fallback_resolved.index_predicate_keys_rejected);
                distinct_keys_deduped =
                    distinct_keys_deduped.saturating_add(fallback_distinct_keys_deduped);
                page = fallback_page;
                post_access_rows = fallback_post_access_rows;
            }

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        // NOTE: on `Err` the captured trace is dropped with the error; only
        // successful executions surface a trace to the caller.
        result.map(|page| (page, execution_trace))
    }

    // Retry index-range limit pushdown when a bounded residual-filter pass may
    // have under-filled the requested page window.
    fn index_range_limited_residual_retry_required(
        plan: &AccessPlannedQuery<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
        route_plan: &ExecutionRoutePlan,
        rows_scanned: usize,
        post_access_rows: usize,
    ) -> bool {
        // Only relevant when limit pushdown was actually routed.
        let Some(limit_spec) = route_plan.index_range_limit_spec else {
            return false;
        };
        // Without a residual predicate there is no post-access filtering that
        // could shrink the page below its window.
        if plan.predicate.is_none() {
            return false;
        }
        if limit_spec.fetch == 0 {
            return false;
        }
        // A page limit is required to define the keep window below.
        let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
            return false;
        };
        let keep_count =
            compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
                .keep_count;
        if keep_count == 0 {
            return false;
        }
        // The scan stopped before reaching the pushdown fetch bound, so it was
        // not truncated by the bound.
        if rows_scanned < limit_spec.fetch {
            return false;
        }

        // Retry only when fewer rows survived post-access than the window keeps.
        post_access_rows < keep_count
    }

    // Record shared observability outcome for any execution path.
    // Metrics are recorded unconditionally; the trace snapshot is updated only
    // when trace capture is enabled.
    fn finalize_path_outcome(
        execution_trace: &mut Option<ExecutionTrace>,
        optimization: Option<ExecutionOptimization>,
        rows_scanned: usize,
        rows_returned: usize,
        index_predicate_applied: bool,
        index_predicate_keys_rejected: u64,
        distinct_keys_deduped: u64,
    ) {
        record_rows_scanned::<E>(rows_scanned);
        if let Some(execution_trace) = execution_trace.as_mut() {
            execution_trace.set_path_outcome(
                optimization,
                rows_scanned,
                rows_returned,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
            );
            debug_assert_eq!(
                execution_trace.keys_scanned,
                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
                "execution trace keys_scanned must match rows_scanned metrics input",
            );
        }
    }

    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
        plan: &AccessPlannedQuery<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
            return Ok(());
        }
        // Decode purely for error classification; the decoded boundary itself
        // is discarded.
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }
}