// icydb_core/db/executor/load/mod.rs

1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11pub(in crate::db::executor) use self::execute::{
12    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
13};
14
15use self::trace::{access_path_variant, execution_order_direction};
16use crate::{
17    db::{
18        Db,
19        access::AccessPlan,
20        cursor::{ContinuationToken, CursorBoundary},
21        direction::Direction,
22        executor::{
23            AccessStreamBindings, ExecutablePlan, ExecutionKernel, IndexPredicateCompileMode,
24            KeyOrderComparator, OrderedKeyStreamBox, PlannedCursor,
25            aggregate::field::{
26                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
27                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
28            },
29            compile_predicate_slots, decode_pk_cursor_boundary,
30            plan_metrics::{record_plan_metrics, record_rows_scanned},
31        },
32        plan::{AccessPlannedQuery, OrderDirection},
33        query::plan::validate::validate_executor_plan,
34        query::policy,
35        response::Response,
36    },
37    error::InternalError,
38    obs::sink::{ExecKind, Span},
39    traits::{EntityKind, EntityValue, Storable},
40};
41use std::{borrow::Cow, marker::PhantomData};
42
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Materialized items for this page.
    pub(crate) items: Response<E>,

    /// Continuation token for fetching the next page; `None` when execution
    /// produced no continuation.
    pub(crate) next_cursor: Option<ContinuationToken>,
}
56
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Single-key access.
    ByKey,
    /// Multi-key access.
    ByKeys,
    /// Primary-key range access.
    KeyRange,
    /// Secondary-index prefix access.
    IndexPrefix,
    /// Secondary-index range access.
    IndexRange,
    /// Full scan of the entity store.
    FullScan,
    /// Union of multiple access paths.
    Union,
    /// Intersection of multiple access paths.
    Intersection,
}
74
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path.
    PrimaryKey,
    /// Ordering satisfied via a secondary index (order pushdown).
    SecondaryOrderPushdown,
    /// Limit pushed down into an index range scan.
    IndexRangeLimitPushdown,
}
87
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Coarse shape of the access path selected by the plan.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Order direction the execution ran in.
    pub direction: OrderDirection,
    /// Canonical optimization chosen by execution, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Keys scanned during access; saturates at `u64::MAX`.
    pub keys_scanned: u64,
    /// Rows returned to the caller; saturates at `u64::MAX`.
    pub rows_returned: u64,
    /// Whether a continuation cursor shaped this execution.
    pub continuation_applied: bool,
    /// Whether an index-level predicate filter was applied.
    pub index_predicate_applied: bool,
    /// Count of keys rejected by the index-level predicate.
    pub index_predicate_keys_rejected: u64,
    /// Count of duplicate keys removed by distinct handling.
    pub distinct_keys_deduped: u64,
}
107
108impl ExecutionTrace {
109    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
110        Self {
111            access_path_variant: access_path_variant(access),
112            direction: execution_order_direction(direction),
113            optimization: None,
114            keys_scanned: 0,
115            rows_returned: 0,
116            continuation_applied,
117            index_predicate_applied: false,
118            index_predicate_keys_rejected: 0,
119            distinct_keys_deduped: 0,
120        }
121    }
122
123    fn set_path_outcome(
124        &mut self,
125        optimization: Option<ExecutionOptimization>,
126        keys_scanned: usize,
127        rows_returned: usize,
128        index_predicate_applied: bool,
129        index_predicate_keys_rejected: u64,
130        distinct_keys_deduped: u64,
131    ) {
132        self.optimization = optimization;
133        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
134        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
135        self.index_predicate_applied = index_predicate_applied;
136        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
137        self.distinct_keys_deduped = distinct_keys_deduped;
138    }
139}
140
// Map a physical scan direction onto the comparator used when merging
// ordered key streams.
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
146
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///

