Skip to main content

icydb_core/db/executor/load/
mod.rs

1mod execute;
2mod fast_stream;
3mod index_range_limit;
4mod page;
5mod pk_stream;
6mod secondary_index;
7mod terminal;
8mod trace;
9
10pub(in crate::db::executor) use self::execute::{
11    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
12};
13
14use self::trace::{access_path_variant, execution_order_direction};
15use crate::{
16    db::{
17        Db,
18        access::AccessPlan,
19        cursor::{ContinuationToken, CursorBoundary, PlannedCursor, decode_pk_cursor_boundary},
20        direction::Direction,
21        executor::{
22            AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
23            IndexPredicateCompileMode, KeyOrderComparator, OrderedKeyStreamBox,
24            aggregate::field::{
25                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
26                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
27            },
28            plan_metrics::{record_plan_metrics, record_rows_scanned},
29            range_token_anchor_key, range_token_from_cursor_anchor, validate_executor_plan,
30        },
31        policy,
32        query::plan::{AccessPlannedQuery, OrderDirection},
33        response::Response,
34    },
35    error::InternalError,
36    obs::sink::{ExecKind, Span},
37    traits::{EntityKind, EntityValue},
38};
39use std::marker::PhantomData;
40
41///
42/// CursorPage
43///
44/// Internal load page result with continuation cursor payload.
45/// Returned by paged executor entrypoints.
46///
47
48#[derive(Debug)]
49pub(crate) struct CursorPage<E: EntityKind> {
50    pub(crate) items: Response<E>,
51
52    pub(crate) next_cursor: Option<ContinuationToken>,
53}
54
55///
56/// ExecutionAccessPathVariant
57///
58/// Coarse access path shape used by the load execution trace surface.
59///
60
61#[derive(Clone, Copy, Debug, Eq, PartialEq)]
62pub enum ExecutionAccessPathVariant {
63    ByKey,
64    ByKeys,
65    KeyRange,
66    IndexPrefix,
67    IndexRange,
68    FullScan,
69    Union,
70    Intersection,
71}
72
73///
74/// ExecutionOptimization
75///
76/// Canonical load optimization selected by execution, if any.
77///
78
79#[derive(Clone, Copy, Debug, Eq, PartialEq)]
80pub enum ExecutionOptimization {
81    PrimaryKey,
82    SecondaryOrderPushdown,
83    IndexRangeLimitPushdown,
84}
85
86///
87/// ExecutionTrace
88///
89/// Structured, opt-in load execution introspection snapshot.
90/// Captures plan-shape and execution decisions without changing semantics.
91///
92
93#[derive(Clone, Copy, Debug, Eq, PartialEq)]
94pub struct ExecutionTrace {
95    pub access_path_variant: ExecutionAccessPathVariant,
96    pub direction: OrderDirection,
97    pub optimization: Option<ExecutionOptimization>,
98    pub keys_scanned: u64,
99    pub rows_returned: u64,
100    pub continuation_applied: bool,
101    pub index_predicate_applied: bool,
102    pub index_predicate_keys_rejected: u64,
103    pub distinct_keys_deduped: u64,
104}
105
106impl ExecutionTrace {
107    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
108        Self {
109            access_path_variant: access_path_variant(access),
110            direction: execution_order_direction(direction),
111            optimization: None,
112            keys_scanned: 0,
113            rows_returned: 0,
114            continuation_applied,
115            index_predicate_applied: false,
116            index_predicate_keys_rejected: 0,
117            distinct_keys_deduped: 0,
118        }
119    }
120
121    fn set_path_outcome(
122        &mut self,
123        optimization: Option<ExecutionOptimization>,
124        keys_scanned: usize,
125        rows_returned: usize,
126        index_predicate_applied: bool,
127        index_predicate_keys_rejected: u64,
128        distinct_keys_deduped: u64,
129    ) {
130        self.optimization = optimization;
131        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
132        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
133        self.index_predicate_applied = index_predicate_applied;
134        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
135        self.distinct_keys_deduped = distinct_keys_deduped;
136    }
137}
138
139pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
140    direction: Direction,
141) -> KeyOrderComparator {
142    KeyOrderComparator::from_direction(direction)
143}
144
145///
146/// FastPathKeyResult
147///
148/// Internal fast-path access result.
149/// Carries ordered keys plus observability metadata for shared execution phases.
150///
151
152pub(in crate::db::executor) struct FastPathKeyResult {
153    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
154    pub(in crate::db::executor) rows_scanned: usize,
155    pub(in crate::db::executor) optimization: ExecutionOptimization,
156}
157
158///
159/// LoadExecutor
160///
161/// Load-plan executor with canonical post-access semantics.
162/// Coordinates fast paths, trace hooks, and pagination cursors.
163///
164
165#[derive(Clone)]
166pub(crate) struct LoadExecutor<E: EntityKind> {
167    db: Db<E::Canister>,
168    debug: bool,
169    _marker: PhantomData<E>,
170}
171
172impl<E> LoadExecutor<E>
173where
174    E: EntityKind + EntityValue,
175{
176    #[must_use]
177    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
178        Self {
179            db,
180            debug,
181            _marker: PhantomData,
182        }
183    }
184
185    // Recover canonical read context for kernel-owned execution setup.
186    pub(in crate::db::executor) fn recovered_context(
187        &self,
188    ) -> Result<crate::db::Context<'_, E>, InternalError> {
189        self.db.recovered_context::<E>()
190    }
191
192    // Resolve one orderable aggregate target field into a stable slot with
193    // canonical field-error taxonomy mapping.
194    pub(in crate::db::executor) fn resolve_orderable_field_slot(
195        target_field: &str,
196    ) -> Result<FieldSlot, InternalError> {
197        resolve_orderable_aggregate_target_slot::<E>(target_field)
198            .map_err(AggregateFieldValueError::into_internal_error)
199    }
200
201    // Resolve one aggregate target field into a stable slot with canonical
202    // field-error taxonomy mapping.
203    pub(in crate::db::executor) fn resolve_any_field_slot(
204        target_field: &str,
205    ) -> Result<FieldSlot, InternalError> {
206        resolve_any_aggregate_target_slot::<E>(target_field)
207            .map_err(AggregateFieldValueError::into_internal_error)
208    }
209
210    // Resolve one numeric aggregate target field into a stable slot with
211    // canonical field-error taxonomy mapping.
212    pub(in crate::db::executor) fn resolve_numeric_field_slot(
213        target_field: &str,
214    ) -> Result<FieldSlot, InternalError> {
215        resolve_numeric_aggregate_target_slot::<E>(target_field)
216            .map_err(AggregateFieldValueError::into_internal_error)
217    }
218
219    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
220        self.execute_paged_with_cursor(plan, PlannedCursor::none())
221            .map(|page| page.items)
222    }
223
224    pub(in crate::db) fn execute_paged_with_cursor(
225        &self,
226        plan: ExecutablePlan<E>,
227        cursor: impl Into<PlannedCursor>,
228    ) -> Result<CursorPage<E>, InternalError> {
229        self.execute_paged_with_cursor_traced(plan, cursor)
230            .map(|(page, _)| page)
231    }
232
233    pub(in crate::db) fn execute_paged_with_cursor_traced(
234        &self,
235        plan: ExecutablePlan<E>,
236        cursor: impl Into<PlannedCursor>,
237    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
238        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
239        let cursor_boundary = cursor.boundary().cloned();
240        let index_range_token = cursor
241            .index_range_anchor()
242            .map(range_token_from_cursor_anchor);
243
244        if !plan.mode().is_load() {
245            return Err(InternalError::query_executor_invariant(
246                "load executor requires load plans",
247            ));
248        }
249        debug_assert!(
250            policy::validate_plan_shape(plan.as_inner()).is_ok(),
251            "load executor received a plan shape that bypassed planning validation",
252        );
253
254        let continuation_signature = plan.continuation_signature();
255        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
256        let index_range_specs = plan.index_range_specs()?.to_vec();
257        let route_plan = Self::build_execution_route_plan_for_load(
258            plan.as_inner(),
259            cursor_boundary.as_ref(),
260            index_range_token.as_ref(),
261            None,
262        )?;
263        let continuation_applied = !matches!(
264            route_plan.continuation_mode(),
265            crate::db::executor::route::ContinuationMode::Initial
266        );
267        let direction = route_plan.direction();
268        debug_assert_eq!(
269            route_plan.window().effective_offset,
270            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
271            "route window effective offset must match logical plan offset semantics",
272        );
273        let mut execution_trace = self
274            .debug
275            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
276        let plan = plan.into_inner();
277        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
278
279        let result = (|| {
280            let mut span = Span::<E>::new(ExecKind::Load);
281
282            validate_executor_plan::<E>(&plan)?;
283            let ctx = self.db.recovered_context::<E>()?;
284            let execution_inputs = ExecutionInputs {
285                ctx: &ctx,
286                plan: &plan,
287                stream_bindings: AccessStreamBindings {
288                    index_prefix_specs: index_prefix_specs.as_slice(),
289                    index_range_specs: index_range_specs.as_slice(),
290                    index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
291                    direction,
292                },
293                execution_preparation: &execution_preparation,
294            };
295
296            record_plan_metrics(&plan.access);
297            // Plan execution routing once, then execute in canonical order.
298            // Resolve one canonical key stream, then run shared page materialization/finalization.
299            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
300                &execution_inputs,
301                &route_plan,
302                cursor_boundary.as_ref(),
303                continuation_signature,
304                IndexPredicateCompileMode::ConservativeSubset,
305            )?;
306            let page = materialized.page;
307            let rows_scanned = materialized.rows_scanned;
308            let post_access_rows = materialized.post_access_rows;
309            let optimization = materialized.optimization;
310            let index_predicate_applied = materialized.index_predicate_applied;
311            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
312            let distinct_keys_deduped = materialized.distinct_keys_deduped;
313
314            Ok(Self::finalize_execution(
315                page,
316                optimization,
317                rows_scanned,
318                post_access_rows,
319                index_predicate_applied,
320                index_predicate_keys_rejected,
321                distinct_keys_deduped,
322                &mut span,
323                &mut execution_trace,
324            ))
325        })();
326
327        result.map(|page| (page, execution_trace))
328    }
329
330    // Record shared observability outcome for any execution path.
331    fn finalize_path_outcome(
332        execution_trace: &mut Option<ExecutionTrace>,
333        optimization: Option<ExecutionOptimization>,
334        rows_scanned: usize,
335        rows_returned: usize,
336        index_predicate_applied: bool,
337        index_predicate_keys_rejected: u64,
338        distinct_keys_deduped: u64,
339    ) {
340        record_rows_scanned::<E>(rows_scanned);
341        if let Some(execution_trace) = execution_trace.as_mut() {
342            execution_trace.set_path_outcome(
343                optimization,
344                rows_scanned,
345                rows_returned,
346                index_predicate_applied,
347                index_predicate_keys_rejected,
348                distinct_keys_deduped,
349            );
350            debug_assert_eq!(
351                execution_trace.keys_scanned,
352                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
353                "execution trace keys_scanned must match rows_scanned metrics input",
354            );
355        }
356    }
357
358    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
359    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
360        plan: &AccessPlannedQuery<E::Key>,
361        cursor_boundary: Option<&CursorBoundary>,
362    ) -> Result<(), InternalError> {
363        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
364            return Ok(());
365        }
366        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
367
368        Ok(())
369    }
370}