//! Load executor module root: icydb_core/db/executor/load/mod.rs

1mod aggregate;
2mod execute;
3mod index_range_limit;
4mod page;
5mod pk_stream;
6mod route;
7mod secondary_index;
8mod trace;
9
10use self::{
11    execute::ExecutionInputs,
12    trace::{access_path_variant, execution_order_direction},
13};
14use crate::{
15    db::{
16        Db,
17        executor::{
18            KeyOrderComparator, OrderedKeyStreamBox,
19            plan::{record_plan_metrics, record_rows_scanned},
20        },
21        query::plan::{
22            AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
23            PlannedCursor, SlotSelectionPolicy, decode_pk_cursor_boundary, derive_scan_direction,
24            validate::validate_executor_plan,
25        },
26        response::Response,
27    },
28    error::InternalError,
29    obs::sink::{ExecKind, Span},
30    traits::{EntityKind, EntityValue},
31};
32use std::marker::PhantomData;
33
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Materialized items for this page of the load response.
    pub(crate) items: Response<E>,

    /// Encoded continuation cursor payload for fetching the next page,
    /// when one is available.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
47
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Point access by a single key.
    ByKey,
    /// Point access by multiple keys.
    ByKeys,
    /// Contiguous key-range scan.
    KeyRange,
    /// Index access by prefix.
    IndexPrefix,
    /// Index access over a range.
    IndexRange,
    /// Unrestricted scan over all keys.
    FullScan,
    /// Union of multiple child access paths.
    Union,
    /// Intersection of multiple child access paths.
    Intersection,
}
64
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path.
    PrimaryKey,
    /// Ordering pushed down to a secondary-index scan.
    SecondaryOrderPushdown,
    /// Limit pushed into an index-range scan (bounded fetch).
    IndexRangeLimitPushdown,
}
77
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Coarse shape of the executed access path.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Order direction the execution ran under.
    pub direction: OrderDirection,
    /// Optimization selected by execution, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Keys scanned during execution (saturates at `u64::MAX`).
    pub keys_scanned: u64,
    /// Rows returned by execution (saturates at `u64::MAX`).
    pub rows_returned: u64,
    /// Whether a cursor boundary or index-range anchor was applied.
    pub continuation_applied: bool,
}
94
95impl ExecutionTrace {
96    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
97        Self {
98            access_path_variant: access_path_variant(access),
99            direction: execution_order_direction(direction),
100            optimization: None,
101            keys_scanned: 0,
102            rows_returned: 0,
103            continuation_applied,
104        }
105    }
106
107    fn set_path_outcome(
108        &mut self,
109        optimization: Option<ExecutionOptimization>,
110        keys_scanned: usize,
111        rows_returned: usize,
112    ) {
113        self.optimization = optimization;
114        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
115        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
116    }
117}
118
/// Derive the key-stream comparator for a logical plan, preferring the
/// direction implied by the plan's order spec and otherwise falling back to
/// the caller-supplied direction.
fn key_stream_comparator_from_plan<K>(
    plan: &LogicalPlan<K>,
    fallback_direction: Direction,
) -> KeyOrderComparator {
    // Direction implied by the plan's explicit order spec, if present.
    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
        derive_scan_direction(order, SlotSelectionPolicy::Last)
    });

    // Comparator and child-stream monotonicity must stay aligned until access-path
    // stream production can emit keys under an order-spec-derived comparator.
    // NOTE(review): both branches currently evaluate to `fallback_direction`
    // (the first arm only fires when the two directions are equal), so
    // `derived_direction` has no effect yet; it is kept for the planned
    // order-spec-derived comparator described above.
    let comparator_direction = if derived_direction == fallback_direction {
        derived_direction
    } else {
        fallback_direction
    };

    KeyOrderComparator::from_direction(comparator_direction)
}
137
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///
struct FastPathKeyResult {
    /// Ordered key stream produced by the fast-path access.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Rows scanned while producing the stream (observability input).
    rows_scanned: usize,
    /// Optimization that this fast path represents.
    optimization: ExecutionOptimization,
}
149
///
/// IndexRangeLimitSpec
///
/// Canonical executor decision payload for index-range limit pushdown.
/// Encodes the bounded fetch size after all eligibility gates pass.
///

