//! Load executor: `icydb_core/db/executor/load/mod.rs`.

mod aggregate;
mod execute;
mod fast_stream;
mod index_range_limit;
mod page;
mod pk_stream;
mod secondary_index;
mod terminal;
mod trace;

pub(in crate::db::executor) use self::aggregate::{
    AggregateExecutionDescriptor, AggregateFastPathInputs,
};
pub(in crate::db::executor) use self::execute::{
    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
};

use self::trace::{access_path_variant, execution_order_direction};
use crate::{
    db::{
        Db,
        executor::{
            AccessStreamBindings, ExecutablePlan, ExecutionKernel, IndexPredicateCompileMode,
            KeyOrderComparator, OrderedKeyStreamBox, PlannedCursor,
            aggregate::field::{
                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
            },
            compile_predicate_slots, decode_pk_cursor_boundary,
            plan::{record_plan_metrics, record_rows_scanned},
        },
        query::policy,
        query::{
            contracts::cursor::{ContinuationToken, CursorBoundary},
            plan::{
                AccessPlan, AccessPlannedQuery, Direction, OrderDirection,
                validate::validate_executor_plan,
            },
        },
        response::Response,
    },
    error::InternalError,
    obs::sink::{ExecKind, Span},
    traits::{EntityKind, EntityValue, Storable},
};
use std::{borrow::Cow, marker::PhantomData};
47
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Rows materialized for this page.
    pub(crate) items: Response<E>,

    /// Continuation token for the next page; `None` when no continuation is produced.
    pub(crate) next_cursor: Option<ContinuationToken>,
}
61
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Access by a single key.
    ByKey,
    /// Access by an explicit set of keys.
    ByKeys,
    /// Access over a key range.
    KeyRange,
    /// Access through an index prefix.
    IndexPrefix,
    /// Access through an index range.
    IndexRange,
    /// Full scan access path.
    FullScan,
    /// Union of multiple access paths.
    Union,
    /// Intersection of multiple access paths.
    Intersection,
}
79
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path.
    PrimaryKey,
    /// Order pushdown into a secondary index.
    SecondaryOrderPushdown,
    /// Limit pushdown into an index-range scan.
    IndexRangeLimitPushdown,
}
92
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Coarse shape of the access path selected by the plan.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Execution order direction derived from the route plan.
    pub direction: OrderDirection,
    /// Optimization chosen by execution, if any; `None` until the path outcome is recorded.
    pub optimization: Option<ExecutionOptimization>,
    /// Keys scanned by the access path (saturates at `u64::MAX` on conversion).
    pub keys_scanned: u64,
    /// Rows returned by this execution (saturates at `u64::MAX` on conversion).
    pub rows_returned: u64,
    /// True when the route plan's continuation mode is not the initial page.
    pub continuation_applied: bool,
    /// Whether an index-level predicate was applied during execution.
    pub index_predicate_applied: bool,
    /// Keys rejected by the index-level predicate.
    pub index_predicate_keys_rejected: u64,
    /// Duplicate keys removed during distinct handling.
    pub distinct_keys_deduped: u64,
}
112
113impl ExecutionTrace {
114    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
115        Self {
116            access_path_variant: access_path_variant(access),
117            direction: execution_order_direction(direction),
118            optimization: None,
119            keys_scanned: 0,
120            rows_returned: 0,
121            continuation_applied,
122            index_predicate_applied: false,
123            index_predicate_keys_rejected: 0,
124            distinct_keys_deduped: 0,
125        }
126    }
127
128    fn set_path_outcome(
129        &mut self,
130        optimization: Option<ExecutionOptimization>,
131        keys_scanned: usize,
132        rows_returned: usize,
133        index_predicate_applied: bool,
134        index_predicate_keys_rejected: u64,
135        distinct_keys_deduped: u64,
136    ) {
137        self.optimization = optimization;
138        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
139        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
140        self.index_predicate_applied = index_predicate_applied;
141        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
142        self.distinct_keys_deduped = distinct_keys_deduped;
143    }
144}
145
// Map one execution direction onto the key-order comparator used by key streams.
// Pure delegation kept as a named seam for the executor module.
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
151
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///