pub(in crate::db::executor) struct FastPathKeyResult {
    /// Keys in final execution order, ready for shared materialization.
    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
    /// Rows scanned while producing the stream (observability input).
    pub(in crate::db::executor) rows_scanned: usize,
    /// The fast-path optimization that produced this result.
    pub(in crate::db::executor) optimization: ExecutionOptimization,
}
159
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    // Canister-scoped database handle used to recover read contexts.
    db: Db<E::Canister>,
    // When true, executions capture an `ExecutionTrace` snapshot.
    debug: bool,
    // Binds the executor to one entity kind without storing a value.
    _marker: PhantomData<E>,
}
173
174impl<E> LoadExecutor<E>
175where
176    E: EntityKind + EntityValue,
177{
178    #[must_use]
179    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
180        Self {
181            db,
182            debug,
183            _marker: PhantomData,
184        }
185    }
186
187    // Recover canonical read context for kernel-owned execution setup.
188    pub(in crate::db::executor) fn recovered_context(
189        &self,
190    ) -> Result<crate::db::Context<'_, E>, InternalError> {
191        self.db.recovered_context::<E>()
192    }
193
194    // Resolve one orderable aggregate target field into a stable slot with
195    // canonical field-error taxonomy mapping.
196    fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
197        resolve_orderable_aggregate_target_slot::<E>(target_field)
198            .map_err(AggregateFieldValueError::into_internal_error)
199    }
200
201    // Resolve one aggregate target field into a stable slot with canonical
202    // field-error taxonomy mapping.
203    fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
204        resolve_any_aggregate_target_slot::<E>(target_field)
205            .map_err(AggregateFieldValueError::into_internal_error)
206    }
207
208    // Resolve one numeric aggregate target field into a stable slot with
209    // canonical field-error taxonomy mapping.
210    fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
211        resolve_numeric_aggregate_target_slot::<E>(target_field)
212            .map_err(AggregateFieldValueError::into_internal_error)
213    }
214
215    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
216        self.execute_paged_with_cursor(plan, PlannedCursor::none())
217            .map(|page| page.items)
218    }
219
220    pub(in crate::db) fn execute_paged_with_cursor(
221        &self,
222        plan: ExecutablePlan<E>,
223        cursor: impl Into<PlannedCursor>,
224    ) -> Result<CursorPage<E>, InternalError> {
225        self.execute_paged_with_cursor_traced(plan, cursor)
226            .map(|(page, _)| page)
227    }
228
    // Execute a load plan with an optional continuation cursor, returning the
    // page plus an `ExecutionTrace` snapshot when `self.debug` is set.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the caller-supplied cursor against this plan before use.
        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        // Rehydrate the index-range anchor key from its raw continuation bytes.
        let index_range_anchor = cursor.index_range_anchor().map(|anchor| {
            <crate::db::lowering::LoweredKey as Storable>::from_bytes(Cow::Borrowed(
                anchor.last_raw_key(),
            ))
        });

        // This executor only accepts load-mode plans; anything else is an
        // executor invariant violation.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }
        debug_assert!(
            policy::validate_plan_shape(plan.as_inner()).is_ok(),
            "load executor received a plan shape that bypassed planning validation",
        );

        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // Plan the execution route once, up front; execution follows it below.
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_anchor.as_ref(),
            None,
        )?;
        // Any non-initial continuation mode means a cursor shaped this run.
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        // Trace capture is opt-in via the executor's debug flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let predicate_slots = compile_predicate_slots::<E>(&plan);

        // Run the fallible execution body in a closure so the trace captured
        // above can still be returned alongside the result.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
                &execution_inputs,
                &route_plan,
                cursor_boundary.as_ref(),
                continuation_signature,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let page = materialized.page;
            let rows_scanned = materialized.rows_scanned;
            let post_access_rows = materialized.post_access_rows;
            let optimization = materialized.optimization;
            let index_predicate_applied = materialized.index_predicate_applied;
            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
            let distinct_keys_deduped = materialized.distinct_keys_deduped;

            // Shared finalization across execution paths (metrics, trace, span).
            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        // The trace (if captured) rides alongside the page result.
        result.map(|page| (page, execution_trace))
    }
327
328    // Record shared observability outcome for any execution path.
329    fn finalize_path_outcome(
330        execution_trace: &mut Option<ExecutionTrace>,
331        optimization: Option<ExecutionOptimization>,
332        rows_scanned: usize,
333        rows_returned: usize,
334        index_predicate_applied: bool,
335        index_predicate_keys_rejected: u64,
336        distinct_keys_deduped: u64,
337    ) {
338        record_rows_scanned::<E>(rows_scanned);
339        if let Some(execution_trace) = execution_trace.as_mut() {
340            execution_trace.set_path_outcome(
341                optimization,
342                rows_scanned,
343                rows_returned,
344                index_predicate_applied,
345                index_predicate_keys_rejected,
346                distinct_keys_deduped,
347            );
348            debug_assert_eq!(
349                execution_trace.keys_scanned,
350                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
351                "execution trace keys_scanned must match rows_scanned metrics input",
352            );
353        }
354    }
355
356    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
357    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
358        plan: &AccessPlannedQuery<E::Key>,
359        cursor_boundary: Option<&CursorBoundary>,
360    ) -> Result<(), InternalError> {
361        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
362            return Ok(());
363        }
364        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
365
366        Ok(())
367    }
368}