Skip to main content

// icydb_core/db/executor/load/mod.rs

1mod aggregate;
2mod aggregate_field;
3mod aggregate_guard;
4mod execute;
5mod index_range_limit;
6mod page;
7mod pk_stream;
8mod route;
9mod secondary_index;
10mod trace;
11
12use self::{
13    execute::ExecutionInputs,
14    trace::{access_path_variant, execution_order_direction},
15};
16use crate::{
17    db::{
18        Db,
19        executor::{
20            AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
21            plan::{record_plan_metrics, record_rows_scanned},
22        },
23        query::plan::{
24            AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
25            PlannedCursor, SlotSelectionPolicy, decode_pk_cursor_boundary, derive_scan_direction,
26            validate::validate_executor_plan,
27        },
28        response::Response,
29    },
30    error::InternalError,
31    obs::sink::{ExecKind, Span},
32    traits::{EntityKind, EntityValue},
33};
34use std::marker::PhantomData;
35
36///
37/// CursorPage
38///
39/// Internal load page result with continuation cursor payload.
40/// Returned by paged executor entrypoints.
41///
42
43#[derive(Debug)]
44pub(crate) struct CursorPage<E: EntityKind> {
45    pub(crate) items: Response<E>,
46
47    pub(crate) next_cursor: Option<Vec<u8>>,
48}
49
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
/// Reported through `ExecutionTrace::access_path_variant`.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
66
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
/// Reported through `ExecutionTrace::optimization`, where `None` means no
/// optimization was recorded.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
79
80///
81/// ExecutionTrace
82///
83/// Structured, opt-in load execution introspection snapshot.
84/// Captures plan-shape and execution decisions without changing semantics.
85///
86
87#[derive(Clone, Copy, Debug, Eq, PartialEq)]
88pub struct ExecutionTrace {
89    pub access_path_variant: ExecutionAccessPathVariant,
90    pub direction: OrderDirection,
91    pub optimization: Option<ExecutionOptimization>,
92    pub keys_scanned: u64,
93    pub rows_returned: u64,
94    pub continuation_applied: bool,
95    pub index_predicate_applied: bool,
96    pub index_predicate_keys_rejected: u64,
97    pub distinct_keys_deduped: u64,
98}
99
100impl ExecutionTrace {
101    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
102        Self {
103            access_path_variant: access_path_variant(access),
104            direction: execution_order_direction(direction),
105            optimization: None,
106            keys_scanned: 0,
107            rows_returned: 0,
108            continuation_applied,
109            index_predicate_applied: false,
110            index_predicate_keys_rejected: 0,
111            distinct_keys_deduped: 0,
112        }
113    }
114
115    fn set_path_outcome(
116        &mut self,
117        optimization: Option<ExecutionOptimization>,
118        keys_scanned: usize,
119        rows_returned: usize,
120        index_predicate_applied: bool,
121        index_predicate_keys_rejected: u64,
122        distinct_keys_deduped: u64,
123    ) {
124        self.optimization = optimization;
125        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
126        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
127        self.index_predicate_applied = index_predicate_applied;
128        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
129        self.distinct_keys_deduped = distinct_keys_deduped;
130    }
131}
132
133fn key_stream_comparator_from_plan<K>(
134    plan: &LogicalPlan<K>,
135    fallback_direction: Direction,
136) -> KeyOrderComparator {
137    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
138        derive_scan_direction(order, SlotSelectionPolicy::Last)
139    });
140
141    // Comparator and child-stream monotonicity must stay aligned until access-path
142    // stream production can emit keys under an order-spec-derived comparator.
143    let comparator_direction = if derived_direction == fallback_direction {
144        derived_direction
145    } else {
146        fallback_direction
147    };
148
149    KeyOrderComparator::from_direction(comparator_direction)
150}
151
152///
153/// FastPathKeyResult
154///
155/// Internal fast-path access result.
156/// Carries ordered keys plus observability metadata for shared execution phases.
157///
158struct FastPathKeyResult {
159    ordered_key_stream: OrderedKeyStreamBox,
160    rows_scanned: usize,
161    optimization: ExecutionOptimization,
162}
163
///
/// IndexRangeLimitSpec
///
/// Canonical executor decision payload for index-range limit pushdown.
/// Encodes the bounded fetch size after all eligibility gates pass.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct IndexRangeLimitSpec {
    /// Bounded fetch size for the pushed-down index-range read.
    fetch: usize,
}
175
176///
177/// LoadExecutor
178///
179/// Load-plan executor with canonical post-access semantics.
180/// Coordinates fast paths, trace hooks, and pagination cursors.
181///
182
183#[derive(Clone)]
184pub(crate) struct LoadExecutor<E: EntityKind> {
185    db: Db<E::Canister>,
186    debug: bool,
187    _marker: PhantomData<E>,
188}
189
190impl<E> LoadExecutor<E>
191where
192    E: EntityKind + EntityValue,
193{
194    #[must_use]
195    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
196        Self {
197            db,
198            debug,
199            _marker: PhantomData,
200        }
201    }
202
203    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
204        self.execute_paged_with_cursor(plan, PlannedCursor::none())
205            .map(|page| page.items)
206    }
207
208    pub(in crate::db) fn execute_paged_with_cursor(
209        &self,
210        plan: ExecutablePlan<E>,
211        cursor: impl Into<PlannedCursor>,
212    ) -> Result<CursorPage<E>, InternalError> {
213        self.execute_paged_with_cursor_traced(plan, cursor)
214            .map(|(page, _)| page)
215    }
216
217    pub(in crate::db) fn execute_paged_with_cursor_traced(
218        &self,
219        plan: ExecutablePlan<E>,
220        cursor: impl Into<PlannedCursor>,
221    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
222        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
223        let cursor_boundary = cursor.boundary().cloned();
224        let index_range_anchor = cursor.index_range_anchor().cloned();
225
226        if !plan.mode().is_load() {
227            return Err(InternalError::query_executor_invariant(
228                "load executor requires load plans",
229            ));
230        }
231
232        let direction = plan.direction();
233        let continuation_signature = plan.continuation_signature();
234        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
235        let index_range_specs = plan.index_range_specs()?.to_vec();
236        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
237        let mut execution_trace = self
238            .debug
239            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
240        let (plan, predicate_slots) = plan.into_parts();
241
242        let result = (|| {
243            let mut span = Span::<E>::new(ExecKind::Load);
244
245            validate_executor_plan::<E>(&plan)?;
246            let ctx = self.db.recovered_context::<E>()?;
247            let execution_inputs = ExecutionInputs {
248                ctx: &ctx,
249                plan: &plan,
250                stream_bindings: AccessStreamBindings {
251                    index_prefix_specs: index_prefix_specs.as_slice(),
252                    index_range_specs: index_range_specs.as_slice(),
253                    index_range_anchor: index_range_anchor.as_ref(),
254                    direction,
255                },
256                predicate_slots: predicate_slots.as_ref(),
257            };
258
259            record_plan_metrics(&plan.access);
260            // Plan execution routing once, then execute in canonical order.
261            let route_plan = Self::build_execution_route_plan_for_load(
262                &plan,
263                cursor_boundary.as_ref(),
264                index_range_anchor.as_ref(),
265                None,
266                direction,
267            )?;
268
269            // Resolve one canonical key stream, then run shared page materialization/finalization.
270            let mut resolved = Self::resolve_execution_key_stream(&execution_inputs, &route_plan)?;
271            let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
272                &ctx,
273                &plan,
274                predicate_slots.as_ref(),
275                resolved.key_stream.as_mut(),
276                route_plan.scan_hints.load_scan_budget_hint,
277                route_plan.streaming_access_shape_safe(),
278                cursor_boundary.as_ref(),
279                direction,
280                continuation_signature,
281            )?;
282            let rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
283            let distinct_keys_deduped = resolved
284                .distinct_keys_deduped_counter
285                .as_ref()
286                .map_or(0, |counter| counter.get());
287
288            Ok(Self::finalize_execution(
289                page,
290                resolved.optimization,
291                rows_scanned,
292                post_access_rows,
293                resolved.index_predicate_applied,
294                resolved.index_predicate_keys_rejected,
295                distinct_keys_deduped,
296                &mut span,
297                &mut execution_trace,
298            ))
299        })();
300
301        result.map(|page| (page, execution_trace))
302    }
303
304    // Record shared observability outcome for any execution path.
305    fn finalize_path_outcome(
306        execution_trace: &mut Option<ExecutionTrace>,
307        optimization: Option<ExecutionOptimization>,
308        rows_scanned: usize,
309        rows_returned: usize,
310        index_predicate_applied: bool,
311        index_predicate_keys_rejected: u64,
312        distinct_keys_deduped: u64,
313    ) {
314        record_rows_scanned::<E>(rows_scanned);
315        if let Some(execution_trace) = execution_trace.as_mut() {
316            execution_trace.set_path_outcome(
317                optimization,
318                rows_scanned,
319                rows_returned,
320                index_predicate_applied,
321                index_predicate_keys_rejected,
322                distinct_keys_deduped,
323            );
324            debug_assert_eq!(
325                execution_trace.keys_scanned,
326                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
327                "execution trace keys_scanned must match rows_scanned metrics input",
328            );
329        }
330    }
331
332    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
333    fn validate_pk_fast_path_boundary_if_applicable(
334        plan: &LogicalPlan<E::Key>,
335        cursor_boundary: Option<&CursorBoundary>,
336    ) -> Result<(), InternalError> {
337        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
338            return Ok(());
339        }
340        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
341
342        Ok(())
343    }
344}