Skip to main content

icydb_core/db/executor/load/
mod.rs

1mod aggregate;
2mod aggregate_guard;
3mod execute;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod route;
8mod secondary_index;
9mod trace;
10
11use self::{
12    execute::ExecutionInputs,
13    trace::{access_path_variant, execution_order_direction},
14};
15use crate::{
16    db::{
17        Db,
18        executor::{
19            AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
20            plan::{record_plan_metrics, record_rows_scanned},
21        },
22        query::plan::{
23            AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
24            PlannedCursor, SlotSelectionPolicy, decode_pk_cursor_boundary, derive_scan_direction,
25            validate::validate_executor_plan,
26        },
27        response::Response,
28    },
29    error::InternalError,
30    obs::sink::{ExecKind, Span},
31    traits::{EntityKind, EntityValue},
32};
33use std::marker::PhantomData;
34
35///
36/// CursorPage
37///
38/// Internal load page result with continuation cursor payload.
39/// Returned by paged executor entrypoints.
40///
41
42#[derive(Debug)]
43pub(crate) struct CursorPage<E: EntityKind> {
44    pub(crate) items: Response<E>,
45
46    pub(crate) next_cursor: Option<Vec<u8>>,
47}
48
49///
50/// ExecutionAccessPathVariant
51///
52/// Coarse access path shape used by the load execution trace surface.
53///
54#[derive(Clone, Copy, Debug, Eq, PartialEq)]
55pub enum ExecutionAccessPathVariant {
56    ByKey,
57    ByKeys,
58    KeyRange,
59    IndexPrefix,
60    IndexRange,
61    FullScan,
62    Union,
63    Intersection,
64}
65
66///
67/// ExecutionOptimization
68///
69/// Canonical load optimization selected by execution, if any.
70///
71
72#[derive(Clone, Copy, Debug, Eq, PartialEq)]
73pub enum ExecutionOptimization {
74    PrimaryKey,
75    SecondaryOrderPushdown,
76    IndexRangeLimitPushdown,
77}
78
79///
80/// ExecutionTrace
81///
82/// Structured, opt-in load execution introspection snapshot.
83/// Captures plan-shape and execution decisions without changing semantics.
84///
85
86#[derive(Clone, Copy, Debug, Eq, PartialEq)]
87pub struct ExecutionTrace {
88    pub access_path_variant: ExecutionAccessPathVariant,
89    pub direction: OrderDirection,
90    pub optimization: Option<ExecutionOptimization>,
91    pub keys_scanned: u64,
92    pub rows_returned: u64,
93    pub continuation_applied: bool,
94}
95
96impl ExecutionTrace {
97    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
98        Self {
99            access_path_variant: access_path_variant(access),
100            direction: execution_order_direction(direction),
101            optimization: None,
102            keys_scanned: 0,
103            rows_returned: 0,
104            continuation_applied,
105        }
106    }
107
108    fn set_path_outcome(
109        &mut self,
110        optimization: Option<ExecutionOptimization>,
111        keys_scanned: usize,
112        rows_returned: usize,
113    ) {
114        self.optimization = optimization;
115        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
116        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
117    }
118}
119
120fn key_stream_comparator_from_plan<K>(
121    plan: &LogicalPlan<K>,
122    fallback_direction: Direction,
123) -> KeyOrderComparator {
124    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
125        derive_scan_direction(order, SlotSelectionPolicy::Last)
126    });
127
128    // Comparator and child-stream monotonicity must stay aligned until access-path
129    // stream production can emit keys under an order-spec-derived comparator.
130    let comparator_direction = if derived_direction == fallback_direction {
131        derived_direction
132    } else {
133        fallback_direction
134    };
135
136    KeyOrderComparator::from_direction(comparator_direction)
137}
138
139///
140/// FastPathKeyResult
141///
142/// Internal fast-path access result.
143/// Carries ordered keys plus observability metadata for shared execution phases.
144///
145struct FastPathKeyResult {
146    ordered_key_stream: OrderedKeyStreamBox,
147    rows_scanned: usize,
148    optimization: ExecutionOptimization,
149}
150
151///
152/// IndexRangeLimitSpec
153///
154/// Canonical executor decision payload for index-range limit pushdown.
155/// Encodes the bounded fetch size after all eligibility gates pass.
156///
157
158struct IndexRangeLimitSpec {
159    fetch: usize,
160}
161
162///
163/// LoadExecutor
164///
165/// Load-plan executor with canonical post-access semantics.
166/// Coordinates fast paths, trace hooks, and pagination cursors.
167///
168
169#[derive(Clone)]
170pub(crate) struct LoadExecutor<E: EntityKind> {
171    db: Db<E::Canister>,
172    debug: bool,
173    _marker: PhantomData<E>,
174}
175
176impl<E> LoadExecutor<E>
177where
178    E: EntityKind + EntityValue,
179{
180    #[must_use]
181    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
182        Self {
183            db,
184            debug,
185            _marker: PhantomData,
186        }
187    }
188
189    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
190        self.execute_paged_with_cursor(plan, PlannedCursor::none())
191            .map(|page| page.items)
192    }
193
194    pub(in crate::db) fn execute_paged_with_cursor(
195        &self,
196        plan: ExecutablePlan<E>,
197        cursor: impl Into<PlannedCursor>,
198    ) -> Result<CursorPage<E>, InternalError> {
199        self.execute_paged_with_cursor_traced(plan, cursor)
200            .map(|(page, _)| page)
201    }
202
203    pub(in crate::db) fn execute_paged_with_cursor_traced(
204        &self,
205        plan: ExecutablePlan<E>,
206        cursor: impl Into<PlannedCursor>,
207    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
208        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
209        let cursor_boundary = cursor.boundary().cloned();
210        let index_range_anchor = cursor.index_range_anchor().cloned();
211
212        if !plan.mode().is_load() {
213            return Err(InternalError::query_executor_invariant(
214                "load executor requires load plans",
215            ));
216        }
217
218        let direction = plan.direction();
219        let continuation_signature = plan.continuation_signature();
220        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
221        let index_range_specs = plan.index_range_specs()?.to_vec();
222        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
223        let mut execution_trace = self
224            .debug
225            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
226
227        let result = (|| {
228            let mut span = Span::<E>::new(ExecKind::Load);
229            let plan = plan.into_inner();
230
231            validate_executor_plan::<E>(&plan)?;
232            let ctx = self.db.recovered_context::<E>()?;
233            let execution_inputs = ExecutionInputs {
234                ctx: &ctx,
235                plan: &plan,
236                stream_bindings: AccessStreamBindings {
237                    index_prefix_specs: index_prefix_specs.as_slice(),
238                    index_range_specs: index_range_specs.as_slice(),
239                    index_range_anchor: index_range_anchor.as_ref(),
240                    direction,
241                },
242            };
243
244            record_plan_metrics(&plan.access);
245            // Plan fast-path routing decisions once, then execute in canonical order.
246            let fast_path_plan = Self::build_fast_path_plan(
247                &plan,
248                cursor_boundary.as_ref(),
249                index_range_anchor.as_ref(),
250                None,
251            )?;
252
253            // Resolve one canonical key stream, then run shared page materialization/finalization.
254            let mut resolved =
255                Self::resolve_execution_key_stream(&execution_inputs, &fast_path_plan)?;
256            let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
257                &ctx,
258                &plan,
259                resolved.key_stream.as_mut(),
260                cursor_boundary.as_ref(),
261                direction,
262                continuation_signature,
263            )?;
264            let rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
265
266            Ok(Self::finalize_execution(
267                page,
268                resolved.optimization,
269                rows_scanned,
270                post_access_rows,
271                &mut span,
272                &mut execution_trace,
273            ))
274        })();
275
276        result.map(|page| (page, execution_trace))
277    }
278
279    // Record shared observability outcome for any execution path.
280    fn finalize_path_outcome(
281        execution_trace: &mut Option<ExecutionTrace>,
282        optimization: Option<ExecutionOptimization>,
283        rows_scanned: usize,
284        rows_returned: usize,
285    ) {
286        record_rows_scanned::<E>(rows_scanned);
287        if let Some(execution_trace) = execution_trace.as_mut() {
288            execution_trace.set_path_outcome(optimization, rows_scanned, rows_returned);
289            debug_assert_eq!(
290                execution_trace.keys_scanned,
291                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
292                "execution trace keys_scanned must match rows_scanned metrics input",
293            );
294        }
295    }
296
297    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
298    fn validate_pk_fast_path_boundary_if_applicable(
299        plan: &LogicalPlan<E::Key>,
300        cursor_boundary: Option<&CursorBoundary>,
301    ) -> Result<(), InternalError> {
302        if !Self::is_pk_order_stream_eligible(plan) {
303            return Ok(());
304        }
305        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
306
307        Ok(())
308    }
309}