Skip to main content

icydb_core/db/executor/load/
mod.rs

1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11pub(in crate::db::executor) use self::execute::{
12    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
13};
14
15use self::trace::{access_path_variant, execution_order_direction};
16use crate::{
17    db::{
18        Db,
19        access::AccessPlan,
20        cursor::{ContinuationToken, CursorBoundary, PlannedCursor, decode_pk_cursor_boundary},
21        direction::Direction,
22        executor::{
23            AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
24            IndexPredicateCompileMode, KeyOrderComparator, OrderedKeyStreamBox,
25            aggregate_model::field::{
26                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
27                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
28            },
29            plan_metrics::{record_plan_metrics, record_rows_scanned},
30            range_token_anchor_key, range_token_from_cursor_anchor, validate_executor_plan,
31        },
32        policy,
33        query::plan::{AccessPlannedQuery, OrderDirection},
34        response::Response,
35    },
36    error::InternalError,
37    obs::sink::{ExecKind, Span},
38    traits::{EntityKind, EntityValue},
39};
40use std::marker::PhantomData;
41
42///
43/// CursorPage
44///
45/// Internal load page result with continuation cursor payload.
46/// Returned by paged executor entrypoints.
47///
48
49#[derive(Debug)]
50pub(crate) struct CursorPage<E: EntityKind> {
51    pub(crate) items: Response<E>,
52
53    pub(crate) next_cursor: Option<ContinuationToken>,
54}
55
56///
57/// ExecutionAccessPathVariant
58///
59/// Coarse access path shape used by the load execution trace surface.
60///
61
62#[derive(Clone, Copy, Debug, Eq, PartialEq)]
63pub enum ExecutionAccessPathVariant {
64    ByKey,
65    ByKeys,
66    KeyRange,
67    IndexPrefix,
68    IndexRange,
69    FullScan,
70    Union,
71    Intersection,
72}
73
74///
75/// ExecutionOptimization
76///
77/// Canonical load optimization selected by execution, if any.
78///
79
80#[derive(Clone, Copy, Debug, Eq, PartialEq)]
81pub enum ExecutionOptimization {
82    PrimaryKey,
83    SecondaryOrderPushdown,
84    IndexRangeLimitPushdown,
85}
86
87///
88/// ExecutionTrace
89///
90/// Structured, opt-in load execution introspection snapshot.
91/// Captures plan-shape and execution decisions without changing semantics.
92///
93
94#[derive(Clone, Copy, Debug, Eq, PartialEq)]
95pub struct ExecutionTrace {
96    pub access_path_variant: ExecutionAccessPathVariant,
97    pub direction: OrderDirection,
98    pub optimization: Option<ExecutionOptimization>,
99    pub keys_scanned: u64,
100    pub rows_returned: u64,
101    pub continuation_applied: bool,
102    pub index_predicate_applied: bool,
103    pub index_predicate_keys_rejected: u64,
104    pub distinct_keys_deduped: u64,
105}
106
107impl ExecutionTrace {
108    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
109        Self {
110            access_path_variant: access_path_variant(access),
111            direction: execution_order_direction(direction),
112            optimization: None,
113            keys_scanned: 0,
114            rows_returned: 0,
115            continuation_applied,
116            index_predicate_applied: false,
117            index_predicate_keys_rejected: 0,
118            distinct_keys_deduped: 0,
119        }
120    }
121
122    fn set_path_outcome(
123        &mut self,
124        optimization: Option<ExecutionOptimization>,
125        keys_scanned: usize,
126        rows_returned: usize,
127        index_predicate_applied: bool,
128        index_predicate_keys_rejected: u64,
129        distinct_keys_deduped: u64,
130    ) {
131        self.optimization = optimization;
132        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
133        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
134        self.index_predicate_applied = index_predicate_applied;
135        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
136        self.distinct_keys_deduped = distinct_keys_deduped;
137    }
138}
139
// Map an execution `Direction` to the canonical key-stream comparator.
// Thin const delegate to `KeyOrderComparator::from_direction`.
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
145
146///
147/// FastPathKeyResult
148///
149/// Internal fast-path access result.
150/// Carries ordered keys plus observability metadata for shared execution phases.
151///
152
153pub(in crate::db::executor) struct FastPathKeyResult {
154    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
155    pub(in crate::db::executor) rows_scanned: usize,
156    pub(in crate::db::executor) optimization: ExecutionOptimization,
157}
158
159///
160/// LoadExecutor
161///
162/// Load-plan executor with canonical post-access semantics.
163/// Coordinates fast paths, trace hooks, and pagination cursors.
164///
165
166#[derive(Clone)]
167pub(crate) struct LoadExecutor<E: EntityKind> {
168    db: Db<E::Canister>,
169    debug: bool,
170    _marker: PhantomData<E>,
171}
172
173impl<E> LoadExecutor<E>
174where
175    E: EntityKind + EntityValue,
176{
177    #[must_use]
178    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
179        Self {
180            db,
181            debug,
182            _marker: PhantomData,
183        }
184    }
185
    // Recover canonical read context for kernel-owned execution setup.
    // Thin delegate to `Db::recovered_context`; propagates the same
    // `InternalError` the executor pipeline already surfaces.
    pub(in crate::db::executor) fn recovered_context(
        &self,
    ) -> Result<crate::db::Context<'_, E>, InternalError> {
        self.db.recovered_context::<E>()
    }
