// icydb_core/db/executor/load/mod.rs

1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11pub(in crate::db::executor) use self::execute::{
12    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
13};
14
15use self::trace::{access_path_variant, execution_order_direction};
16use crate::{
17    db::{
18        Db,
19        access::AccessPlan,
20        cursor::{ContinuationToken, CursorBoundary},
21        direction::Direction,
22        executor::{
23            AccessStreamBindings, ExecutablePlan, ExecutionKernel, IndexPredicateCompileMode,
24            KeyOrderComparator, OrderedKeyStreamBox, PlannedCursor,
25            aggregate::field::{
26                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
27                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
28            },
29            compile_predicate_slots, decode_pk_cursor_boundary,
30            plan_metrics::{record_plan_metrics, record_rows_scanned},
31        },
32        plan::{AccessPlannedQuery, OrderDirection, validate::validate_executor_plan},
33        policy,
34        response::Response,
35    },
36    error::InternalError,
37    obs::sink::{ExecKind, Span},
38    traits::{EntityKind, EntityValue, Storable},
39};
40use std::{borrow::Cow, marker::PhantomData};
41
42///
43/// CursorPage
44///
45/// Internal load page result with continuation cursor payload.
46/// Returned by paged executor entrypoints.
47///
48
49#[derive(Debug)]
50pub(crate) struct CursorPage<E: EntityKind> {
51    pub(crate) items: Response<E>,
52
53    pub(crate) next_cursor: Option<ContinuationToken>,
54}
55
56///
57/// ExecutionAccessPathVariant
58///
59/// Coarse access path shape used by the load execution trace surface.
60///
61
62#[derive(Clone, Copy, Debug, Eq, PartialEq)]
63pub enum ExecutionAccessPathVariant {
64    ByKey,
65    ByKeys,
66    KeyRange,
67    IndexPrefix,
68    IndexRange,
69    FullScan,
70    Union,
71    Intersection,
72}
73
74///
75/// ExecutionOptimization
76///
77/// Canonical load optimization selected by execution, if any.
78///
79
80#[derive(Clone, Copy, Debug, Eq, PartialEq)]
81pub enum ExecutionOptimization {
82    PrimaryKey,
83    SecondaryOrderPushdown,
84    IndexRangeLimitPushdown,
85}
86
87///
88/// ExecutionTrace
89///
90/// Structured, opt-in load execution introspection snapshot.
91/// Captures plan-shape and execution decisions without changing semantics.
92///
93
94#[derive(Clone, Copy, Debug, Eq, PartialEq)]
95pub struct ExecutionTrace {
96    pub access_path_variant: ExecutionAccessPathVariant,
97    pub direction: OrderDirection,
98    pub optimization: Option<ExecutionOptimization>,
99    pub keys_scanned: u64,
100    pub rows_returned: u64,
101    pub continuation_applied: bool,
102    pub index_predicate_applied: bool,
103    pub index_predicate_keys_rejected: u64,
104    pub distinct_keys_deduped: u64,
105}
106
107impl ExecutionTrace {
108    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
109        Self {
110            access_path_variant: access_path_variant(access),
111            direction: execution_order_direction(direction),
112            optimization: None,
113            keys_scanned: 0,
114            rows_returned: 0,
115            continuation_applied,
116            index_predicate_applied: false,
117            index_predicate_keys_rejected: 0,
118            distinct_keys_deduped: 0,
119        }
120    }
121
122    fn set_path_outcome(
123        &mut self,
124        optimization: Option<ExecutionOptimization>,
125        keys_scanned: usize,
126        rows_returned: usize,
127        index_predicate_applied: bool,
128        index_predicate_keys_rejected: u64,
129        distinct_keys_deduped: u64,
130    ) {
131        self.optimization = optimization;
132        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
133        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
134        self.index_predicate_applied = index_predicate_applied;
135        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
136        self.distinct_keys_deduped = distinct_keys_deduped;
137    }
138}
139
// Build the key-order comparator matching one execution direction.
// Thin const shim so load-path call sites stay decoupled from the
// `KeyOrderComparator` constructor surface.
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
145
146///
147/// FastPathKeyResult
148///
149/// Internal fast-path access result.
150/// Carries ordered keys plus observability metadata for shared execution phases.
151///
152
153pub(in crate::db::executor) struct FastPathKeyResult {
154    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
155    pub(in crate::db::executor) rows_scanned: usize,
156    pub(in crate::db::executor) optimization: ExecutionOptimization,
157}
158
159///
160/// LoadExecutor
161///
162/// Load-plan executor with canonical post-access semantics.
163/// Coordinates fast paths, trace hooks, and pagination cursors.
164///
165
166#[derive(Clone)]
167pub(crate) struct LoadExecutor<E: EntityKind> {
168    db: Db<E::Canister>,
169    debug: bool,
170    _marker: PhantomData<E>,
171}
172
173impl<E> LoadExecutor<E>
174where
175    E: EntityKind + EntityValue,
176{
177    #[must_use]
178    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
179        Self {
180            db,
181            debug,
182            _marker: PhantomData,
183        }
184    }
185
    // Recover canonical read context for kernel-owned execution setup.
    // Pure delegation to the entity-typed context recovery on `Db`.
    pub(in crate::db::executor) fn recovered_context(
        &self,
    ) -> Result<crate::db::Context<'_, E>, InternalError> {
        self.db.recovered_context::<E>()
    }
