Skip to main content

icydb_core/db/executor/load/
mod.rs

1mod aggregate;
2mod aggregate_guard;
3mod execute;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod route;
8mod secondary_index;
9mod trace;
10
11use self::{
12    execute::ExecutionInputs,
13    trace::{access_path_variant, execution_order_direction},
14};
15use crate::{
16    db::{
17        Db,
18        executor::{
19            AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
20            plan::{record_plan_metrics, record_rows_scanned},
21        },
22        query::plan::{
23            AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
24            PlannedCursor, SlotSelectionPolicy, decode_pk_cursor_boundary, derive_scan_direction,
25            validate::validate_executor_plan,
26        },
27        response::Response,
28    },
29    error::InternalError,
30    obs::sink::{ExecKind, Span},
31    traits::{EntityKind, EntityValue},
32};
33use std::marker::PhantomData;
34
35///
36/// CursorPage
37///
38/// Internal load page result with continuation cursor payload.
39/// Returned by paged executor entrypoints.
40///
41
42#[derive(Debug)]
43pub(crate) struct CursorPage<E: EntityKind> {
44    pub(crate) items: Response<E>,
45
46    pub(crate) next_cursor: Option<Vec<u8>>,
47}
48
49///
50/// ExecutionAccessPathVariant
51///
52/// Coarse access path shape used by the load execution trace surface.
53///
54#[derive(Clone, Copy, Debug, Eq, PartialEq)]
55pub enum ExecutionAccessPathVariant {
56    ByKey,
57    ByKeys,
58    KeyRange,
59    IndexPrefix,
60    IndexRange,
61    FullScan,
62    Union,
63    Intersection,
64}
65
66///
67/// ExecutionOptimization
68///
69/// Canonical load optimization selected by execution, if any.
70///
71
72#[derive(Clone, Copy, Debug, Eq, PartialEq)]
73pub enum ExecutionOptimization {
74    PrimaryKey,
75    SecondaryOrderPushdown,
76    IndexRangeLimitPushdown,
77}
78
79///
80/// ExecutionTrace
81///
82/// Structured, opt-in load execution introspection snapshot.
83/// Captures plan-shape and execution decisions without changing semantics.
84///
85
86#[derive(Clone, Copy, Debug, Eq, PartialEq)]
87pub struct ExecutionTrace {
88    pub access_path_variant: ExecutionAccessPathVariant,
89    pub direction: OrderDirection,
90    pub optimization: Option<ExecutionOptimization>,
91    pub keys_scanned: u64,
92    pub rows_returned: u64,
93    pub continuation_applied: bool,
94}
95
96impl ExecutionTrace {
97    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
98        Self {
99            access_path_variant: access_path_variant(access),
100            direction: execution_order_direction(direction),
101            optimization: None,
102            keys_scanned: 0,
103            rows_returned: 0,
104            continuation_applied,
105        }
106    }
107
108    fn set_path_outcome(
109        &mut self,
110        optimization: Option<ExecutionOptimization>,
111        keys_scanned: usize,
112        rows_returned: usize,
113    ) {
114        self.optimization = optimization;
115        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
116        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
117    }
118}
119
120fn key_stream_comparator_from_plan<K>(
121    plan: &LogicalPlan<K>,
122    fallback_direction: Direction,
123) -> KeyOrderComparator {
124    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
125        derive_scan_direction(order, SlotSelectionPolicy::Last)
126    });
127
128    // Comparator and child-stream monotonicity must stay aligned until access-path
129    // stream production can emit keys under an order-spec-derived comparator.
130    let comparator_direction = if derived_direction == fallback_direction {
131        derived_direction
132    } else {
133        fallback_direction
134    };
135
136    KeyOrderComparator::from_direction(comparator_direction)
137}
138
139///
140/// FastPathKeyResult
141///
142/// Internal fast-path access result.
143/// Carries ordered keys plus observability metadata for shared execution phases.
144///
145struct FastPathKeyResult {
146    ordered_key_stream: OrderedKeyStreamBox,
147    rows_scanned: usize,
148    optimization: ExecutionOptimization,
149}
150
151///
152/// IndexRangeLimitSpec
153///
154/// Canonical executor decision payload for index-range limit pushdown.
155/// Encodes the bounded fetch size after all eligibility gates pass.
156///
157
158#[derive(Clone, Copy, Debug, Eq, PartialEq)]
159struct IndexRangeLimitSpec {
160    fetch: usize,
161}
162
163///
164/// LoadExecutor
165///
166/// Load-plan executor with canonical post-access semantics.
167/// Coordinates fast paths, trace hooks, and pagination cursors.
168///
169
170#[derive(Clone)]
171pub(crate) struct LoadExecutor<E: EntityKind> {
172    db: Db<E::Canister>,
173    debug: bool,
174    _marker: PhantomData<E>,
175}
176
177impl<E> LoadExecutor<E>
178where
179    E: EntityKind + EntityValue,
180{
181    #[must_use]
182    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
183        Self {
184            db,
185            debug,
186            _marker: PhantomData,
187        }
188    }
189
190    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
191        self.execute_paged_with_cursor(plan, PlannedCursor::none())
192            .map(|page| page.items)
193    }
194
195    pub(in crate::db) fn execute_paged_with_cursor(
196        &self,
197        plan: ExecutablePlan<E>,
198        cursor: impl Into<PlannedCursor>,
199    ) -> Result<CursorPage<E>, InternalError> {
200        self.execute_paged_with_cursor_traced(plan, cursor)
201            .map(|(page, _)| page)
202    }
203
204    pub(in crate::db) fn execute_paged_with_cursor_traced(
205        &self,
206        plan: ExecutablePlan<E>,
207        cursor: impl Into<PlannedCursor>,
208    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
209        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
210        let cursor_boundary = cursor.boundary().cloned();
211        let index_range_anchor = cursor.index_range_anchor().cloned();
212
213        if !plan.mode().is_load() {
214            return Err(InternalError::query_executor_invariant(
215                "load executor requires load plans",
216            ));
217        }
218
219        let direction = plan.direction();
220        let continuation_signature = plan.continuation_signature();
221        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
222        let index_range_specs = plan.index_range_specs()?.to_vec();
223        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
224        let mut execution_trace = self
225            .debug
226            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
227
228        let result = (|| {
229            let mut span = Span::<E>::new(ExecKind::Load);
230            let plan = plan.into_inner();
231
232            validate_executor_plan::<E>(&plan)?;
233            let ctx = self.db.recovered_context::<E>()?;
234            let execution_inputs = ExecutionInputs {
235                ctx: &ctx,
236                plan: &plan,
237                stream_bindings: AccessStreamBindings {
238                    index_prefix_specs: index_prefix_specs.as_slice(),
239                    index_range_specs: index_range_specs.as_slice(),
240                    index_range_anchor: index_range_anchor.as_ref(),
241                    direction,
242                },
243            };
244
245            record_plan_metrics(&plan.access);
246            // Plan execution routing once, then execute in canonical order.
247            let route_plan = Self::build_execution_route_plan_for_load(
248                &plan,
249                cursor_boundary.as_ref(),
250                index_range_anchor.as_ref(),
251                None,
252                direction,
253            )?;
254
255            // Resolve one canonical key stream, then run shared page materialization/finalization.
256            let mut resolved = Self::resolve_execution_key_stream(&execution_inputs, &route_plan)?;
257            let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
258                &ctx,
259                &plan,
260                resolved.key_stream.as_mut(),
261                route_plan.scan_hints.load_scan_budget_hint,
262                cursor_boundary.as_ref(),
263                direction,
264                continuation_signature,
265            )?;
266            let rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
267
268            Ok(Self::finalize_execution(
269                page,
270                resolved.optimization,
271                rows_scanned,
272                post_access_rows,
273                &mut span,
274                &mut execution_trace,
275            ))
276        })();
277
278        result.map(|page| (page, execution_trace))
279    }
280
281    // Record shared observability outcome for any execution path.
282    fn finalize_path_outcome(
283        execution_trace: &mut Option<ExecutionTrace>,
284        optimization: Option<ExecutionOptimization>,
285        rows_scanned: usize,
286        rows_returned: usize,
287    ) {
288        record_rows_scanned::<E>(rows_scanned);
289        if let Some(execution_trace) = execution_trace.as_mut() {
290            execution_trace.set_path_outcome(optimization, rows_scanned, rows_returned);
291            debug_assert_eq!(
292                execution_trace.keys_scanned,
293                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
294                "execution trace keys_scanned must match rows_scanned metrics input",
295            );
296        }
297    }
298
299    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
300    fn validate_pk_fast_path_boundary_if_applicable(
301        plan: &LogicalPlan<E::Key>,
302        cursor_boundary: Option<&CursorBoundary>,
303    ) -> Result<(), InternalError> {
304        if !Self::is_pk_order_stream_eligible(plan) {
305            return Ok(());
306        }
307        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
308
309        Ok(())
310    }
311}