//! icydb_core/db/executor/load — load executor module root.

1mod aggregate;
2mod execute;
3mod index_range_limit;
4mod page;
5mod pk_stream;
6mod route;
7mod secondary_index;
8mod trace;
9
10use self::{
11    execute::ExecutionInputs,
12    trace::{access_path_variant, execution_order_direction},
13};
14use crate::{
15    db::{
16        Db,
17        executor::{
18            KeyOrderComparator, OrderedKeyStreamBox,
19            plan::{record_plan_metrics, record_rows_scanned},
20        },
21        query::plan::{
22            AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
23            PlannedCursor, SlotSelectionPolicy, decode_pk_cursor_boundary, derive_scan_direction,
24            validate::validate_executor_plan,
25        },
26        response::Response,
27    },
28    error::InternalError,
29    obs::sink::{ExecKind, Span},
30    traits::{EntityKind, EntityValue},
31};
32use std::marker::PhantomData;
33
34///
35/// CursorPage
36///
37/// Internal load page result with continuation cursor payload.
38/// Returned by paged executor entrypoints.
39///
40
41#[derive(Debug)]
42pub(crate) struct CursorPage<E: EntityKind> {
43    pub(crate) items: Response<E>,
44
45    pub(crate) next_cursor: Option<Vec<u8>>,
46}
47
48///
49/// ExecutionAccessPathVariant
50///
51/// Coarse access path shape used by the load execution trace surface.
52///
53#[derive(Clone, Copy, Debug, Eq, PartialEq)]
54pub enum ExecutionAccessPathVariant {
55    ByKey,
56    ByKeys,
57    KeyRange,
58    IndexPrefix,
59    IndexRange,
60    FullScan,
61    Union,
62    Intersection,
63}
64
65///
66/// ExecutionOptimization
67///
68/// Canonical load optimization selected by execution, if any.
69///
70
71#[derive(Clone, Copy, Debug, Eq, PartialEq)]
72pub enum ExecutionOptimization {
73    PrimaryKey,
74    SecondaryOrderPushdown,
75    IndexRangeLimitPushdown,
76}
77
78///
79/// ExecutionTrace
80///
81/// Structured, opt-in load execution introspection snapshot.
82/// Captures plan-shape and execution decisions without changing semantics.
83///
84
85#[derive(Clone, Copy, Debug, Eq, PartialEq)]
86pub struct ExecutionTrace {
87    pub access_path_variant: ExecutionAccessPathVariant,
88    pub direction: OrderDirection,
89    pub optimization: Option<ExecutionOptimization>,
90    pub keys_scanned: u64,
91    pub rows_returned: u64,
92    pub continuation_applied: bool,
93}
94
95impl ExecutionTrace {
96    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
97        Self {
98            access_path_variant: access_path_variant(access),
99            direction: execution_order_direction(direction),
100            optimization: None,
101            keys_scanned: 0,
102            rows_returned: 0,
103            continuation_applied,
104        }
105    }
106
107    fn set_path_outcome(
108        &mut self,
109        optimization: Option<ExecutionOptimization>,
110        keys_scanned: usize,
111        rows_returned: usize,
112    ) {
113        self.optimization = optimization;
114        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
115        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
116    }
117}
118
119fn key_stream_comparator_from_plan<K>(
120    plan: &LogicalPlan<K>,
121    fallback_direction: Direction,
122) -> KeyOrderComparator {
123    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
124        derive_scan_direction(order, SlotSelectionPolicy::Last)
125    });
126
127    // Comparator and child-stream monotonicity must stay aligned until access-path
128    // stream production can emit keys under an order-spec-derived comparator.
129    let comparator_direction = if derived_direction == fallback_direction {
130        derived_direction
131    } else {
132        fallback_direction
133    };
134
135    KeyOrderComparator::from_direction(comparator_direction)
136}
137
138///
139/// FastPathKeyResult
140///
141/// Internal fast-path access result.
142/// Carries ordered keys plus observability metadata for shared execution phases.
143///
144struct FastPathKeyResult {
145    ordered_key_stream: OrderedKeyStreamBox,
146    rows_scanned: usize,
147    optimization: ExecutionOptimization,
148}
149
150///
151/// IndexRangeLimitSpec
152///
153/// Canonical executor decision payload for index-range limit pushdown.
154/// Encodes the bounded fetch size after all eligibility gates pass.
155///
156
157struct IndexRangeLimitSpec {
158    fetch: usize,
159}
160
161///
162/// LoadExecutor
163///
164/// Load-plan executor with canonical post-access semantics.
165/// Coordinates fast paths, trace hooks, and pagination cursors.
166///
167
168#[derive(Clone)]
169pub(crate) struct LoadExecutor<E: EntityKind> {
170    db: Db<E::Canister>,
171    debug: bool,
172    _marker: PhantomData<E>,
173}
174
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    /// Construct an executor over the given database handle.
    /// `debug` opts in to `ExecutionTrace` capture on traced entrypoints.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    /// Execute a load plan without pagination: runs the paged path with an
    /// empty cursor and discards the continuation payload.
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }

    /// Execute a load plan with an optional continuation cursor, returning the
    /// page plus continuation payload. Trace output is discarded here; use
    /// [`Self::execute_paged_with_cursor_traced`] to observe it.
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    /// Canonical paged execution entrypoint.
    ///
    /// Revalidates the incoming cursor against the plan, rejects non-load
    /// plans, then resolves one key stream (fast path or general) and runs
    /// shared page materialization and finalization. Returns the page plus an
    /// `ExecutionTrace` when the executor was built with `debug = true`; on
    /// error the partially-built trace is dropped with the `Err`.
    ///
    /// # Errors
    /// Returns `InternalError` for cursor revalidation failures, non-load
    /// plan modes, plan validation failures, or any execution-phase failure.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Revalidate the caller-supplied cursor against this plan before
        // anything else; both continuation components are cloned out so the
        // plan can later be consumed via `into_inner`.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        // A continuation is "applied" if either cursor component is present.
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        // Trace capture is opt-in via the executor's debug flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));

        // Immediately-invoked closure so `?` can short-circuit while
        // `execution_trace` stays owned by this frame for the final pairing.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);
            let plan = plan.into_inner();

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                index_range_anchor: index_range_anchor.as_ref(),
                direction,
            };

            record_plan_metrics(&plan.access);
            // Plan fast-path routing decisions once, then execute in canonical order.
            // (`build_fast_path_plan` is defined elsewhere in this module.)
            let fast_path_plan = Self::build_fast_path_plan(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                None,
            )?;

            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let mut resolved =
                Self::resolve_execution_key_stream(&execution_inputs, &fast_path_plan)?;
            let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
                &ctx,
                &plan,
                resolved.key_stream.as_mut(),
                cursor_boundary.as_ref(),
                direction,
                continuation_signature,
            )?;
            // A fast path may report its own rows-scanned figure; otherwise
            // the materialization's key count stands in for it.
            let rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);

            Ok(Self::finalize_execution(
                page,
                resolved.optimization,
                rows_scanned,
                post_access_rows,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }

    // Record shared observability outcome for any execution path: emits the
    // rows-scanned metric unconditionally, and fills the trace when present.
    fn finalize_path_outcome(
        execution_trace: &mut Option<ExecutionTrace>,
        optimization: Option<ExecutionOptimization>,
        rows_scanned: usize,
        rows_returned: usize,
    ) {
        record_rows_scanned::<E>(rows_scanned);
        if let Some(execution_trace) = execution_trace.as_mut() {
            execution_trace.set_path_outcome(optimization, rows_scanned, rows_returned);
            // Guard: the trace's saturating conversion must agree with the
            // value just fed to the metrics sink.
            debug_assert_eq!(
                execution_trace.keys_scanned,
                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
                "execution trace keys_scanned must match rows_scanned metrics input",
            );
        }
    }

    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
    // Decodes (and discards) the boundary only when the plan is eligible for the
    // PK-order stream, so decode errors surface with fast-path classification.
    fn validate_pk_fast_path_boundary_if_applicable(
        plan: &LogicalPlan<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        if !Self::is_pk_order_stream_eligible(plan) {
            return Ok(());
        }
        // Result intentionally discarded: only the error path matters here.
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }
}