pub(in crate::db::executor) struct FastPathKeyResult {
    /// Boxed key stream already in canonical order.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Rows scanned while producing the stream, for metrics/trace reporting.
    rows_scanned: usize,
    /// Which fast-path optimization produced this result.
    optimization: ExecutionOptimization,
}
164
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Canister-scoped database handle used to recover execution context.
    db: Db<E::Canister>,
    /// When true, executions capture an `ExecutionTrace` snapshot.
    debug: bool,
    /// Binds the executor to one entity kind without storing a value.
    _marker: PhantomData<E>,
}
178
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    // Build a load executor over one database handle. `debug` opts into
    // capturing an `ExecutionTrace` alongside results.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    // Resolve one orderable aggregate target field into a stable slot with
    // canonical field-error taxonomy mapping.
    fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
        resolve_orderable_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    // Resolve one aggregate target field into a stable slot with canonical
    // field-error taxonomy mapping.
    fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
        resolve_any_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    // Resolve one numeric aggregate target field into a stable slot with
    // canonical field-error taxonomy mapping.
    fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
        resolve_numeric_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    // Execute a load plan without pagination: run the paged path with no
    // cursor and keep only the page items, discarding any continuation token.
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }

    // Execute one page of a load plan, dropping the optional execution trace.
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    // Execute one page of a load plan and, when `debug` is set, return the
    // captured `ExecutionTrace` alongside the page.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the caller-supplied cursor against this plan before
        // decoding any continuation state out of it.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        // Rehydrate the lowered index-range anchor key from its raw
        // continuation bytes, if the cursor carries one.
        let index_range_anchor = cursor.index_range_anchor().map(|anchor| {
            <crate::db::lowering::LoweredKey as Storable>::from_bytes(Cow::Borrowed(
                anchor.last_raw_key(),
            ))
        });

        // Non-load plans reaching this executor are a caller bug.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }
        debug_assert!(
            policy::validate_plan_shape(plan.as_inner()).is_ok(),
            "load executor received a plan shape that bypassed planning validation",
        );

        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // Route planning happens once, before execution; the route carries
        // direction, window, and continuation mode for the whole page.
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_anchor.as_ref(),
            None,
        )?;
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            plan.as_inner()
                .effective_page_offset(cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        // Trace capture is opt-in via `debug`; plan-shape fields are
        // snapshotted before execution, outcome counters are filled later.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let predicate_slots = compile_predicate_slots::<E>(&plan);

        // Run the fallible phase inside a closure so the trace captured above
        // can be paired with the page after the closure returns.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
                &execution_inputs,
                &route_plan,
                cursor_boundary.as_ref(),
                continuation_signature,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let page = materialized.page;
            let rows_scanned = materialized.rows_scanned;
            let post_access_rows = materialized.post_access_rows;
            let optimization = materialized.optimization;
            let index_predicate_applied = materialized.index_predicate_applied;
            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
            let distinct_keys_deduped = materialized.distinct_keys_deduped;

            // `finalize_execution` (defined elsewhere in this module tree)
            // records span/trace outcome and shapes the final cursor page.
            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        // NOTE(review): on `Err` the captured trace is dropped with the error;
        // only successful pages return a trace.
        result.map(|page| (page, execution_trace))
    }

    // Record shared observability outcome for any execution path.
    fn finalize_path_outcome(
        execution_trace: &mut Option<ExecutionTrace>,
        optimization: Option<ExecutionOptimization>,
        rows_scanned: usize,
        rows_returned: usize,
        index_predicate_applied: bool,
        index_predicate_keys_rejected: u64,
        distinct_keys_deduped: u64,
    ) {
        // Metrics are recorded unconditionally; the trace only when enabled.
        record_rows_scanned::<E>(rows_scanned);
        if let Some(execution_trace) = execution_trace.as_mut() {
            execution_trace.set_path_outcome(
                optimization,
                rows_scanned,
                rows_returned,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
            );
            debug_assert_eq!(
                execution_trace.keys_scanned,
                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
                "execution trace keys_scanned must match rows_scanned metrics input",
            );
        }
    }

    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
        plan: &AccessPlannedQuery<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
            return Ok(());
        }
        // Decode purely for its error classification; the decoded value is discarded.
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }
}
367}