// icydb_core/db/executor/load/mod.rs

1mod aggregate;
2mod aggregate_field;
3mod aggregate_guard;
4mod execute;
5mod index_range_limit;
6mod page;
7mod pk_stream;
8mod route;
9mod secondary_index;
10mod trace;
11
12use self::{
13    execute::ExecutionInputs,
14    trace::{access_path_variant, execution_order_direction},
15};
16use crate::{
17    db::{
18        Db,
19        executor::{
20            AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
21            plan::{record_plan_metrics, record_rows_scanned},
22        },
23        query::plan::{
24            AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
25            PlannedCursor, SlotSelectionPolicy, decode_pk_cursor_boundary, derive_scan_direction,
26            validate::validate_executor_plan,
27        },
28        response::Response,
29    },
30    error::InternalError,
31    obs::sink::{ExecKind, Span},
32    traits::{EntityKind, EntityValue},
33};
34use std::marker::PhantomData;
35
36///
37/// CursorPage
38///
39/// Internal load page result with continuation cursor payload.
40/// Returned by paged executor entrypoints.
41///
42
43#[derive(Debug)]
44pub(crate) struct CursorPage<E: EntityKind> {
45    pub(crate) items: Response<E>,
46
47    pub(crate) next_cursor: Option<Vec<u8>>,
48}
49
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
/// Stored in `ExecutionTrace::access_path_variant`, which is filled from an
/// `AccessPlan` by `trace::access_path_variant` when a trace is created.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
66
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
/// The "if any" case is expressed by the `Option<ExecutionOptimization>`
/// stored in `ExecutionTrace::optimization`.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
79
80///
81/// ExecutionTrace
82///
83/// Structured, opt-in load execution introspection snapshot.
84/// Captures plan-shape and execution decisions without changing semantics.
85///
86
87#[derive(Clone, Copy, Debug, Eq, PartialEq)]
88pub struct ExecutionTrace {
89    pub access_path_variant: ExecutionAccessPathVariant,
90    pub direction: OrderDirection,
91    pub optimization: Option<ExecutionOptimization>,
92    pub keys_scanned: u64,
93    pub rows_returned: u64,
94    pub continuation_applied: bool,
95}
96
97impl ExecutionTrace {
98    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
99        Self {
100            access_path_variant: access_path_variant(access),
101            direction: execution_order_direction(direction),
102            optimization: None,
103            keys_scanned: 0,
104            rows_returned: 0,
105            continuation_applied,
106        }
107    }
108
109    fn set_path_outcome(
110        &mut self,
111        optimization: Option<ExecutionOptimization>,
112        keys_scanned: usize,
113        rows_returned: usize,
114    ) {
115        self.optimization = optimization;
116        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
117        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
118    }
119}
120
121fn key_stream_comparator_from_plan<K>(
122    plan: &LogicalPlan<K>,
123    fallback_direction: Direction,
124) -> KeyOrderComparator {
125    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
126        derive_scan_direction(order, SlotSelectionPolicy::Last)
127    });
128
129    // Comparator and child-stream monotonicity must stay aligned until access-path
130    // stream production can emit keys under an order-spec-derived comparator.
131    let comparator_direction = if derived_direction == fallback_direction {
132        derived_direction
133    } else {
134        fallback_direction
135    };
136
137    KeyOrderComparator::from_direction(comparator_direction)
138}
139
140///
141/// FastPathKeyResult
142///
143/// Internal fast-path access result.
144/// Carries ordered keys plus observability metadata for shared execution phases.
145///
146struct FastPathKeyResult {
147    ordered_key_stream: OrderedKeyStreamBox,
148    rows_scanned: usize,
149    optimization: ExecutionOptimization,
150}
151
///
/// IndexRangeLimitSpec
///
/// Canonical executor decision payload for index-range limit pushdown.
/// Encodes the bounded fetch size after all eligibility gates pass.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct IndexRangeLimitSpec {
    /// Bounded fetch size for the pushed-down index-range scan.
    fetch: usize,
}
163
164///
165/// LoadExecutor
166///
167/// Load-plan executor with canonical post-access semantics.
168/// Coordinates fast paths, trace hooks, and pagination cursors.
169///
170
171#[derive(Clone)]
172pub(crate) struct LoadExecutor<E: EntityKind> {
173    db: Db<E::Canister>,
174    debug: bool,
175    _marker: PhantomData<E>,
176}
177
178impl<E> LoadExecutor<E>
179where
180    E: EntityKind + EntityValue,
181{
182    #[must_use]
183    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
184        Self {
185            db,
186            debug,
187            _marker: PhantomData,
188        }
189    }
190
191    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
192        self.execute_paged_with_cursor(plan, PlannedCursor::none())
193            .map(|page| page.items)
194    }
195
196    pub(in crate::db) fn execute_paged_with_cursor(
197        &self,
198        plan: ExecutablePlan<E>,
199        cursor: impl Into<PlannedCursor>,
200    ) -> Result<CursorPage<E>, InternalError> {
201        self.execute_paged_with_cursor_traced(plan, cursor)
202            .map(|(page, _)| page)
203    }
204
205    pub(in crate::db) fn execute_paged_with_cursor_traced(
206        &self,
207        plan: ExecutablePlan<E>,
208        cursor: impl Into<PlannedCursor>,
209    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
210        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
211        let cursor_boundary = cursor.boundary().cloned();
212        let index_range_anchor = cursor.index_range_anchor().cloned();
213
214        if !plan.mode().is_load() {
215            return Err(InternalError::query_executor_invariant(
216                "load executor requires load plans",
217            ));
218        }
219
220        let direction = plan.direction();
221        let continuation_signature = plan.continuation_signature();
222        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
223        let index_range_specs = plan.index_range_specs()?.to_vec();
224        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
225        let mut execution_trace = self
226            .debug
227            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
228
229        let result = (|| {
230            let mut span = Span::<E>::new(ExecKind::Load);
231            let plan = plan.into_inner();
232
233            validate_executor_plan::<E>(&plan)?;
234            let ctx = self.db.recovered_context::<E>()?;
235            let execution_inputs = ExecutionInputs {
236                ctx: &ctx,
237                plan: &plan,
238                stream_bindings: AccessStreamBindings {
239                    index_prefix_specs: index_prefix_specs.as_slice(),
240                    index_range_specs: index_range_specs.as_slice(),
241                    index_range_anchor: index_range_anchor.as_ref(),
242                    direction,
243                },
244            };
245
246            record_plan_metrics(&plan.access);
247            // Plan execution routing once, then execute in canonical order.
248            let route_plan = Self::build_execution_route_plan_for_load(
249                &plan,
250                cursor_boundary.as_ref(),
251                index_range_anchor.as_ref(),
252                None,
253                direction,
254            )?;
255
256            // Resolve one canonical key stream, then run shared page materialization/finalization.
257            let mut resolved = Self::resolve_execution_key_stream(&execution_inputs, &route_plan)?;
258            let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
259                &ctx,
260                &plan,
261                resolved.key_stream.as_mut(),
262                route_plan.scan_hints.load_scan_budget_hint,
263                route_plan.streaming_access_shape_safe(),
264                cursor_boundary.as_ref(),
265                direction,
266                continuation_signature,
267            )?;
268            let rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
269
270            Ok(Self::finalize_execution(
271                page,
272                resolved.optimization,
273                rows_scanned,
274                post_access_rows,
275                &mut span,
276                &mut execution_trace,
277            ))
278        })();
279
280        result.map(|page| (page, execution_trace))
281    }
282
283    // Record shared observability outcome for any execution path.
284    fn finalize_path_outcome(
285        execution_trace: &mut Option<ExecutionTrace>,
286        optimization: Option<ExecutionOptimization>,
287        rows_scanned: usize,
288        rows_returned: usize,
289    ) {
290        record_rows_scanned::<E>(rows_scanned);
291        if let Some(execution_trace) = execution_trace.as_mut() {
292            execution_trace.set_path_outcome(optimization, rows_scanned, rows_returned);
293            debug_assert_eq!(
294                execution_trace.keys_scanned,
295                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
296                "execution trace keys_scanned must match rows_scanned metrics input",
297            );
298        }
299    }
300
301    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
302    fn validate_pk_fast_path_boundary_if_applicable(
303        plan: &LogicalPlan<E::Key>,
304        cursor_boundary: Option<&CursorBoundary>,
305    ) -> Result<(), InternalError> {
306        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
307            return Ok(());
308        }
309        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
310
311        Ok(())
312    }
313}