Skip to main content

icydb_core/db/executor/load/
mod.rs

1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11pub(in crate::db::executor) use self::execute::{
12    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
13};
14
15use self::trace::{access_path_variant, execution_order_direction};
16use crate::{
17    db::{
18        Db,
19        executor::{
20            AccessStreamBindings, ExecutablePlan, ExecutionKernel, IndexPredicateCompileMode,
21            KeyOrderComparator, OrderedKeyStreamBox, PlannedCursor,
22            aggregate::field::{
23                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
24                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
25            },
26            compile_predicate_slots, decode_pk_cursor_boundary,
27            plan_metrics::{record_plan_metrics, record_rows_scanned},
28        },
29        query::policy,
30        query::{
31            contracts::cursor::{ContinuationToken, CursorBoundary},
32            plan::{
33                AccessPlan, AccessPlannedQuery, Direction, OrderDirection,
34                validate::validate_executor_plan,
35            },
36        },
37        response::Response,
38    },
39    error::InternalError,
40    obs::sink::{ExecKind, Span},
41    traits::{EntityKind, EntityValue, Storable},
42};
43use std::{borrow::Cow, marker::PhantomData};
44
45///
46/// CursorPage
47///
48/// Internal load page result with continuation cursor payload.
49/// Returned by paged executor entrypoints.
50///
51
52#[derive(Debug)]
53pub(crate) struct CursorPage<E: EntityKind> {
54    pub(crate) items: Response<E>,
55
56    pub(crate) next_cursor: Option<ContinuationToken>,
57}
58
///
/// ExecutionAccessPathVariant
///
/// Coarse classification of an access path's shape.
/// Reported through the load execution trace surface.
///

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
76
///
/// ExecutionOptimization
///
/// The canonical load optimization chosen during execution.
/// Callers wrap this in `Option`; `None` means no optimization was selected.
///

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
89
90///
91/// ExecutionTrace
92///
93/// Structured, opt-in load execution introspection snapshot.
94/// Captures plan-shape and execution decisions without changing semantics.
95///
96
97#[derive(Clone, Copy, Debug, Eq, PartialEq)]
98pub struct ExecutionTrace {
99    pub access_path_variant: ExecutionAccessPathVariant,
100    pub direction: OrderDirection,
101    pub optimization: Option<ExecutionOptimization>,
102    pub keys_scanned: u64,
103    pub rows_returned: u64,
104    pub continuation_applied: bool,
105    pub index_predicate_applied: bool,
106    pub index_predicate_keys_rejected: u64,
107    pub distinct_keys_deduped: u64,
108}
109
110impl ExecutionTrace {
111    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
112        Self {
113            access_path_variant: access_path_variant(access),
114            direction: execution_order_direction(direction),
115            optimization: None,
116            keys_scanned: 0,
117            rows_returned: 0,
118            continuation_applied,
119            index_predicate_applied: false,
120            index_predicate_keys_rejected: 0,
121            distinct_keys_deduped: 0,
122        }
123    }
124
125    fn set_path_outcome(
126        &mut self,
127        optimization: Option<ExecutionOptimization>,
128        keys_scanned: usize,
129        rows_returned: usize,
130        index_predicate_applied: bool,
131        index_predicate_keys_rejected: u64,
132        distinct_keys_deduped: u64,
133    ) {
134        self.optimization = optimization;
135        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
136        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
137        self.index_predicate_applied = index_predicate_applied;
138        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
139        self.distinct_keys_deduped = distinct_keys_deduped;
140    }
141}
142
143pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
144    direction: Direction,
145) -> KeyOrderComparator {
146    KeyOrderComparator::from_direction(direction)
147}
148
149///
150/// FastPathKeyResult
151///
152/// Internal fast-path access result.
153/// Carries ordered keys plus observability metadata for shared execution phases.
154///
155
156pub(in crate::db::executor) struct FastPathKeyResult {
157    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
158    pub(in crate::db::executor) rows_scanned: usize,
159    pub(in crate::db::executor) optimization: ExecutionOptimization,
160}
161
162///
163/// LoadExecutor
164///
165/// Load-plan executor with canonical post-access semantics.
166/// Coordinates fast paths, trace hooks, and pagination cursors.
167///
168
169#[derive(Clone)]
170pub(crate) struct LoadExecutor<E: EntityKind> {
171    db: Db<E::Canister>,
172    debug: bool,
173    _marker: PhantomData<E>,
174}
175
176impl<E> LoadExecutor<E>
177where
178    E: EntityKind + EntityValue,
179{
180    #[must_use]
181    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
182        Self {
183            db,
184            debug,
185            _marker: PhantomData,
186        }
187    }
188
189    // Recover canonical read context for kernel-owned execution setup.
190    pub(in crate::db::executor) fn recovered_context(
191        &self,
192    ) -> Result<crate::db::Context<'_, E>, InternalError> {
193        self.db.recovered_context::<E>()
194    }
195
196    // Resolve one orderable aggregate target field into a stable slot with
197    // canonical field-error taxonomy mapping.
198    fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
199        resolve_orderable_aggregate_target_slot::<E>(target_field)
200            .map_err(AggregateFieldValueError::into_internal_error)
201    }
202
203    // Resolve one aggregate target field into a stable slot with canonical
204    // field-error taxonomy mapping.
205    fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
206        resolve_any_aggregate_target_slot::<E>(target_field)
207            .map_err(AggregateFieldValueError::into_internal_error)
208    }
209
210    // Resolve one numeric aggregate target field into a stable slot with
211    // canonical field-error taxonomy mapping.
212    fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
213        resolve_numeric_aggregate_target_slot::<E>(target_field)
214            .map_err(AggregateFieldValueError::into_internal_error)
215    }
216
217    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
218        self.execute_paged_with_cursor(plan, PlannedCursor::none())
219            .map(|page| page.items)
220    }
221
222    pub(in crate::db) fn execute_paged_with_cursor(
223        &self,
224        plan: ExecutablePlan<E>,
225        cursor: impl Into<PlannedCursor>,
226    ) -> Result<CursorPage<E>, InternalError> {
227        self.execute_paged_with_cursor_traced(plan, cursor)
228            .map(|(page, _)| page)
229    }
230
231    pub(in crate::db) fn execute_paged_with_cursor_traced(
232        &self,
233        plan: ExecutablePlan<E>,
234        cursor: impl Into<PlannedCursor>,
235    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
236        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
237        let cursor_boundary = cursor.boundary().cloned();
238        let index_range_anchor = cursor.index_range_anchor().map(|anchor| {
239            <crate::db::lowering::LoweredKey as Storable>::from_bytes(Cow::Borrowed(
240                anchor.last_raw_key(),
241            ))
242        });
243
244        if !plan.mode().is_load() {
245            return Err(InternalError::query_executor_invariant(
246                "load executor requires load plans",
247            ));
248        }
249        debug_assert!(
250            policy::validate_plan_shape(plan.as_inner()).is_ok(),
251            "load executor received a plan shape that bypassed planning validation",
252        );
253
254        let continuation_signature = plan.continuation_signature();
255        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
256        let index_range_specs = plan.index_range_specs()?.to_vec();
257        let route_plan = Self::build_execution_route_plan_for_load(
258            plan.as_inner(),
259            cursor_boundary.as_ref(),
260            index_range_anchor.as_ref(),
261            None,
262        )?;
263        let continuation_applied = !matches!(
264            route_plan.continuation_mode(),
265            crate::db::executor::route::ContinuationMode::Initial
266        );
267        let direction = route_plan.direction();
268        debug_assert_eq!(
269            route_plan.window().effective_offset,
270            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
271            "route window effective offset must match logical plan offset semantics",
272        );
273        let mut execution_trace = self
274            .debug
275            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
276        let plan = plan.into_inner();
277        let predicate_slots = compile_predicate_slots::<E>(&plan);
278
279        let result = (|| {
280            let mut span = Span::<E>::new(ExecKind::Load);
281
282            validate_executor_plan::<E>(&plan)?;
283            let ctx = self.db.recovered_context::<E>()?;
284            let execution_inputs = ExecutionInputs {
285                ctx: &ctx,
286                plan: &plan,
287                stream_bindings: AccessStreamBindings {
288                    index_prefix_specs: index_prefix_specs.as_slice(),
289                    index_range_specs: index_range_specs.as_slice(),
290                    index_range_anchor: index_range_anchor.as_ref(),
291                    direction,
292                },
293                predicate_slots: predicate_slots.as_ref(),
294            };
295
296            record_plan_metrics(&plan.access);
297            // Plan execution routing once, then execute in canonical order.
298            // Resolve one canonical key stream, then run shared page materialization/finalization.
299            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
300                &execution_inputs,
301                &route_plan,
302                cursor_boundary.as_ref(),
303                continuation_signature,
304                IndexPredicateCompileMode::ConservativeSubset,
305            )?;
306            let page = materialized.page;
307            let rows_scanned = materialized.rows_scanned;
308            let post_access_rows = materialized.post_access_rows;
309            let optimization = materialized.optimization;
310            let index_predicate_applied = materialized.index_predicate_applied;
311            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
312            let distinct_keys_deduped = materialized.distinct_keys_deduped;
313
314            Ok(Self::finalize_execution(
315                page,
316                optimization,
317                rows_scanned,
318                post_access_rows,
319                index_predicate_applied,
320                index_predicate_keys_rejected,
321                distinct_keys_deduped,
322                &mut span,
323                &mut execution_trace,
324            ))
325        })();
326
327        result.map(|page| (page, execution_trace))
328    }
329
330    // Record shared observability outcome for any execution path.
331    fn finalize_path_outcome(
332        execution_trace: &mut Option<ExecutionTrace>,
333        optimization: Option<ExecutionOptimization>,
334        rows_scanned: usize,
335        rows_returned: usize,
336        index_predicate_applied: bool,
337        index_predicate_keys_rejected: u64,
338        distinct_keys_deduped: u64,
339    ) {
340        record_rows_scanned::<E>(rows_scanned);
341        if let Some(execution_trace) = execution_trace.as_mut() {
342            execution_trace.set_path_outcome(
343                optimization,
344                rows_scanned,
345                rows_returned,
346                index_predicate_applied,
347                index_predicate_keys_rejected,
348                distinct_keys_deduped,
349            );
350            debug_assert_eq!(
351                execution_trace.keys_scanned,
352                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
353                "execution trace keys_scanned must match rows_scanned metrics input",
354            );
355        }
356    }
357
358    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
359    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
360        plan: &AccessPlannedQuery<E::Key>,
361        cursor_boundary: Option<&CursorBoundary>,
362    ) -> Result<(), InternalError> {
363        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
364            return Ok(());
365        }
366        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
367
368        Ok(())
369    }
370}