// icydb_core/db/executor/load/mod.rs

1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11pub(in crate::db::executor) use self::execute::IndexPredicateCompileMode;
12
13use self::{
14    execute::ExecutionInputs,
15    trace::{access_path_variant, execution_order_direction},
16};
17use crate::{
18    db::{
19        Db,
20        executor::{
21            AccessStreamBindings, ExecutablePlan, KeyOrderComparator, OrderedKeyStreamBox,
22            PlannedCursor,
23            aggregate::field::{
24                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
25                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
26            },
27            compile_predicate_slots, compute_page_window, decode_pk_cursor_boundary,
28            plan::{record_plan_metrics, record_rows_scanned},
29            route::{ExecutionRoutePlan, RouteOrderSlotPolicy, derive_scan_direction},
30        },
31        query::policy,
32        query::{
33            contracts::cursor::{ContinuationToken, CursorBoundary},
34            plan::{
35                AccessPlan, AccessPlannedQuery, Direction, OrderDirection,
36                validate::validate_executor_plan,
37            },
38        },
39        response::Response,
40    },
41    error::InternalError,
42    obs::sink::{ExecKind, Span},
43    traits::{EntityKind, EntityValue, Storable},
44};
45use std::{borrow::Cow, marker::PhantomData};
46
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Materialized rows for this page.
    pub(crate) items: Response<E>,

    /// Continuation token for fetching the next page; presumably `None` when
    /// there is no further page to advertise — confirm against token producer.
    pub(crate) next_cursor: Option<ContinuationToken>,
}
60
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Single-key access.
    ByKey,
    /// Multi-key access.
    ByKeys,
    /// Contiguous primary-key range.
    KeyRange,
    /// Secondary-index prefix access.
    IndexPrefix,
    /// Secondary-index range access.
    IndexRange,
    /// Unbounded scan over all rows.
    FullScan,
    /// Union of child access paths.
    Union,
    /// Intersection of child access paths.
    Intersection,
}
78
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path.
    PrimaryKey,
    /// Order satisfied by a secondary index (order pushdown).
    SecondaryOrderPushdown,
    /// Page limit pushed down into an index-range scan.
    IndexRangeLimitPushdown,
}
91
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Coarse shape of the access path the plan executed.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Scan order used by execution.
    pub direction: OrderDirection,
    /// Optimization recorded by the executed path; `None` until the path
    /// outcome is recorded (and when no optimization applied).
    pub optimization: Option<ExecutionOptimization>,
    /// Keys scanned by the access path; saturates at `u64::MAX` on conversion.
    pub keys_scanned: u64,
    /// Rows returned by the executed path; saturates at `u64::MAX` on conversion.
    pub rows_returned: u64,
    /// Whether a continuation cursor affected this execution.
    pub continuation_applied: bool,
    /// Whether an index-level predicate was applied during key production.
    pub index_predicate_applied: bool,
    /// Count of keys rejected by the index-level predicate.
    pub index_predicate_keys_rejected: u64,
    /// Count of duplicate keys removed by distinct deduplication.
    pub distinct_keys_deduped: u64,
}
111
112impl ExecutionTrace {
113    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
114        Self {
115            access_path_variant: access_path_variant(access),
116            direction: execution_order_direction(direction),
117            optimization: None,
118            keys_scanned: 0,
119            rows_returned: 0,
120            continuation_applied,
121            index_predicate_applied: false,
122            index_predicate_keys_rejected: 0,
123            distinct_keys_deduped: 0,
124        }
125    }
126
127    fn set_path_outcome(
128        &mut self,
129        optimization: Option<ExecutionOptimization>,
130        keys_scanned: usize,
131        rows_returned: usize,
132        index_predicate_applied: bool,
133        index_predicate_keys_rejected: u64,
134        distinct_keys_deduped: u64,
135    ) {
136        self.optimization = optimization;
137        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
138        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
139        self.index_predicate_applied = index_predicate_applied;
140        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
141        self.distinct_keys_deduped = distinct_keys_deduped;
142    }
143}
144
// Derive the key-stream comparator for a plan, pinned to the caller-supplied
// fallback scan direction.
fn key_stream_comparator_from_plan<K>(
    plan: &AccessPlannedQuery<K>,
    fallback_direction: Direction,
) -> KeyOrderComparator {
    // Direction the order spec would imply for the scan (last-slot policy);
    // falls back to the caller's direction when the plan has no order spec.
    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
        derive_scan_direction(order, RouteOrderSlotPolicy::Last)
    });

    // Comparator and child-stream monotonicity must stay aligned until access-path
    // stream production can emit keys under an order-spec-derived comparator.
    // NOTE(review): this conditional is a tautology — both arms evaluate to
    // `fallback_direction` (the first arm only fires when the two are equal), so
    // `derived_direction` is currently advisory only. Presumably deliberate
    // scaffolding per the comment above; confirm intent before simplifying.
    let comparator_direction = if derived_direction == fallback_direction {
        derived_direction
    } else {
        fallback_direction
    };

    KeyOrderComparator::from_direction(comparator_direction)
}
163
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///

