1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11pub(in crate::db::executor) use self::aggregate::{
12 AggregateExecutionDescriptor, AggregateFastPathInputs,
13};
14pub(in crate::db::executor) use self::execute::{
15 ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
16};
17
18use self::trace::{access_path_variant, execution_order_direction};
19use crate::{
20 db::{
21 Db,
22 executor::{
23 AccessStreamBindings, ExecutablePlan, ExecutionKernel, IndexPredicateCompileMode,
24 KeyOrderComparator, OrderedKeyStreamBox, PlannedCursor,
25 aggregate::field::{
26 AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
27 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
28 },
29 compile_predicate_slots, decode_pk_cursor_boundary,
30 plan::{record_plan_metrics, record_rows_scanned},
31 },
32 query::policy,
33 query::{
34 contracts::cursor::{ContinuationToken, CursorBoundary},
35 plan::{
36 AccessPlan, AccessPlannedQuery, Direction, OrderDirection,
37 validate::validate_executor_plan,
38 },
39 },
40 response::Response,
41 },
42 error::InternalError,
43 obs::sink::{ExecKind, Span},
44 traits::{EntityKind, EntityValue, Storable},
45};
46use std::{borrow::Cow, marker::PhantomData};
47
48#[derive(Debug)]
56pub(crate) struct CursorPage<E: EntityKind> {
57 pub(crate) items: Response<E>,
58
59 pub(crate) next_cursor: Option<ContinuationToken>,
60}
61
62#[derive(Clone, Copy, Debug, Eq, PartialEq)]
69pub enum ExecutionAccessPathVariant {
70 ByKey,
71 ByKeys,
72 KeyRange,
73 IndexPrefix,
74 IndexRange,
75 FullScan,
76 Union,
77 Intersection,
78}
79
80#[derive(Clone, Copy, Debug, Eq, PartialEq)]
87pub enum ExecutionOptimization {
88 PrimaryKey,
89 SecondaryOrderPushdown,
90 IndexRangeLimitPushdown,
91}
92
93#[derive(Clone, Copy, Debug, Eq, PartialEq)]
101pub struct ExecutionTrace {
102 pub access_path_variant: ExecutionAccessPathVariant,
103 pub direction: OrderDirection,
104 pub optimization: Option<ExecutionOptimization>,
105 pub keys_scanned: u64,
106 pub rows_returned: u64,
107 pub continuation_applied: bool,
108 pub index_predicate_applied: bool,
109 pub index_predicate_keys_rejected: u64,
110 pub distinct_keys_deduped: u64,
111}
112
113impl ExecutionTrace {
114 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
115 Self {
116 access_path_variant: access_path_variant(access),
117 direction: execution_order_direction(direction),
118 optimization: None,
119 keys_scanned: 0,
120 rows_returned: 0,
121 continuation_applied,
122 index_predicate_applied: false,
123 index_predicate_keys_rejected: 0,
124 distinct_keys_deduped: 0,
125 }
126 }
127
128 fn set_path_outcome(
129 &mut self,
130 optimization: Option<ExecutionOptimization>,
131 keys_scanned: usize,
132 rows_returned: usize,
133 index_predicate_applied: bool,
134 index_predicate_keys_rejected: u64,
135 distinct_keys_deduped: u64,
136 ) {
137 self.optimization = optimization;
138 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
139 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
140 self.index_predicate_applied = index_predicate_applied;
141 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
142 self.distinct_keys_deduped = distinct_keys_deduped;
143 }
144}
145
146pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
147 direction: Direction,
148) -> KeyOrderComparator {
149 KeyOrderComparator::from_direction(direction)
150}
151
152pub(in crate::db::executor) struct FastPathKeyResult {
160 ordered_key_stream: OrderedKeyStreamBox,
161 rows_scanned: usize,
162 optimization: ExecutionOptimization,
163}
164
165#[derive(Clone)]
173pub(crate) struct LoadExecutor<E: EntityKind> {
174 db: Db<E::Canister>,
175 debug: bool,
176 _marker: PhantomData<E>,
177}
178
179impl<E> LoadExecutor<E>
180where
181 E: EntityKind + EntityValue,
182{
183 #[must_use]
184 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
185 Self {
186 db,
187 debug,
188 _marker: PhantomData,
189 }
190 }
191
192 fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
195 resolve_orderable_aggregate_target_slot::<E>(target_field)
196 .map_err(AggregateFieldValueError::into_internal_error)
197 }
198
199 fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
202 resolve_any_aggregate_target_slot::<E>(target_field)
203 .map_err(AggregateFieldValueError::into_internal_error)
204 }
205
206 fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
209 resolve_numeric_aggregate_target_slot::<E>(target_field)
210 .map_err(AggregateFieldValueError::into_internal_error)
211 }
212
213 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
214 self.execute_paged_with_cursor(plan, PlannedCursor::none())
215 .map(|page| page.items)
216 }
217
218 pub(in crate::db) fn execute_paged_with_cursor(
219 &self,
220 plan: ExecutablePlan<E>,
221 cursor: impl Into<PlannedCursor>,
222 ) -> Result<CursorPage<E>, InternalError> {
223 self.execute_paged_with_cursor_traced(plan, cursor)
224 .map(|(page, _)| page)
225 }
226
227 pub(in crate::db) fn execute_paged_with_cursor_traced(
228 &self,
229 plan: ExecutablePlan<E>,
230 cursor: impl Into<PlannedCursor>,
231 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
232 let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
233 let cursor_boundary = cursor.boundary().cloned();
234 let index_range_anchor = cursor.index_range_anchor().map(|anchor| {
235 <crate::db::lowering::LoweredKey as Storable>::from_bytes(Cow::Borrowed(
236 anchor.last_raw_key(),
237 ))
238 });
239
240 if !plan.mode().is_load() {
241 return Err(InternalError::query_executor_invariant(
242 "load executor requires load plans",
243 ));
244 }
245 debug_assert!(
246 policy::validate_plan_shape(plan.as_inner()).is_ok(),
247 "load executor received a plan shape that bypassed planning validation",
248 );
249
250 let continuation_signature = plan.continuation_signature();
251 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
252 let index_range_specs = plan.index_range_specs()?.to_vec();
253 let route_plan = Self::build_execution_route_plan_for_load(
254 plan.as_inner(),
255 cursor_boundary.as_ref(),
256 index_range_anchor.as_ref(),
257 None,
258 )?;
259 let continuation_applied = !matches!(
260 route_plan.continuation_mode(),
261 crate::db::executor::route::ContinuationMode::Initial
262 );
263 let direction = route_plan.direction();
264 debug_assert_eq!(
265 route_plan.window().effective_offset,
266 plan.as_inner()
267 .effective_page_offset(cursor_boundary.as_ref()),
268 "route window effective offset must match logical plan offset semantics",
269 );
270 let mut execution_trace = self
271 .debug
272 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
273 let plan = plan.into_inner();
274 let predicate_slots = compile_predicate_slots::<E>(&plan);
275
276 let result = (|| {
277 let mut span = Span::<E>::new(ExecKind::Load);
278
279 validate_executor_plan::<E>(&plan)?;
280 let ctx = self.db.recovered_context::<E>()?;
281 let execution_inputs = ExecutionInputs {
282 ctx: &ctx,
283 plan: &plan,
284 stream_bindings: AccessStreamBindings {
285 index_prefix_specs: index_prefix_specs.as_slice(),
286 index_range_specs: index_range_specs.as_slice(),
287 index_range_anchor: index_range_anchor.as_ref(),
288 direction,
289 },
290 predicate_slots: predicate_slots.as_ref(),
291 };
292
293 record_plan_metrics(&plan.access);
294 let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
297 &execution_inputs,
298 &route_plan,
299 cursor_boundary.as_ref(),
300 continuation_signature,
301 IndexPredicateCompileMode::ConservativeSubset,
302 )?;
303 let page = materialized.page;
304 let rows_scanned = materialized.rows_scanned;
305 let post_access_rows = materialized.post_access_rows;
306 let optimization = materialized.optimization;
307 let index_predicate_applied = materialized.index_predicate_applied;
308 let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
309 let distinct_keys_deduped = materialized.distinct_keys_deduped;
310
311 Ok(Self::finalize_execution(
312 page,
313 optimization,
314 rows_scanned,
315 post_access_rows,
316 index_predicate_applied,
317 index_predicate_keys_rejected,
318 distinct_keys_deduped,
319 &mut span,
320 &mut execution_trace,
321 ))
322 })();
323
324 result.map(|page| (page, execution_trace))
325 }
326
327 fn finalize_path_outcome(
329 execution_trace: &mut Option<ExecutionTrace>,
330 optimization: Option<ExecutionOptimization>,
331 rows_scanned: usize,
332 rows_returned: usize,
333 index_predicate_applied: bool,
334 index_predicate_keys_rejected: u64,
335 distinct_keys_deduped: u64,
336 ) {
337 record_rows_scanned::<E>(rows_scanned);
338 if let Some(execution_trace) = execution_trace.as_mut() {
339 execution_trace.set_path_outcome(
340 optimization,
341 rows_scanned,
342 rows_returned,
343 index_predicate_applied,
344 index_predicate_keys_rejected,
345 distinct_keys_deduped,
346 );
347 debug_assert_eq!(
348 execution_trace.keys_scanned,
349 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
350 "execution trace keys_scanned must match rows_scanned metrics input",
351 );
352 }
353 }
354
355 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
357 plan: &AccessPlannedQuery<E::Key>,
358 cursor_boundary: Option<&CursorBoundary>,
359 ) -> Result<(), InternalError> {
360 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
361 return Ok(());
362 }
363 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
364
365 Ok(())
366 }
367}