struct IndexRangeLimitSpec {
    /// Bounded number of entries to fetch from the index range.
    fetch: usize,
}
160
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Database handle scoped to the entity's canister.
    db: Db<E::Canister>,
    /// When true, execution captures an `ExecutionTrace` snapshot.
    debug: bool,
    /// Binds the executor to entity kind `E` without storing one.
    _marker: PhantomData<E>,
}
174
175impl<E> LoadExecutor<E>
176where
177    E: EntityKind + EntityValue,
178{
179    #[must_use]
180    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
181        Self {
182            db,
183            debug,
184            _marker: PhantomData,
185        }
186    }
187
188    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
189        self.execute_paged_with_cursor(plan, PlannedCursor::none())
190            .map(|page| page.items)
191    }
192
193    pub(in crate::db) fn execute_paged_with_cursor(
194        &self,
195        plan: ExecutablePlan<E>,
196        cursor: impl Into<PlannedCursor>,
197    ) -> Result<CursorPage<E>, InternalError> {
198        self.execute_paged_with_cursor_traced(plan, cursor)
199            .map(|(page, _)| page)
200    }
201
    /// Execute a load plan with an optional continuation cursor.
    ///
    /// Returns the cursor page plus an `ExecutionTrace` snapshot when the
    /// executor was constructed with `debug` enabled (`None` otherwise).
    ///
    /// # Errors
    /// Fails on cursor revalidation, non-load plan modes, executor plan
    /// validation, or any downstream access/materialization error.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the caller-supplied cursor against this plan before using any part of it.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        // Guard: this executor only accepts load-mode plans.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // A continuation is in effect when either cursor component is present.
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));

        // The fallible phases run inside a closure so `execution_trace`
        // (captured mutably and filled in by finalize_execution) can still be
        // returned alongside the page afterwards.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);
            let plan = plan.into_inner();

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                index_prefix_specs: index_prefix_specs.as_slice(),
                index_range_specs: index_range_specs.as_slice(),
                index_range_anchor: index_range_anchor.as_ref(),
                direction,
            };

            record_plan_metrics(&plan.access);
            // Plan fast-path routing decisions once, then execute in canonical order.
            let fast_path_plan = Self::build_fast_path_plan(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                None,
            )?;

            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let mut resolved =
                Self::resolve_execution_key_stream(&execution_inputs, &fast_path_plan)?;
            let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
                &ctx,
                &plan,
                resolved.key_stream.as_mut(),
                cursor_boundary.as_ref(),
                direction,
                continuation_signature,
            )?;
            // Fast paths may report their own scan count; otherwise use the stream's.
            let rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);

            Ok(Self::finalize_execution(
                page,
                resolved.optimization,
                rows_scanned,
                post_access_rows,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }
275
276    // Record shared observability outcome for any execution path.
277    fn finalize_path_outcome(
278        execution_trace: &mut Option<ExecutionTrace>,
279        optimization: Option<ExecutionOptimization>,
280        rows_scanned: usize,
281        rows_returned: usize,
282    ) {
283        record_rows_scanned::<E>(rows_scanned);
284        if let Some(execution_trace) = execution_trace.as_mut() {
285            execution_trace.set_path_outcome(optimization, rows_scanned, rows_returned);
286            debug_assert_eq!(
287                execution_trace.keys_scanned,
288                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
289                "execution trace keys_scanned must match rows_scanned metrics input",
290            );
291        }
292    }
293
294    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
295    fn validate_pk_fast_path_boundary_if_applicable(
296        plan: &LogicalPlan<E::Key>,
297        cursor_boundary: Option<&CursorBoundary>,
298    ) -> Result<(), InternalError> {
299        if !Self::is_pk_order_stream_eligible(plan) {
300            return Ok(());
301        }
302        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
303
304        Ok(())
305    }
306}