struct FastPathKeyResult {
    // Keys in canonical stream order, ready for shared page materialization.
    ordered_key_stream: OrderedKeyStreamBox,
    // Rows scanned while producing the keys (observability input).
    rows_scanned: usize,
    // Fast-path optimization that produced this result.
    optimization: ExecutionOptimization,
}
176
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    // Canister-scoped database handle; used to recover the execution context.
    db: Db<E::Canister>,
    // When true, execution captures an `ExecutionTrace` alongside results.
    debug: bool,
    // Ties the executor to one entity kind without storing a value of it.
    _marker: PhantomData<E>,
}
190
191impl<E> LoadExecutor<E>
192where
193    E: EntityKind + EntityValue,
194{
    // Construct a load executor over `db`; `debug` opts in to execution tracing.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }
203
204    // Resolve one orderable aggregate target field into a stable slot with
205    // canonical field-error taxonomy mapping.
206    fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
207        resolve_orderable_aggregate_target_slot::<E>(target_field)
208            .map_err(AggregateFieldValueError::into_internal_error)
209    }
210
211    // Resolve one aggregate target field into a stable slot with canonical
212    // field-error taxonomy mapping.
213    fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
214        resolve_any_aggregate_target_slot::<E>(target_field)
215            .map_err(AggregateFieldValueError::into_internal_error)
216    }
217
218    // Resolve one numeric aggregate target field into a stable slot with
219    // canonical field-error taxonomy mapping.
220    fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
221        resolve_numeric_aggregate_target_slot::<E>(target_field)
222            .map_err(AggregateFieldValueError::into_internal_error)
223    }
224
225    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
226        self.execute_paged_with_cursor(plan, PlannedCursor::none())
227            .map(|page| page.items)
228    }
229
230    pub(in crate::db) fn execute_paged_with_cursor(
231        &self,
232        plan: ExecutablePlan<E>,
233        cursor: impl Into<PlannedCursor>,
234    ) -> Result<CursorPage<E>, InternalError> {
235        self.execute_paged_with_cursor_traced(plan, cursor)
236            .map(|(page, _)| page)
237    }
238
    // Execute a paged load and, when `self.debug` is set, capture an
    // `ExecutionTrace` snapshot alongside the page.
    //
    // Phase order is load-bearing: cursor revalidation happens before any
    // cursor-derived state is read, route planning happens before the plan is
    // unwrapped via `into_inner`, and the trace is seeded before execution so
    // the closure can fill in the path outcome.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the caller-supplied cursor against this plan before use.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        // Rehydrate the index-range continuation anchor from its raw key bytes.
        let index_range_anchor = cursor.index_range_anchor().map(|anchor| {
            <crate::db::lowering::LoweredKey as Storable>::from_bytes(Cow::Borrowed(
                anchor.last_raw_key(),
            ))
        });

        // Executor-boundary invariant: only load plans are accepted here.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }
        debug_assert!(
            policy::validate_plan_shape(plan.as_inner()).is_ok(),
            "load executor received a plan shape that bypassed planning validation",
        );

        // Snapshot plan-derived inputs before the plan is consumed below.
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_anchor.as_ref(),
            None,
        )?;
        // Any continuation mode other than `Initial` means a cursor took effect.
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            plan.as_inner()
                .effective_page_offset(cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        // Trace capture is opt-in via the executor's debug flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let predicate_slots = compile_predicate_slots::<E>(&plan);

        // Run execution inside a closure so the trace can be returned alongside
        // the page on success while errors propagate unchanged.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let materialized = Self::materialize_with_optional_residual_retry(
                &execution_inputs,
                &route_plan,
                cursor_boundary.as_ref(),
                continuation_signature,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let page = materialized.page;
            let rows_scanned = materialized.rows_scanned;
            let post_access_rows = materialized.post_access_rows;
            let optimization = materialized.optimization;
            let index_predicate_applied = materialized.index_predicate_applied;
            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
            let distinct_keys_deduped = materialized.distinct_keys_deduped;

            // Shared finalization records span/trace outcome and shapes the page.
            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }
338
339    // Retry index-range limit pushdown when a bounded residual-filter pass may
340    // have under-filled the requested page window.
341    fn index_range_limited_residual_retry_required(
342        plan: &AccessPlannedQuery<E::Key>,
343        cursor_boundary: Option<&CursorBoundary>,
344        route_plan: &ExecutionRoutePlan,
345        rows_scanned: usize,
346        post_access_rows: usize,
347    ) -> bool {
348        let Some(limit_spec) = route_plan.index_range_limit_spec else {
349            return false;
350        };
351        if plan.predicate.is_none() {
352            return false;
353        }
354        if limit_spec.fetch == 0 {
355            return false;
356        }
357        let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
358            return false;
359        };
360        let keep_count =
361            compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
362                .keep_count;
363        if keep_count == 0 {
364            return false;
365        }
366        if rows_scanned < limit_spec.fetch {
367            return false;
368        }
369
370        post_access_rows < keep_count
371    }
372
373    // Record shared observability outcome for any execution path.
374    fn finalize_path_outcome(
375        execution_trace: &mut Option<ExecutionTrace>,
376        optimization: Option<ExecutionOptimization>,
377        rows_scanned: usize,
378        rows_returned: usize,
379        index_predicate_applied: bool,
380        index_predicate_keys_rejected: u64,
381        distinct_keys_deduped: u64,
382    ) {
383        record_rows_scanned::<E>(rows_scanned);
384        if let Some(execution_trace) = execution_trace.as_mut() {
385            execution_trace.set_path_outcome(
386                optimization,
387                rows_scanned,
388                rows_returned,
389                index_predicate_applied,
390                index_predicate_keys_rejected,
391                distinct_keys_deduped,
392            );
393            debug_assert_eq!(
394                execution_trace.keys_scanned,
395                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
396                "execution trace keys_scanned must match rows_scanned metrics input",
397            );
398        }
399    }
400
401    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
402    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
403        plan: &AccessPlannedQuery<E::Key>,
404        cursor_boundary: Option<&CursorBoundary>,
405    ) -> Result<(), InternalError> {
406        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
407            return Ok(());
408        }
409        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
410
411        Ok(())
412    }
413}