192
193    // Resolve one orderable aggregate target field into a stable slot with
194    // canonical field-error taxonomy mapping.
195    fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
196        resolve_orderable_aggregate_target_slot::<E>(target_field)
197            .map_err(AggregateFieldValueError::into_internal_error)
198    }
199
200    // Resolve one aggregate target field into a stable slot with canonical
201    // field-error taxonomy mapping.
202    fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
203        resolve_any_aggregate_target_slot::<E>(target_field)
204            .map_err(AggregateFieldValueError::into_internal_error)
205    }
206
207    // Resolve one numeric aggregate target field into a stable slot with
208    // canonical field-error taxonomy mapping.
209    fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
210        resolve_numeric_aggregate_target_slot::<E>(target_field)
211            .map_err(AggregateFieldValueError::into_internal_error)
212    }
213
214    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
215        self.execute_paged_with_cursor(plan, PlannedCursor::none())
216            .map(|page| page.items)
217    }
218
219    pub(in crate::db) fn execute_paged_with_cursor(
220        &self,
221        plan: ExecutablePlan<E>,
222        cursor: impl Into<PlannedCursor>,
223    ) -> Result<CursorPage<E>, InternalError> {
224        self.execute_paged_with_cursor_traced(plan, cursor)
225            .map(|(page, _)| page)
226    }
227
    // Paged load entrypoint with opt-in trace capture.
    //
    // Pipeline: revalidate the cursor against the plan, reject non-load plans,
    // build the execution route once, optionally seed an `ExecutionTrace`
    // (debug mode only), then materialize one page through the shared kernel
    // and finalize observability.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the incoming cursor against this plan before anything else.
        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        // Decode the index-range anchor (last raw key) back into a lowered key,
        // if the cursor carries one.
        let index_range_anchor = cursor.index_range_anchor().map(|anchor| {
            <crate::db::lowering::LoweredKey as Storable>::from_bytes(Cow::Borrowed(
                anchor.last_raw_key(),
            ))
        });

        // Hard invariant: this executor only accepts load-mode plans.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }
        // Debug-only guard: the plan shape should already have passed planning
        // validation upstream.
        debug_assert!(
            policy::validate_plan_shape(plan.as_inner()).is_ok(),
            "load executor received a plan shape that bypassed planning validation",
        );

        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // Route planning happens exactly once; the route drives direction,
        // windowing, and continuation mode for the rest of execution.
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_anchor.as_ref(),
            None,
        )?;
        // Any non-initial continuation mode means a cursor shaped this run.
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        // Trace capture is opt-in via the executor's debug flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let predicate_slots = compile_predicate_slots::<E>(&plan);

        // Closure scopes the span and `?` propagation; the trace is returned
        // alongside the page only on success.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
                &execution_inputs,
                &route_plan,
                cursor_boundary.as_ref(),
                continuation_signature,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            // Unpack kernel outputs feeding finalization/observability.
            let page = materialized.page;
            let rows_scanned = materialized.rows_scanned;
            let post_access_rows = materialized.post_access_rows;
            let optimization = materialized.optimization;
            let index_predicate_applied = materialized.index_predicate_applied;
            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
            let distinct_keys_deduped = materialized.distinct_keys_deduped;

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }
326
327    // Record shared observability outcome for any execution path.
328    fn finalize_path_outcome(
329        execution_trace: &mut Option<ExecutionTrace>,
330        optimization: Option<ExecutionOptimization>,
331        rows_scanned: usize,
332        rows_returned: usize,
333        index_predicate_applied: bool,
334        index_predicate_keys_rejected: u64,
335        distinct_keys_deduped: u64,
336    ) {
337        record_rows_scanned::<E>(rows_scanned);
338        if let Some(execution_trace) = execution_trace.as_mut() {
339            execution_trace.set_path_outcome(
340                optimization,
341                rows_scanned,
342                rows_returned,
343                index_predicate_applied,
344                index_predicate_keys_rejected,
345                distinct_keys_deduped,
346            );
347            debug_assert_eq!(
348                execution_trace.keys_scanned,
349                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
350                "execution trace keys_scanned must match rows_scanned metrics input",
351            );
352        }
353    }
354
355    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
356    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
357        plan: &AccessPlannedQuery<E::Key>,
358        cursor_boundary: Option<&CursorBoundary>,
359    ) -> Result<(), InternalError> {
360        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
361            return Ok(());
362        }
363        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
364
365        Ok(())
366    }
367}