icydb_core/db/executor/load/
mod.rs1mod aggregate;
2mod aggregate_field;
3mod aggregate_guard;
4mod execute;
5mod index_range_limit;
6mod page;
7mod pk_stream;
8mod route;
9mod secondary_index;
10mod trace;
11
12use self::{
13 execute::ExecutionInputs,
14 trace::{access_path_variant, execution_order_direction},
15};
16use crate::{
17 db::{
18 Db,
19 executor::{
20 AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
21 plan::{record_plan_metrics, record_rows_scanned},
22 },
23 query::plan::{
24 AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
25 PlannedCursor, SlotSelectionPolicy, decode_pk_cursor_boundary, derive_scan_direction,
26 validate::validate_executor_plan,
27 },
28 response::Response,
29 },
30 error::InternalError,
31 obs::sink::{ExecKind, Span},
32 traits::{EntityKind, EntityValue},
33};
34use std::marker::PhantomData;
35
36#[derive(Debug)]
44pub(crate) struct CursorPage<E: EntityKind> {
45 pub(crate) items: Response<E>,
46
47 pub(crate) next_cursor: Option<Vec<u8>>,
48}
49
/// Shape of the access path reported in [`ExecutionTrace::access_path_variant`].
///
/// The value is derived from the plan's access tree when a trace is created.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
66
/// Optimization recorded for an execution, as stored in
/// [`ExecutionTrace::optimization`].
///
/// A trace holds `None` when no optimization was recorded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
79
80#[derive(Clone, Copy, Debug, Eq, PartialEq)]
88pub struct ExecutionTrace {
89 pub access_path_variant: ExecutionAccessPathVariant,
90 pub direction: OrderDirection,
91 pub optimization: Option<ExecutionOptimization>,
92 pub keys_scanned: u64,
93 pub rows_returned: u64,
94 pub continuation_applied: bool,
95}
96
97impl ExecutionTrace {
98 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
99 Self {
100 access_path_variant: access_path_variant(access),
101 direction: execution_order_direction(direction),
102 optimization: None,
103 keys_scanned: 0,
104 rows_returned: 0,
105 continuation_applied,
106 }
107 }
108
109 fn set_path_outcome(
110 &mut self,
111 optimization: Option<ExecutionOptimization>,
112 keys_scanned: usize,
113 rows_returned: usize,
114 ) {
115 self.optimization = optimization;
116 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
117 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
118 }
119}
120
121fn key_stream_comparator_from_plan<K>(
122 plan: &LogicalPlan<K>,
123 fallback_direction: Direction,
124) -> KeyOrderComparator {
125 let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
126 derive_scan_direction(order, SlotSelectionPolicy::Last)
127 });
128
129 let comparator_direction = if derived_direction == fallback_direction {
132 derived_direction
133 } else {
134 fallback_direction
135 };
136
137 KeyOrderComparator::from_direction(comparator_direction)
138}
139
140struct FastPathKeyResult {
147 ordered_key_stream: OrderedKeyStreamBox,
148 rows_scanned: usize,
149 optimization: ExecutionOptimization,
150}
151
/// Limit specification for an index-range scan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct IndexRangeLimitSpec {
    /// Number of entries to fetch.
    // NOTE(review): whether this already includes any look-ahead row is not
    // visible here — confirm in `index_range_limit`.
    fetch: usize,
}
163
164#[derive(Clone)]
172pub(crate) struct LoadExecutor<E: EntityKind> {
173 db: Db<E::Canister>,
174 debug: bool,
175 _marker: PhantomData<E>,
176}
177
178impl<E> LoadExecutor<E>
179where
180 E: EntityKind + EntityValue,
181{
182 #[must_use]
183 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
184 Self {
185 db,
186 debug,
187 _marker: PhantomData,
188 }
189 }
190
191 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
192 self.execute_paged_with_cursor(plan, PlannedCursor::none())
193 .map(|page| page.items)
194 }
195
196 pub(in crate::db) fn execute_paged_with_cursor(
197 &self,
198 plan: ExecutablePlan<E>,
199 cursor: impl Into<PlannedCursor>,
200 ) -> Result<CursorPage<E>, InternalError> {
201 self.execute_paged_with_cursor_traced(plan, cursor)
202 .map(|(page, _)| page)
203 }
204
205 pub(in crate::db) fn execute_paged_with_cursor_traced(
206 &self,
207 plan: ExecutablePlan<E>,
208 cursor: impl Into<PlannedCursor>,
209 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
210 let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
211 let cursor_boundary = cursor.boundary().cloned();
212 let index_range_anchor = cursor.index_range_anchor().cloned();
213
214 if !plan.mode().is_load() {
215 return Err(InternalError::query_executor_invariant(
216 "load executor requires load plans",
217 ));
218 }
219
220 let direction = plan.direction();
221 let continuation_signature = plan.continuation_signature();
222 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
223 let index_range_specs = plan.index_range_specs()?.to_vec();
224 let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
225 let mut execution_trace = self
226 .debug
227 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
228
229 let result = (|| {
230 let mut span = Span::<E>::new(ExecKind::Load);
231 let plan = plan.into_inner();
232
233 validate_executor_plan::<E>(&plan)?;
234 let ctx = self.db.recovered_context::<E>()?;
235 let execution_inputs = ExecutionInputs {
236 ctx: &ctx,
237 plan: &plan,
238 stream_bindings: AccessStreamBindings {
239 index_prefix_specs: index_prefix_specs.as_slice(),
240 index_range_specs: index_range_specs.as_slice(),
241 index_range_anchor: index_range_anchor.as_ref(),
242 direction,
243 },
244 };
245
246 record_plan_metrics(&plan.access);
247 let route_plan = Self::build_execution_route_plan_for_load(
249 &plan,
250 cursor_boundary.as_ref(),
251 index_range_anchor.as_ref(),
252 None,
253 direction,
254 )?;
255
256 let mut resolved = Self::resolve_execution_key_stream(&execution_inputs, &route_plan)?;
258 let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
259 &ctx,
260 &plan,
261 resolved.key_stream.as_mut(),
262 route_plan.scan_hints.load_scan_budget_hint,
263 route_plan.streaming_access_shape_safe(),
264 cursor_boundary.as_ref(),
265 direction,
266 continuation_signature,
267 )?;
268 let rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
269
270 Ok(Self::finalize_execution(
271 page,
272 resolved.optimization,
273 rows_scanned,
274 post_access_rows,
275 &mut span,
276 &mut execution_trace,
277 ))
278 })();
279
280 result.map(|page| (page, execution_trace))
281 }
282
283 fn finalize_path_outcome(
285 execution_trace: &mut Option<ExecutionTrace>,
286 optimization: Option<ExecutionOptimization>,
287 rows_scanned: usize,
288 rows_returned: usize,
289 ) {
290 record_rows_scanned::<E>(rows_scanned);
291 if let Some(execution_trace) = execution_trace.as_mut() {
292 execution_trace.set_path_outcome(optimization, rows_scanned, rows_returned);
293 debug_assert_eq!(
294 execution_trace.keys_scanned,
295 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
296 "execution trace keys_scanned must match rows_scanned metrics input",
297 );
298 }
299 }
300
301 fn validate_pk_fast_path_boundary_if_applicable(
303 plan: &LogicalPlan<E::Key>,
304 cursor_boundary: Option<&CursorBoundary>,
305 ) -> Result<(), InternalError> {
306 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
307 return Ok(());
308 }
309 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
310
311 Ok(())
312 }
313}