1mod aggregate;
2mod execute;
3mod fast_stream;
4mod index_range_limit;
5mod page;
6mod pk_stream;
7mod secondary_index;
8mod terminal;
9mod trace;
10
11pub(in crate::db::executor) use self::execute::{
12 ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
13};
14
15use self::trace::{access_path_variant, execution_order_direction};
16use crate::{
17 db::{
18 Db,
19 access::AccessPlan,
20 cursor::{ContinuationToken, CursorBoundary},
21 direction::Direction,
22 executor::{
23 AccessStreamBindings, ExecutablePlan, ExecutionKernel, IndexPredicateCompileMode,
24 KeyOrderComparator, OrderedKeyStreamBox, PlannedCursor,
25 aggregate::field::{
26 AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
27 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
28 },
29 compile_predicate_slots, decode_pk_cursor_boundary,
30 plan_metrics::{record_plan_metrics, record_rows_scanned},
31 },
32 plan::{AccessPlannedQuery, OrderDirection},
33 query::plan::validate::validate_executor_plan,
34 query::policy,
35 response::Response,
36 },
37 error::InternalError,
38 obs::sink::{ExecKind, Span},
39 traits::{EntityKind, EntityValue, Storable},
40};
41use std::{borrow::Cow, marker::PhantomData};
42
43#[derive(Debug)]
51pub(crate) struct CursorPage<E: EntityKind> {
52 pub(crate) items: Response<E>,
53
54 pub(crate) next_cursor: Option<ContinuationToken>,
55}
56
/// Coarse classification of the access path used by one load execution.
///
/// Reported through `ExecutionTrace::access_path_variant`; the value is derived
/// from the plan's access tree by the `trace` submodule.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Access resolved by a single key.
    ByKey,
    /// Access resolved by an explicit set of keys.
    ByKeys,
    /// Access over a key range.
    KeyRange,
    /// Access through an index prefix.
    IndexPrefix,
    /// Access through an index range.
    IndexRange,
    /// Full scan.
    FullScan,
    /// Union access path.
    Union,
    /// Intersection access path.
    Intersection,
}
74
/// Optimization that an execution reported having used.
///
/// Carried in `ExecutionTrace::optimization` and in
/// `FastPathKeyResult::optimization`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key optimization.
    PrimaryKey,
    /// Secondary-index order pushdown.
    SecondaryOrderPushdown,
    /// Index-range limit pushdown.
    IndexRangeLimitPushdown,
}
87
88#[derive(Clone, Copy, Debug, Eq, PartialEq)]
96pub struct ExecutionTrace {
97 pub access_path_variant: ExecutionAccessPathVariant,
98 pub direction: OrderDirection,
99 pub optimization: Option<ExecutionOptimization>,
100 pub keys_scanned: u64,
101 pub rows_returned: u64,
102 pub continuation_applied: bool,
103 pub index_predicate_applied: bool,
104 pub index_predicate_keys_rejected: u64,
105 pub distinct_keys_deduped: u64,
106}
107
108impl ExecutionTrace {
109 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
110 Self {
111 access_path_variant: access_path_variant(access),
112 direction: execution_order_direction(direction),
113 optimization: None,
114 keys_scanned: 0,
115 rows_returned: 0,
116 continuation_applied,
117 index_predicate_applied: false,
118 index_predicate_keys_rejected: 0,
119 distinct_keys_deduped: 0,
120 }
121 }
122
123 fn set_path_outcome(
124 &mut self,
125 optimization: Option<ExecutionOptimization>,
126 keys_scanned: usize,
127 rows_returned: usize,
128 index_predicate_applied: bool,
129 index_predicate_keys_rejected: u64,
130 distinct_keys_deduped: u64,
131 ) {
132 self.optimization = optimization;
133 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
134 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
135 self.index_predicate_applied = index_predicate_applied;
136 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
137 self.distinct_keys_deduped = distinct_keys_deduped;
138 }
139}
140
141pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
142 direction: Direction,
143) -> KeyOrderComparator {
144 KeyOrderComparator::from_direction(direction)
145}
146
147pub(in crate::db::executor) struct FastPathKeyResult {
155 pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
156 pub(in crate::db::executor) rows_scanned: usize,
157 pub(in crate::db::executor) optimization: ExecutionOptimization,
158}
159
160#[derive(Clone)]
168pub(crate) struct LoadExecutor<E: EntityKind> {
169 db: Db<E::Canister>,
170 debug: bool,
171 _marker: PhantomData<E>,
172}
173
174impl<E> LoadExecutor<E>
175where
176 E: EntityKind + EntityValue,
177{
178 #[must_use]
179 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
180 Self {
181 db,
182 debug,
183 _marker: PhantomData,
184 }
185 }
186
187 pub(in crate::db::executor) fn recovered_context(
189 &self,
190 ) -> Result<crate::db::Context<'_, E>, InternalError> {
191 self.db.recovered_context::<E>()
192 }
193
194 fn resolve_orderable_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
197 resolve_orderable_aggregate_target_slot::<E>(target_field)
198 .map_err(AggregateFieldValueError::into_internal_error)
199 }
200
201 fn resolve_any_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
204 resolve_any_aggregate_target_slot::<E>(target_field)
205 .map_err(AggregateFieldValueError::into_internal_error)
206 }
207
208 fn resolve_numeric_field_slot(target_field: &str) -> Result<FieldSlot, InternalError> {
211 resolve_numeric_aggregate_target_slot::<E>(target_field)
212 .map_err(AggregateFieldValueError::into_internal_error)
213 }
214
215 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
216 self.execute_paged_with_cursor(plan, PlannedCursor::none())
217 .map(|page| page.items)
218 }
219
220 pub(in crate::db) fn execute_paged_with_cursor(
221 &self,
222 plan: ExecutablePlan<E>,
223 cursor: impl Into<PlannedCursor>,
224 ) -> Result<CursorPage<E>, InternalError> {
225 self.execute_paged_with_cursor_traced(plan, cursor)
226 .map(|(page, _)| page)
227 }
228
229 pub(in crate::db) fn execute_paged_with_cursor_traced(
230 &self,
231 plan: ExecutablePlan<E>,
232 cursor: impl Into<PlannedCursor>,
233 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
234 let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
235 let cursor_boundary = cursor.boundary().cloned();
236 let index_range_anchor = cursor.index_range_anchor().map(|anchor| {
237 <crate::db::lowering::LoweredKey as Storable>::from_bytes(Cow::Borrowed(
238 anchor.last_raw_key(),
239 ))
240 });
241
242 if !plan.mode().is_load() {
243 return Err(InternalError::query_executor_invariant(
244 "load executor requires load plans",
245 ));
246 }
247 debug_assert!(
248 policy::validate_plan_shape(plan.as_inner()).is_ok(),
249 "load executor received a plan shape that bypassed planning validation",
250 );
251
252 let continuation_signature = plan.continuation_signature();
253 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
254 let index_range_specs = plan.index_range_specs()?.to_vec();
255 let route_plan = Self::build_execution_route_plan_for_load(
256 plan.as_inner(),
257 cursor_boundary.as_ref(),
258 index_range_anchor.as_ref(),
259 None,
260 )?;
261 let continuation_applied = !matches!(
262 route_plan.continuation_mode(),
263 crate::db::executor::route::ContinuationMode::Initial
264 );
265 let direction = route_plan.direction();
266 debug_assert_eq!(
267 route_plan.window().effective_offset,
268 ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
269 "route window effective offset must match logical plan offset semantics",
270 );
271 let mut execution_trace = self
272 .debug
273 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
274 let plan = plan.into_inner();
275 let predicate_slots = compile_predicate_slots::<E>(&plan);
276
277 let result = (|| {
278 let mut span = Span::<E>::new(ExecKind::Load);
279
280 validate_executor_plan::<E>(&plan)?;
281 let ctx = self.db.recovered_context::<E>()?;
282 let execution_inputs = ExecutionInputs {
283 ctx: &ctx,
284 plan: &plan,
285 stream_bindings: AccessStreamBindings {
286 index_prefix_specs: index_prefix_specs.as_slice(),
287 index_range_specs: index_range_specs.as_slice(),
288 index_range_anchor: index_range_anchor.as_ref(),
289 direction,
290 },
291 predicate_slots: predicate_slots.as_ref(),
292 };
293
294 record_plan_metrics(&plan.access);
295 let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
298 &execution_inputs,
299 &route_plan,
300 cursor_boundary.as_ref(),
301 continuation_signature,
302 IndexPredicateCompileMode::ConservativeSubset,
303 )?;
304 let page = materialized.page;
305 let rows_scanned = materialized.rows_scanned;
306 let post_access_rows = materialized.post_access_rows;
307 let optimization = materialized.optimization;
308 let index_predicate_applied = materialized.index_predicate_applied;
309 let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
310 let distinct_keys_deduped = materialized.distinct_keys_deduped;
311
312 Ok(Self::finalize_execution(
313 page,
314 optimization,
315 rows_scanned,
316 post_access_rows,
317 index_predicate_applied,
318 index_predicate_keys_rejected,
319 distinct_keys_deduped,
320 &mut span,
321 &mut execution_trace,
322 ))
323 })();
324
325 result.map(|page| (page, execution_trace))
326 }
327
328 fn finalize_path_outcome(
330 execution_trace: &mut Option<ExecutionTrace>,
331 optimization: Option<ExecutionOptimization>,
332 rows_scanned: usize,
333 rows_returned: usize,
334 index_predicate_applied: bool,
335 index_predicate_keys_rejected: u64,
336 distinct_keys_deduped: u64,
337 ) {
338 record_rows_scanned::<E>(rows_scanned);
339 if let Some(execution_trace) = execution_trace.as_mut() {
340 execution_trace.set_path_outcome(
341 optimization,
342 rows_scanned,
343 rows_returned,
344 index_predicate_applied,
345 index_predicate_keys_rejected,
346 distinct_keys_deduped,
347 );
348 debug_assert_eq!(
349 execution_trace.keys_scanned,
350 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
351 "execution trace keys_scanned must match rows_scanned metrics input",
352 );
353 }
354 }
355
356 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
358 plan: &AccessPlannedQuery<E::Key>,
359 cursor_boundary: Option<&CursorBoundary>,
360 ) -> Result<(), InternalError> {
361 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
362 return Ok(());
363 }
364 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
365
366 Ok(())
367 }
368}