1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11pub(in crate::db::executor) use self::execute::IndexPredicateCompileMode;
12
13use self::{
14 execute::ExecutionInputs,
15 trace::{access_path_variant, execution_order_direction},
16};
17use crate::{
18 db::{
19 Db,
20 executor::{
21 AccessStreamBindings, ExecutablePlan, KeyOrderComparator, OrderedKeyStreamBox,
22 PlannedCursor,
23 aggregate::field::{
24 AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
25 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
26 },
27 compile_predicate_slots, compute_page_window, decode_pk_cursor_boundary,
28 plan::{record_plan_metrics, record_rows_scanned},
29 route::{ExecutionRoutePlan, RouteOrderSlotPolicy, derive_scan_direction},
30 },
31 query::policy,
32 query::{
33 contracts::cursor::{ContinuationToken, CursorBoundary},
34 plan::{
35 AccessPlan, AccessPlannedQuery, Direction, OrderDirection,
36 validate::validate_executor_plan,
37 },
38 },
39 response::Response,
40 },
41 error::InternalError,
42 obs::sink::{ExecKind, Span},
43 traits::{EntityKind, EntityValue, Storable},
44};
45use std::{borrow::Cow, marker::PhantomData};
46
47#[derive(Debug)]
55pub(crate) struct CursorPage<E: EntityKind> {
56 pub(crate) items: Response<E>,
57
58 pub(crate) next_cursor: Option<ContinuationToken>,
59}
60
/// Access-path shape recorded on an [`ExecutionTrace`].
///
/// Variant order is part of the declaration and is kept stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
78
/// Optimization label that an [`ExecutionTrace`] can carry in its
/// `optimization` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
91
92#[derive(Clone, Copy, Debug, Eq, PartialEq)]
100pub struct ExecutionTrace {
101 pub access_path_variant: ExecutionAccessPathVariant,
102 pub direction: OrderDirection,
103 pub optimization: Option<ExecutionOptimization>,
104 pub keys_scanned: u64,
105 pub rows_returned: u64,
106 pub continuation_applied: bool,
107 pub index_predicate_applied: bool,
108 pub index_predicate_keys_rejected: u64,
109 pub distinct_keys_deduped: u64,
110}
111
112impl ExecutionTrace {
113 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
114 Self {
115 access_path_variant: access_path_variant(access),
116 direction: execution_order_direction(direction),
117 optimization: None,
118 keys_scanned: 0,
119 rows_returned: 0,
120 continuation_applied,
121 index_predicate_applied: false,
122 index_predicate_keys_rejected: 0,
123 distinct_keys_deduped: 0,
124 }
125 }
126
127 fn set_path_outcome(
128 &mut self,
129 optimization: Option<ExecutionOptimization>,
130 keys_scanned: usize,
131 rows_returned: usize,
132 index_predicate_applied: bool,
133 index_predicate_keys_rejected: u64,
134 distinct_keys_deduped: u64,
135 ) {
136 self.optimization = optimization;
137 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
138 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
139 self.index_predicate_applied = index_predicate_applied;
140 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
141 self.distinct_keys_deduped = distinct_keys_deduped;
142 }
143}
144
145fn key_stream_comparator_from_plan<K>(
146 plan: &AccessPlannedQuery<K>,
147 fallback_direction: Direction,
148) -> KeyOrderComparator {
149 let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
150 derive_scan_direction(order, RouteOrderSlotPolicy::Last)
151 });
152
153 let comparator_direction = if derived_direction == fallback_direction {
156 derived_direction
157 } else {
158 fallback_direction
159 };
160
161 KeyOrderComparator::from_direction(comparator_direction)
162}
163
164struct FastPathKeyResult {
172 ordered_key_stream: OrderedKeyStreamBox,
173 rows_scanned: usize,
174 optimization: ExecutionOptimization,
175}
176
177#[derive(Clone)]
185pub(crate) struct LoadExecutor<E: EntityKind> {
186 db: Db<E::Canister>,
187 debug: bool,
188 _marker: PhantomData<E>,
189}
190
191impl<E> LoadExecutor<E>
192where
193 E: EntityKind + EntityValue,
194{
195 #[must_use]
196 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
197 Self {
198 db,
199 debug,
200 _marker: PhantomData,
201 }
202 }
203
204 fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
207 resolve_orderable_aggregate_target_slot::<E>(target_field)
208 .map_err(AggregateFieldValueError::into_internal_error)
209 }
210
211 fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
214 resolve_any_aggregate_target_slot::<E>(target_field)
215 .map_err(AggregateFieldValueError::into_internal_error)
216 }
217
218 fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
221 resolve_numeric_aggregate_target_slot::<E>(target_field)
222 .map_err(AggregateFieldValueError::into_internal_error)
223 }
224
225 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
226 self.execute_paged_with_cursor(plan, PlannedCursor::none())
227 .map(|page| page.items)
228 }
229
230 pub(in crate::db) fn execute_paged_with_cursor(
231 &self,
232 plan: ExecutablePlan<E>,
233 cursor: impl Into<PlannedCursor>,
234 ) -> Result<CursorPage<E>, InternalError> {
235 self.execute_paged_with_cursor_traced(plan, cursor)
236 .map(|(page, _)| page)
237 }
238
239 pub(in crate::db) fn execute_paged_with_cursor_traced(
240 &self,
241 plan: ExecutablePlan<E>,
242 cursor: impl Into<PlannedCursor>,
243 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
244 let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
245 let cursor_boundary = cursor.boundary().cloned();
246 let index_range_anchor = cursor.index_range_anchor().map(|anchor| {
247 <crate::db::lowering::LoweredKey as Storable>::from_bytes(Cow::Borrowed(
248 anchor.last_raw_key(),
249 ))
250 });
251
252 if !plan.mode().is_load() {
253 return Err(InternalError::query_executor_invariant(
254 "load executor requires load plans",
255 ));
256 }
257 debug_assert!(
258 policy::validate_plan_shape(plan.as_inner()).is_ok(),
259 "load executor received a plan shape that bypassed planning validation",
260 );
261
262 let continuation_signature = plan.continuation_signature();
263 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
264 let index_range_specs = plan.index_range_specs()?.to_vec();
265 let route_plan = Self::build_execution_route_plan_for_load(
266 plan.as_inner(),
267 cursor_boundary.as_ref(),
268 index_range_anchor.as_ref(),
269 None,
270 )?;
271 let continuation_applied = !matches!(
272 route_plan.continuation_mode(),
273 crate::db::executor::route::ContinuationMode::Initial
274 );
275 let direction = route_plan.direction();
276 debug_assert_eq!(
277 route_plan.window().effective_offset,
278 plan.as_inner()
279 .effective_page_offset(cursor_boundary.as_ref()),
280 "route window effective offset must match logical plan offset semantics",
281 );
282 let mut execution_trace = self
283 .debug
284 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
285 let plan = plan.into_inner();
286 let predicate_slots = compile_predicate_slots::<E>(&plan);
287
288 let result = (|| {
289 let mut span = Span::<E>::new(ExecKind::Load);
290
291 validate_executor_plan::<E>(&plan)?;
292 let ctx = self.db.recovered_context::<E>()?;
293 let execution_inputs = ExecutionInputs {
294 ctx: &ctx,
295 plan: &plan,
296 stream_bindings: AccessStreamBindings {
297 index_prefix_specs: index_prefix_specs.as_slice(),
298 index_range_specs: index_range_specs.as_slice(),
299 index_range_anchor: index_range_anchor.as_ref(),
300 direction,
301 },
302 predicate_slots: predicate_slots.as_ref(),
303 };
304
305 record_plan_metrics(&plan.access);
306 let materialized = Self::materialize_with_optional_residual_retry(
309 &execution_inputs,
310 &route_plan,
311 cursor_boundary.as_ref(),
312 continuation_signature,
313 IndexPredicateCompileMode::ConservativeSubset,
314 )?;
315 let page = materialized.page;
316 let rows_scanned = materialized.rows_scanned;
317 let post_access_rows = materialized.post_access_rows;
318 let optimization = materialized.optimization;
319 let index_predicate_applied = materialized.index_predicate_applied;
320 let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
321 let distinct_keys_deduped = materialized.distinct_keys_deduped;
322
323 Ok(Self::finalize_execution(
324 page,
325 optimization,
326 rows_scanned,
327 post_access_rows,
328 index_predicate_applied,
329 index_predicate_keys_rejected,
330 distinct_keys_deduped,
331 &mut span,
332 &mut execution_trace,
333 ))
334 })();
335
336 result.map(|page| (page, execution_trace))
337 }
338
339 fn index_range_limited_residual_retry_required(
342 plan: &AccessPlannedQuery<E::Key>,
343 cursor_boundary: Option<&CursorBoundary>,
344 route_plan: &ExecutionRoutePlan,
345 rows_scanned: usize,
346 post_access_rows: usize,
347 ) -> bool {
348 let Some(limit_spec) = route_plan.index_range_limit_spec else {
349 return false;
350 };
351 if plan.predicate.is_none() {
352 return false;
353 }
354 if limit_spec.fetch == 0 {
355 return false;
356 }
357 let Some(limit) = plan.page.as_ref().and_then(|page| page.limit) else {
358 return false;
359 };
360 let keep_count =
361 compute_page_window(plan.effective_page_offset(cursor_boundary), limit, false)
362 .keep_count;
363 if keep_count == 0 {
364 return false;
365 }
366 if rows_scanned < limit_spec.fetch {
367 return false;
368 }
369
370 post_access_rows < keep_count
371 }
372
373 fn finalize_path_outcome(
375 execution_trace: &mut Option<ExecutionTrace>,
376 optimization: Option<ExecutionOptimization>,
377 rows_scanned: usize,
378 rows_returned: usize,
379 index_predicate_applied: bool,
380 index_predicate_keys_rejected: u64,
381 distinct_keys_deduped: u64,
382 ) {
383 record_rows_scanned::<E>(rows_scanned);
384 if let Some(execution_trace) = execution_trace.as_mut() {
385 execution_trace.set_path_outcome(
386 optimization,
387 rows_scanned,
388 rows_returned,
389 index_predicate_applied,
390 index_predicate_keys_rejected,
391 distinct_keys_deduped,
392 );
393 debug_assert_eq!(
394 execution_trace.keys_scanned,
395 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
396 "execution trace keys_scanned must match rows_scanned metrics input",
397 );
398 }
399 }
400
401 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
403 plan: &AccessPlannedQuery<E::Key>,
404 cursor_boundary: Option<&CursorBoundary>,
405 ) -> Result<(), InternalError> {
406 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
407 return Ok(());
408 }
409 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
410
411 Ok(())
412 }
413}