192
    // Resolve one orderable aggregate target field into a stable slot with
    // canonical field-error taxonomy mapping.
    // Resolution errors are normalized via
    // `AggregateFieldValueError::into_internal_error`.
    fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
        resolve_orderable_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }
199
    // Resolve one aggregate target field into a stable slot with canonical
    // field-error taxonomy mapping.
    // Resolution errors are normalized via
    // `AggregateFieldValueError::into_internal_error`.
    fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
        resolve_any_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }
206
    // Resolve one numeric aggregate target field into a stable slot with
    // canonical field-error taxonomy mapping.
    // Resolution errors are normalized via
    // `AggregateFieldValueError::into_internal_error`.
    fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
        resolve_numeric_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }
213
214    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
215        self.execute_paged_with_cursor(plan, PlannedCursor::none())
216            .map(|page| page.items)
217    }
218
219    pub(in crate::db) fn execute_paged_with_cursor(
220        &self,
221        plan: ExecutablePlan<E>,
222        cursor: impl Into<PlannedCursor>,
223    ) -> Result<CursorPage<E>, InternalError> {
224        self.execute_paged_with_cursor_traced(plan, cursor)
225            .map(|(page, _)| page)
226    }
227
    // Execute a load plan and return one page plus an optional ExecutionTrace
    // snapshot; the trace is captured only when this executor was constructed
    // with `debug = true`.
    //
    // Errors: non-load plans are rejected as an executor invariant; cursor
    // revalidation, spec extraction, route planning, executor plan validation,
    // context recovery, and kernel materialization all propagate errors.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the caller-supplied cursor against this plan before use.
        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_token = cursor
            .index_range_anchor()
            .map(range_token_from_cursor_anchor);

        // Hard invariant: this executor only accepts load-mode plans.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }
        // Planning should already have validated the shape; re-check in debug builds.
        debug_assert!(
            policy::validate_plan_shape(plan.as_inner()).is_ok(),
            "load executor received a plan shape that bypassed planning validation",
        );

        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // Route planning fixes continuation mode, direction, and the paging window.
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_token.as_ref(),
            None,
        )?;
        // Any mode other than `Initial` means execution resumed from a continuation.
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        // Trace capture is opt-in via the executor's `debug` flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

        // Immediately-invoked closure keeps the fallible pipeline in one scope
        // so `execution_trace` can still be paired with the result below.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
                    direction,
                },
                execution_preparation: &execution_preparation,
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
                &execution_inputs,
                &route_plan,
                cursor_boundary.as_ref(),
                continuation_signature,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let page = materialized.page;
            let rows_scanned = materialized.rows_scanned;
            let post_access_rows = materialized.post_access_rows;
            let optimization = materialized.optimization;
            let index_predicate_applied = materialized.index_predicate_applied;
            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
            let distinct_keys_deduped = materialized.distinct_keys_deduped;

            // Hand off to shared finalization; span and trace are updated via &mut.
            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        // NOTE(review): on Err the captured trace is dropped — only successful
        // executions return a trace snapshot.
        result.map(|page| (page, execution_trace))
    }
324
325    // Record shared observability outcome for any execution path.
326    fn finalize_path_outcome(
327        execution_trace: &mut Option<ExecutionTrace>,
328        optimization: Option<ExecutionOptimization>,
329        rows_scanned: usize,
330        rows_returned: usize,
331        index_predicate_applied: bool,
332        index_predicate_keys_rejected: u64,
333        distinct_keys_deduped: u64,
334    ) {
335        record_rows_scanned::<E>(rows_scanned);
336        if let Some(execution_trace) = execution_trace.as_mut() {
337            execution_trace.set_path_outcome(
338                optimization,
339                rows_scanned,
340                rows_returned,
341                index_predicate_applied,
342                index_predicate_keys_rejected,
343                distinct_keys_deduped,
344            );
345            debug_assert_eq!(
346                execution_trace.keys_scanned,
347                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
348                "execution trace keys_scanned must match rows_scanned metrics input",
349            );
350        }
351    }
352
353    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
354    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
355        plan: &AccessPlannedQuery<E::Key>,
356        cursor_boundary: Option<&CursorBoundary>,
357    ) -> Result<(), InternalError> {
358        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
359            return Ok(());
360        }
361        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
362
363        Ok(())
364    }
365}