1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod trace;
9
10use self::{
11 execute::{ExecutionInputs, IndexPredicateCompileMode},
12 trace::{access_path_variant, execution_order_direction},
13};
14use crate::{
15 db::{
16 Db,
17 executor::{
18 AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
19 plan::{record_plan_metrics, record_rows_scanned},
20 route::ExecutionRoutePlan,
21 },
22 query::plan::{
23 AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
24 PlannedCursor, SlotSelectionPolicy, compute_page_window, decode_pk_cursor_boundary,
25 derive_scan_direction, validate::validate_executor_plan,
26 },
27 response::Response,
28 },
29 error::InternalError,
30 obs::sink::{ExecKind, Span},
31 traits::{EntityKind, EntityValue},
32};
33use std::marker::PhantomData;
34
/// One page of load results together with the continuation token needed to
/// request the following page.
#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Entities materialized for this page.
    pub(crate) items: Response<E>,

    /// Opaque serialized cursor for resuming the scan; presumably `None`
    /// when there is no further page — confirm with the producer.
    pub(crate) next_cursor: Option<Vec<u8>>,
}
48
/// Coarse label for the access path a query executed through, reported via
/// [`ExecutionTrace::access_path_variant`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Single primary-key lookup.
    ByKey,
    /// Batch of primary-key lookups.
    ByKeys,
    /// Contiguous primary-key range scan.
    KeyRange,
    /// Secondary-index prefix scan.
    IndexPrefix,
    /// Secondary-index range scan.
    IndexRange,
    /// Unindexed scan over the whole keyspace.
    FullScan,
    /// Union of multiple access paths.
    Union,
    /// Intersection of multiple access paths.
    Intersection,
}
66
/// Fast-path optimization the executor applied, if any, reported via
/// [`ExecutionTrace::optimization`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path.
    PrimaryKey,
    /// Ordering satisfied directly by a secondary index (no re-sort).
    SecondaryOrderPushdown,
    /// Row limit pushed down into the index-range scan.
    IndexRangeLimitPushdown,
}
79
/// Diagnostic summary of a single load execution, collected only when the
/// executor runs in debug mode.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Which access-path shape was executed.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Scan order the execution ran in.
    pub direction: OrderDirection,
    /// Fast-path optimization applied, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Keys examined during the scan (saturates at `u64::MAX`).
    pub keys_scanned: u64,
    /// Rows surviving post-access filtering (saturates at `u64::MAX`).
    pub rows_returned: u64,
    /// True when a cursor boundary or index-range anchor resumed the scan.
    pub continuation_applied: bool,
    /// True when an index-level predicate pre-filtered keys.
    pub index_predicate_applied: bool,
    /// Keys rejected by the index-level predicate.
    pub index_predicate_keys_rejected: u64,
    /// Duplicate keys removed by distinct-key deduplication.
    pub distinct_keys_deduped: u64,
}
99
100impl ExecutionTrace {
101 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
102 Self {
103 access_path_variant: access_path_variant(access),
104 direction: execution_order_direction(direction),
105 optimization: None,
106 keys_scanned: 0,
107 rows_returned: 0,
108 continuation_applied,
109 index_predicate_applied: false,
110 index_predicate_keys_rejected: 0,
111 distinct_keys_deduped: 0,
112 }
113 }
114
115 fn set_path_outcome(
116 &mut self,
117 optimization: Option<ExecutionOptimization>,
118 keys_scanned: usize,
119 rows_returned: usize,
120 index_predicate_applied: bool,
121 index_predicate_keys_rejected: u64,
122 distinct_keys_deduped: u64,
123 ) {
124 self.optimization = optimization;
125 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
126 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
127 self.index_predicate_applied = index_predicate_applied;
128 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
129 self.distinct_keys_deduped = distinct_keys_deduped;
130 }
131}
132
/// Choose the key-order comparator used when merging ordered key streams.
///
/// NOTE(review): the conditional below returns `fallback_direction` in both
/// branches — when the arms' values differ, `fallback_direction` wins, and
/// when they agree the two are interchangeable. That makes
/// `derived_direction` (and the `derive_scan_direction` call) dead with
/// respect to the result. If the intent was to prefer the plan-derived
/// direction, the branch arms look swapped — confirm with the author before
/// simplifying.
fn key_stream_comparator_from_plan<K>(
    plan: &LogicalPlan<K>,
    fallback_direction: Direction,
) -> KeyOrderComparator {
    // Direction implied by the plan's explicit ordering, if it has one.
    let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
        derive_scan_direction(order, SlotSelectionPolicy::Last)
    });

    let comparator_direction = if derived_direction == fallback_direction {
        derived_direction
    } else {
        fallback_direction
    };

    KeyOrderComparator::from_direction(comparator_direction)
}
151
/// Result of resolving a fast-path key stream.
struct FastPathKeyResult {
    /// Keys already in the required order, ready for materialization.
    ordered_key_stream: OrderedKeyStreamBox,
    /// Rows scanned while producing the stream (used for metrics/tracing).
    rows_scanned: usize,
    /// Which fast-path optimization produced this stream.
    optimization: ExecutionOptimization,
}
164
/// Executes load plans against a database handle for a single entity type.
#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    // Database handle scoped to the entity's canister.
    db: Db<E::Canister>,
    // When true, an `ExecutionTrace` is collected alongside each execution.
    debug: bool,
    // Ties the executor to `E` without storing a value of that type.
    _marker: PhantomData<E>,
}
178
179impl<E> LoadExecutor<E>
180where
181 E: EntityKind + EntityValue,
182{
    /// Create an executor over `db`; `debug` enables execution tracing.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }
191
192 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
193 self.execute_paged_with_cursor(plan, PlannedCursor::none())
194 .map(|page| page.items)
195 }
196
197 pub(in crate::db) fn execute_paged_with_cursor(
198 &self,
199 plan: ExecutablePlan<E>,
200 cursor: impl Into<PlannedCursor>,
201 ) -> Result<CursorPage<E>, InternalError> {
202 self.execute_paged_with_cursor_traced(plan, cursor)
203 .map(|(page, _)| page)
204 }
205
    /// Execute a load plan resuming from `cursor`, returning the page plus an
    /// [`ExecutionTrace`] when the executor was built with `debug = true`.
    ///
    /// High-level flow: revalidate the cursor against the plan, build a route
    /// plan, resolve a key stream, materialize it into a page, optionally
    /// retry without the index-range limit pushdown (see
    /// `index_range_limited_residual_retry_required`), then finalize metrics
    /// and trace.
    #[expect(clippy::too_many_lines)]
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Reject cursors that were minted for a different plan shape.
        let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_anchor = cursor.index_range_anchor().cloned();

        // This executor only handles load-mode plans.
        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        let direction = plan.direction();
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // A scan is a continuation when either resume marker is present.
        let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
        // Trace is only collected in debug mode.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let (plan, predicate_slots) = plan.into_parts();

        // Closure so `?` failures still fall through to the trace pairing below.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_anchor.as_ref(),
                    direction,
                },
                predicate_slots: predicate_slots.as_ref(),
            };

            record_plan_metrics(&plan.access);
            let route_plan = Self::build_execution_route_plan_for_load(
                &plan,
                cursor_boundary.as_ref(),
                index_range_anchor.as_ref(),
                None,
                direction,
            )?;

            // First attempt: resolve keys and materialize the page.
            let mut resolved = Self::resolve_execution_key_stream(
                &execution_inputs,
                &route_plan,
                IndexPredicateCompileMode::ConservativeSubset,
            )?;
            let (mut page, keys_scanned, mut post_access_rows) =
                Self::materialize_key_stream_into_page(
                    &ctx,
                    &plan,
                    predicate_slots.as_ref(),
                    resolved.key_stream.as_mut(),
                    route_plan.scan_hints.load_scan_budget_hint,
                    route_plan.streaming_access_shape_safe(),
                    cursor_boundary.as_ref(),
                    direction,
                    continuation_signature,
                )?;
            // The resolver may report a more accurate scan count than the
            // materializer observed (e.g. keys rejected before streaming).
            let mut rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
            let mut optimization = resolved.optimization;
            let mut index_predicate_applied = resolved.index_predicate_applied;
            let mut index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
            let mut distinct_keys_deduped = resolved
                .distinct_keys_deduped_counter
                .as_ref()
                .map_or(0, |counter| counter.get());

            // If the limit pushdown starved a residual predicate of rows,
            // re-run the whole access without the pushdown and accumulate
            // the metrics from both attempts.
            if Self::index_range_limited_residual_retry_required(
                &plan,
                cursor_boundary.as_ref(),
                &route_plan,
                rows_scanned,
                post_access_rows,
            ) {
                let mut fallback_route_plan = route_plan;
                fallback_route_plan.index_range_limit_spec = None;
                let mut fallback_resolved = Self::resolve_execution_key_stream(
                    &execution_inputs,
                    &fallback_route_plan,
                    IndexPredicateCompileMode::ConservativeSubset,
                )?;
                let (fallback_page, fallback_keys_scanned, fallback_post_access_rows) =
                    Self::materialize_key_stream_into_page(
                        &ctx,
                        &plan,
                        predicate_slots.as_ref(),
                        fallback_resolved.key_stream.as_mut(),
                        fallback_route_plan.scan_hints.load_scan_budget_hint,
                        fallback_route_plan.streaming_access_shape_safe(),
                        cursor_boundary.as_ref(),
                        direction,
                        continuation_signature,
                    )?;
                let fallback_rows_scanned = fallback_resolved
                    .rows_scanned_override
                    .unwrap_or(fallback_keys_scanned);
                let fallback_distinct_keys_deduped = fallback_resolved
                    .distinct_keys_deduped_counter
                    .as_ref()
                    .map_or(0, |counter| counter.get());

                // Counters accumulate across both attempts; the page and the
                // optimization label come from the fallback alone.
                rows_scanned = rows_scanned.saturating_add(fallback_rows_scanned);
                optimization = fallback_resolved.optimization;
                index_predicate_applied =
                    index_predicate_applied || fallback_resolved.index_predicate_applied;
                index_predicate_keys_rejected = index_predicate_keys_rejected
                    .saturating_add(fallback_resolved.index_predicate_keys_rejected);
                distinct_keys_deduped =
                    distinct_keys_deduped.saturating_add(fallback_distinct_keys_deduped);
                page = fallback_page;
                post_access_rows = fallback_post_access_rows;
            }

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }
348
349 fn index_range_limited_residual_retry_required(
352 plan: &LogicalPlan<E::Key>,
353 cursor_boundary: Option<&CursorBoundary>,
354 route_plan: &ExecutionRoutePlan,
355 rows_scanned: usize,
356 post_access_rows: usize,
357 ) -> bool {
358 let Some(limit_spec) = route_plan.index_range_limit_spec else {
359 return false;
360 };
361 if plan.predicate.is_none() {
362 return false;
363 }
364 if limit_spec.fetch == 0 {
365 return false;
366 }
367 let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
368 return false;
369 };
370 let keep_count =
371 compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
372 .keep_count;
373 if keep_count == 0 {
374 return false;
375 }
376 if rows_scanned < limit_spec.fetch {
377 return false;
378 }
379
380 post_access_rows < keep_count
381 }
382
383 fn finalize_path_outcome(
385 execution_trace: &mut Option<ExecutionTrace>,
386 optimization: Option<ExecutionOptimization>,
387 rows_scanned: usize,
388 rows_returned: usize,
389 index_predicate_applied: bool,
390 index_predicate_keys_rejected: u64,
391 distinct_keys_deduped: u64,
392 ) {
393 record_rows_scanned::<E>(rows_scanned);
394 if let Some(execution_trace) = execution_trace.as_mut() {
395 execution_trace.set_path_outcome(
396 optimization,
397 rows_scanned,
398 rows_returned,
399 index_predicate_applied,
400 index_predicate_keys_rejected,
401 distinct_keys_deduped,
402 );
403 debug_assert_eq!(
404 execution_trace.keys_scanned,
405 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
406 "execution trace keys_scanned must match rows_scanned metrics input",
407 );
408 }
409 }
410
411 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
413 plan: &LogicalPlan<E::Key>,
414 cursor_boundary: Option<&CursorBoundary>,
415 ) -> Result<(), InternalError> {
416 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
417 return Ok(());
418 }
419 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
420
421 Ok(())
422 }
423}