icydb_core/db/executor/load/
mod.rs1mod aggregate;
2mod execute;
3mod index_range_limit;
4mod page;
5mod pk_stream;
6mod route;
7mod secondary_index;
8mod trace;
9
10use self::{
11 execute::ExecutionInputs,
12 trace::{access_path_variant, execution_order_direction},
13};
14use crate::{
15 db::{
16 Db,
17 executor::{
18 KeyOrderComparator, OrderedKeyStreamBox,
19 plan::{record_plan_metrics, record_rows_scanned},
20 },
21 query::plan::{
22 AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
23 PlannedCursor, SlotSelectionPolicy, decode_pk_cursor_boundary, derive_scan_direction,
24 validate::validate_executor_plan,
25 },
26 response::Response,
27 },
28 error::InternalError,
29 obs::sink::{ExecKind, Span},
30 traits::{EntityKind, EntityValue},
31};
32use std::marker::PhantomData;
33
34#[derive(Debug)]
42pub(crate) struct CursorPage<E: EntityKind> {
43 pub(crate) items: Response<E>,
44
45 pub(crate) next_cursor: Option<Vec<u8>>,
46}
47
48#[derive(Clone, Copy, Debug, Eq, PartialEq)]
54pub enum ExecutionAccessPathVariant {
55 ByKey,
56 ByKeys,
57 KeyRange,
58 IndexPrefix,
59 IndexRange,
60 FullScan,
61 Union,
62 Intersection,
63}
64
65#[derive(Clone, Copy, Debug, Eq, PartialEq)]
72pub enum ExecutionOptimization {
73 PrimaryKey,
74 SecondaryOrderPushdown,
75 IndexRangeLimitPushdown,
76}
77
78#[derive(Clone, Copy, Debug, Eq, PartialEq)]
86pub struct ExecutionTrace {
87 pub access_path_variant: ExecutionAccessPathVariant,
88 pub direction: OrderDirection,
89 pub optimization: Option<ExecutionOptimization>,
90 pub keys_scanned: u64,
91 pub rows_returned: u64,
92 pub continuation_applied: bool,
93}
94
95impl ExecutionTrace {
96 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
97 Self {
98 access_path_variant: access_path_variant(access),
99 direction: execution_order_direction(direction),
100 optimization: None,
101 keys_scanned: 0,
102 rows_returned: 0,
103 continuation_applied,
104 }
105 }
106
107 fn set_path_outcome(
108 &mut self,
109 optimization: Option<ExecutionOptimization>,
110 keys_scanned: usize,
111 rows_returned: usize,
112 ) {
113 self.optimization = optimization;
114 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
115 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
116 }
117}
118
119fn key_stream_comparator_from_plan<K>(
120 plan: &LogicalPlan<K>,
121 fallback_direction: Direction,
122) -> KeyOrderComparator {
123 let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
124 derive_scan_direction(order, SlotSelectionPolicy::Last)
125 });
126
127 let comparator_direction = if derived_direction == fallback_direction {
130 derived_direction
131 } else {
132 fallback_direction
133 };
134
135 KeyOrderComparator::from_direction(comparator_direction)
136}
137
138struct FastPathKeyResult {
145 ordered_key_stream: OrderedKeyStreamBox,
146 rows_scanned: usize,
147 optimization: ExecutionOptimization,
148}
149
150struct IndexRangeLimitSpec {
158 fetch: usize,
159}
160
161#[derive(Clone)]
169pub(crate) struct LoadExecutor<E: EntityKind> {
170 db: Db<E::Canister>,
171 debug: bool,
172 _marker: PhantomData<E>,
173}
174
175impl<E> LoadExecutor<E>
176where
177 E: EntityKind + EntityValue,
178{
179 #[must_use]
180 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
181 Self {
182 db,
183 debug,
184 _marker: PhantomData,
185 }
186 }
187
188 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
189 self.execute_paged_with_cursor(plan, PlannedCursor::none())
190 .map(|page| page.items)
191 }
192
193 pub(in crate::db) fn execute_paged_with_cursor(
194 &self,
195 plan: ExecutablePlan<E>,
196 cursor: impl Into<PlannedCursor>,
197 ) -> Result<CursorPage<E>, InternalError> {
198 self.execute_paged_with_cursor_traced(plan, cursor)
199 .map(|(page, _)| page)
200 }
201
202 pub(in crate::db) fn execute_paged_with_cursor_traced(
203 &self,
204 plan: ExecutablePlan<E>,
205 cursor: impl Into<PlannedCursor>,
206 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
207 let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
208 let cursor_boundary = cursor.boundary().cloned();
209 let index_range_anchor = cursor.index_range_anchor().cloned();
210
211 if !plan.mode().is_load() {
212 return Err(InternalError::query_executor_invariant(
213 "load executor requires load plans",
214 ));
215 }
216
217 let direction = plan.direction();
218 let continuation_signature = plan.continuation_signature();
219 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
220 let index_range_specs = plan.index_range_specs()?.to_vec();
221 let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
222 let mut execution_trace = self
223 .debug
224 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
225
226 let result = (|| {
227 let mut span = Span::<E>::new(ExecKind::Load);
228 let plan = plan.into_inner();
229
230 validate_executor_plan::<E>(&plan)?;
231 let ctx = self.db.recovered_context::<E>()?;
232 let execution_inputs = ExecutionInputs {
233 ctx: &ctx,
234 plan: &plan,
235 index_prefix_specs: index_prefix_specs.as_slice(),
236 index_range_specs: index_range_specs.as_slice(),
237 index_range_anchor: index_range_anchor.as_ref(),
238 direction,
239 };
240
241 record_plan_metrics(&plan.access);
242 let fast_path_plan = Self::build_fast_path_plan(
244 &plan,
245 cursor_boundary.as_ref(),
246 index_range_anchor.as_ref(),
247 None,
248 )?;
249
250 let mut resolved =
252 Self::resolve_execution_key_stream(&execution_inputs, &fast_path_plan)?;
253 let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
254 &ctx,
255 &plan,
256 resolved.key_stream.as_mut(),
257 cursor_boundary.as_ref(),
258 direction,
259 continuation_signature,
260 )?;
261 let rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
262
263 Ok(Self::finalize_execution(
264 page,
265 resolved.optimization,
266 rows_scanned,
267 post_access_rows,
268 &mut span,
269 &mut execution_trace,
270 ))
271 })();
272
273 result.map(|page| (page, execution_trace))
274 }
275
276 fn finalize_path_outcome(
278 execution_trace: &mut Option<ExecutionTrace>,
279 optimization: Option<ExecutionOptimization>,
280 rows_scanned: usize,
281 rows_returned: usize,
282 ) {
283 record_rows_scanned::<E>(rows_scanned);
284 if let Some(execution_trace) = execution_trace.as_mut() {
285 execution_trace.set_path_outcome(optimization, rows_scanned, rows_returned);
286 debug_assert_eq!(
287 execution_trace.keys_scanned,
288 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
289 "execution trace keys_scanned must match rows_scanned metrics input",
290 );
291 }
292 }
293
294 fn validate_pk_fast_path_boundary_if_applicable(
296 plan: &LogicalPlan<E::Key>,
297 cursor_boundary: Option<&CursorBoundary>,
298 ) -> Result<(), InternalError> {
299 if !Self::is_pk_order_stream_eligible(plan) {
300 return Ok(());
301 }
302 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
303
304 Ok(())
305 }
306}