icydb_core/db/executor/load/
mod.rs1mod aggregate;
2mod aggregate_guard;
3mod execute;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod route;
8mod secondary_index;
9mod trace;
10
11use self::{
12 execute::ExecutionInputs,
13 trace::{access_path_variant, execution_order_direction},
14};
15use crate::{
16 db::{
17 Db,
18 executor::{
19 AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
20 plan::{record_plan_metrics, record_rows_scanned},
21 },
22 query::plan::{
23 AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
24 PlannedCursor, SlotSelectionPolicy, decode_pk_cursor_boundary, derive_scan_direction,
25 validate::validate_executor_plan,
26 },
27 response::Response,
28 },
29 error::InternalError,
30 obs::sink::{ExecKind, Span},
31 traits::{EntityKind, EntityValue},
32};
33use std::marker::PhantomData;
34
35#[derive(Debug)]
43pub(crate) struct CursorPage<E: EntityKind> {
44 pub(crate) items: Response<E>,
45
46 pub(crate) next_cursor: Option<Vec<u8>>,
47}
48
/// Access-path shape reported in an [`ExecutionTrace`].
///
/// The value is produced from an access plan by `trace::access_path_variant`;
/// the variant names mirror the access-plan shapes they describe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
65
/// Optimization label reported in an [`ExecutionTrace`].
///
/// A trace carries `None` when no optimization was recorded for the run.
// NOTE(review): the exact conditions under which each variant is selected are
// decided by the routing/fast-path code, not visible here — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
78
79#[derive(Clone, Copy, Debug, Eq, PartialEq)]
87pub struct ExecutionTrace {
88 pub access_path_variant: ExecutionAccessPathVariant,
89 pub direction: OrderDirection,
90 pub optimization: Option<ExecutionOptimization>,
91 pub keys_scanned: u64,
92 pub rows_returned: u64,
93 pub continuation_applied: bool,
94}
95
96impl ExecutionTrace {
97 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
98 Self {
99 access_path_variant: access_path_variant(access),
100 direction: execution_order_direction(direction),
101 optimization: None,
102 keys_scanned: 0,
103 rows_returned: 0,
104 continuation_applied,
105 }
106 }
107
108 fn set_path_outcome(
109 &mut self,
110 optimization: Option<ExecutionOptimization>,
111 keys_scanned: usize,
112 rows_returned: usize,
113 ) {
114 self.optimization = optimization;
115 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
116 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
117 }
118}
119
120fn key_stream_comparator_from_plan<K>(
121 plan: &LogicalPlan<K>,
122 fallback_direction: Direction,
123) -> KeyOrderComparator {
124 let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
125 derive_scan_direction(order, SlotSelectionPolicy::Last)
126 });
127
128 let comparator_direction = if derived_direction == fallback_direction {
131 derived_direction
132 } else {
133 fallback_direction
134 };
135
136 KeyOrderComparator::from_direction(comparator_direction)
137}
138
139struct FastPathKeyResult {
146 ordered_key_stream: OrderedKeyStreamBox,
147 rows_scanned: usize,
148 optimization: ExecutionOptimization,
149}
150
/// Parameters for an index-range limit pushdown.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct IndexRangeLimitSpec {
    // NOTE(review): presumably the number of entries to fetch from the index
    // range — confirm against the index_range_limit module.
    fetch: usize,
}
162
163#[derive(Clone)]
171pub(crate) struct LoadExecutor<E: EntityKind> {
172 db: Db<E::Canister>,
173 debug: bool,
174 _marker: PhantomData<E>,
175}
176
177impl<E> LoadExecutor<E>
178where
179 E: EntityKind + EntityValue,
180{
181 #[must_use]
182 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
183 Self {
184 db,
185 debug,
186 _marker: PhantomData,
187 }
188 }
189
190 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
191 self.execute_paged_with_cursor(plan, PlannedCursor::none())
192 .map(|page| page.items)
193 }
194
195 pub(in crate::db) fn execute_paged_with_cursor(
196 &self,
197 plan: ExecutablePlan<E>,
198 cursor: impl Into<PlannedCursor>,
199 ) -> Result<CursorPage<E>, InternalError> {
200 self.execute_paged_with_cursor_traced(plan, cursor)
201 .map(|(page, _)| page)
202 }
203
204 pub(in crate::db) fn execute_paged_with_cursor_traced(
205 &self,
206 plan: ExecutablePlan<E>,
207 cursor: impl Into<PlannedCursor>,
208 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
209 let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
210 let cursor_boundary = cursor.boundary().cloned();
211 let index_range_anchor = cursor.index_range_anchor().cloned();
212
213 if !plan.mode().is_load() {
214 return Err(InternalError::query_executor_invariant(
215 "load executor requires load plans",
216 ));
217 }
218
219 let direction = plan.direction();
220 let continuation_signature = plan.continuation_signature();
221 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
222 let index_range_specs = plan.index_range_specs()?.to_vec();
223 let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
224 let mut execution_trace = self
225 .debug
226 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
227
228 let result = (|| {
229 let mut span = Span::<E>::new(ExecKind::Load);
230 let plan = plan.into_inner();
231
232 validate_executor_plan::<E>(&plan)?;
233 let ctx = self.db.recovered_context::<E>()?;
234 let execution_inputs = ExecutionInputs {
235 ctx: &ctx,
236 plan: &plan,
237 stream_bindings: AccessStreamBindings {
238 index_prefix_specs: index_prefix_specs.as_slice(),
239 index_range_specs: index_range_specs.as_slice(),
240 index_range_anchor: index_range_anchor.as_ref(),
241 direction,
242 },
243 };
244
245 record_plan_metrics(&plan.access);
246 let route_plan = Self::build_execution_route_plan_for_load(
248 &plan,
249 cursor_boundary.as_ref(),
250 index_range_anchor.as_ref(),
251 None,
252 direction,
253 )?;
254
255 let mut resolved = Self::resolve_execution_key_stream(&execution_inputs, &route_plan)?;
257 let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
258 &ctx,
259 &plan,
260 resolved.key_stream.as_mut(),
261 route_plan.scan_hints.load_scan_budget_hint,
262 cursor_boundary.as_ref(),
263 direction,
264 continuation_signature,
265 )?;
266 let rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
267
268 Ok(Self::finalize_execution(
269 page,
270 resolved.optimization,
271 rows_scanned,
272 post_access_rows,
273 &mut span,
274 &mut execution_trace,
275 ))
276 })();
277
278 result.map(|page| (page, execution_trace))
279 }
280
281 fn finalize_path_outcome(
283 execution_trace: &mut Option<ExecutionTrace>,
284 optimization: Option<ExecutionOptimization>,
285 rows_scanned: usize,
286 rows_returned: usize,
287 ) {
288 record_rows_scanned::<E>(rows_scanned);
289 if let Some(execution_trace) = execution_trace.as_mut() {
290 execution_trace.set_path_outcome(optimization, rows_scanned, rows_returned);
291 debug_assert_eq!(
292 execution_trace.keys_scanned,
293 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
294 "execution trace keys_scanned must match rows_scanned metrics input",
295 );
296 }
297 }
298
299 fn validate_pk_fast_path_boundary_if_applicable(
301 plan: &LogicalPlan<E::Key>,
302 cursor_boundary: Option<&CursorBoundary>,
303 ) -> Result<(), InternalError> {
304 if !Self::is_pk_order_stream_eligible(plan) {
305 return Ok(());
306 }
307 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
308
309 Ok(())
310 }
311}