1mod execute;
2mod fast_stream;
3mod index_range_limit;
4mod page;
5mod pk_stream;
6mod secondary_index;
7mod terminal;
8mod trace;
9
10pub(in crate::db::executor) use self::execute::{
11 ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
12};
13
14use self::trace::{access_path_variant, execution_order_direction};
15use crate::{
16 db::{
17 Db,
18 access::AccessPlan,
19 cursor::{ContinuationToken, CursorBoundary, PlannedCursor, decode_pk_cursor_boundary},
20 direction::Direction,
21 executor::{
22 AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
23 IndexPredicateCompileMode, KeyOrderComparator, OrderedKeyStreamBox,
24 aggregate::field::{
25 AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
26 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
27 },
28 plan_metrics::{record_plan_metrics, record_rows_scanned},
29 range_token_anchor_key, range_token_from_cursor_anchor, validate_executor_plan,
30 },
31 policy,
32 query::plan::{AccessPlannedQuery, OrderDirection},
33 response::Response,
34 },
35 error::InternalError,
36 obs::sink::{ExecKind, Span},
37 traits::{EntityKind, EntityValue},
38};
39use std::marker::PhantomData;
40
41#[derive(Debug)]
49pub(crate) struct CursorPage<E: EntityKind> {
50 pub(crate) items: Response<E>,
51
52 pub(crate) next_cursor: Option<ContinuationToken>,
53}
54
/// Coarse label for the access path an execution used, as recorded in
/// `ExecutionTrace::access_path_variant`.
///
/// The label is derived from an `AccessPlan` by `trace::access_path_variant`.
// NOTE(review): the variants presumably mirror the access-plan shapes one-to-one;
// the mapping lives in the `trace` module — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
72
/// Label for the optimization an execution reports, stored in
/// `ExecutionTrace::optimization` and `FastPathKeyResult::optimization`.
// NOTE(review): the variant names suggest primary-key streaming, secondary-index
// order pushdown and index-range limit pushdown; the code that selects them is
// outside this file — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
85
86#[derive(Clone, Copy, Debug, Eq, PartialEq)]
94pub struct ExecutionTrace {
95 pub access_path_variant: ExecutionAccessPathVariant,
96 pub direction: OrderDirection,
97 pub optimization: Option<ExecutionOptimization>,
98 pub keys_scanned: u64,
99 pub rows_returned: u64,
100 pub continuation_applied: bool,
101 pub index_predicate_applied: bool,
102 pub index_predicate_keys_rejected: u64,
103 pub distinct_keys_deduped: u64,
104}
105
106impl ExecutionTrace {
107 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
108 Self {
109 access_path_variant: access_path_variant(access),
110 direction: execution_order_direction(direction),
111 optimization: None,
112 keys_scanned: 0,
113 rows_returned: 0,
114 continuation_applied,
115 index_predicate_applied: false,
116 index_predicate_keys_rejected: 0,
117 distinct_keys_deduped: 0,
118 }
119 }
120
121 fn set_path_outcome(
122 &mut self,
123 optimization: Option<ExecutionOptimization>,
124 keys_scanned: usize,
125 rows_returned: usize,
126 index_predicate_applied: bool,
127 index_predicate_keys_rejected: u64,
128 distinct_keys_deduped: u64,
129 ) {
130 self.optimization = optimization;
131 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
132 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
133 self.index_predicate_applied = index_predicate_applied;
134 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
135 self.distinct_keys_deduped = distinct_keys_deduped;
136 }
137}
138
139pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
140 direction: Direction,
141) -> KeyOrderComparator {
142 KeyOrderComparator::from_direction(direction)
143}
144
145pub(in crate::db::executor) struct FastPathKeyResult {
153 pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
154 pub(in crate::db::executor) rows_scanned: usize,
155 pub(in crate::db::executor) optimization: ExecutionOptimization,
156}
157
158#[derive(Clone)]
166pub(crate) struct LoadExecutor<E: EntityKind> {
167 db: Db<E::Canister>,
168 debug: bool,
169 _marker: PhantomData<E>,
170}
171
172impl<E> LoadExecutor<E>
173where
174 E: EntityKind + EntityValue,
175{
176 #[must_use]
177 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
178 Self {
179 db,
180 debug,
181 _marker: PhantomData,
182 }
183 }
184
185 pub(in crate::db::executor) fn recovered_context(
187 &self,
188 ) -> Result<crate::db::Context<'_, E>, InternalError> {
189 self.db.recovered_context::<E>()
190 }
191
192 pub(in crate::db::executor) fn resolve_orderable_field_slot(
195 target_field: &str,
196 ) -> Result<FieldSlot, InternalError> {
197 resolve_orderable_aggregate_target_slot::<E>(target_field)
198 .map_err(AggregateFieldValueError::into_internal_error)
199 }
200
201 pub(in crate::db::executor) fn resolve_any_field_slot(
204 target_field: &str,
205 ) -> Result<FieldSlot, InternalError> {
206 resolve_any_aggregate_target_slot::<E>(target_field)
207 .map_err(AggregateFieldValueError::into_internal_error)
208 }
209
210 pub(in crate::db::executor) fn resolve_numeric_field_slot(
213 target_field: &str,
214 ) -> Result<FieldSlot, InternalError> {
215 resolve_numeric_aggregate_target_slot::<E>(target_field)
216 .map_err(AggregateFieldValueError::into_internal_error)
217 }
218
219 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
220 self.execute_paged_with_cursor(plan, PlannedCursor::none())
221 .map(|page| page.items)
222 }
223
224 pub(in crate::db) fn execute_paged_with_cursor(
225 &self,
226 plan: ExecutablePlan<E>,
227 cursor: impl Into<PlannedCursor>,
228 ) -> Result<CursorPage<E>, InternalError> {
229 self.execute_paged_with_cursor_traced(plan, cursor)
230 .map(|(page, _)| page)
231 }
232
233 pub(in crate::db) fn execute_paged_with_cursor_traced(
234 &self,
235 plan: ExecutablePlan<E>,
236 cursor: impl Into<PlannedCursor>,
237 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
238 let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
239 let cursor_boundary = cursor.boundary().cloned();
240 let index_range_token = cursor
241 .index_range_anchor()
242 .map(range_token_from_cursor_anchor);
243
244 if !plan.mode().is_load() {
245 return Err(InternalError::query_executor_invariant(
246 "load executor requires load plans",
247 ));
248 }
249 debug_assert!(
250 policy::validate_plan_shape(plan.as_inner()).is_ok(),
251 "load executor received a plan shape that bypassed planning validation",
252 );
253
254 let continuation_signature = plan.continuation_signature();
255 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
256 let index_range_specs = plan.index_range_specs()?.to_vec();
257 let route_plan = Self::build_execution_route_plan_for_load(
258 plan.as_inner(),
259 cursor_boundary.as_ref(),
260 index_range_token.as_ref(),
261 None,
262 )?;
263 let continuation_applied = !matches!(
264 route_plan.continuation_mode(),
265 crate::db::executor::route::ContinuationMode::Initial
266 );
267 let direction = route_plan.direction();
268 debug_assert_eq!(
269 route_plan.window().effective_offset,
270 ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
271 "route window effective offset must match logical plan offset semantics",
272 );
273 let mut execution_trace = self
274 .debug
275 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
276 let plan = plan.into_inner();
277 let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
278
279 let result = (|| {
280 let mut span = Span::<E>::new(ExecKind::Load);
281
282 validate_executor_plan::<E>(&plan)?;
283 let ctx = self.db.recovered_context::<E>()?;
284 let execution_inputs = ExecutionInputs {
285 ctx: &ctx,
286 plan: &plan,
287 stream_bindings: AccessStreamBindings {
288 index_prefix_specs: index_prefix_specs.as_slice(),
289 index_range_specs: index_range_specs.as_slice(),
290 index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
291 direction,
292 },
293 execution_preparation: &execution_preparation,
294 };
295
296 record_plan_metrics(&plan.access);
297 let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
300 &execution_inputs,
301 &route_plan,
302 cursor_boundary.as_ref(),
303 continuation_signature,
304 IndexPredicateCompileMode::ConservativeSubset,
305 )?;
306 let page = materialized.page;
307 let rows_scanned = materialized.rows_scanned;
308 let post_access_rows = materialized.post_access_rows;
309 let optimization = materialized.optimization;
310 let index_predicate_applied = materialized.index_predicate_applied;
311 let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
312 let distinct_keys_deduped = materialized.distinct_keys_deduped;
313
314 Ok(Self::finalize_execution(
315 page,
316 optimization,
317 rows_scanned,
318 post_access_rows,
319 index_predicate_applied,
320 index_predicate_keys_rejected,
321 distinct_keys_deduped,
322 &mut span,
323 &mut execution_trace,
324 ))
325 })();
326
327 result.map(|page| (page, execution_trace))
328 }
329
330 fn finalize_path_outcome(
332 execution_trace: &mut Option<ExecutionTrace>,
333 optimization: Option<ExecutionOptimization>,
334 rows_scanned: usize,
335 rows_returned: usize,
336 index_predicate_applied: bool,
337 index_predicate_keys_rejected: u64,
338 distinct_keys_deduped: u64,
339 ) {
340 record_rows_scanned::<E>(rows_scanned);
341 if let Some(execution_trace) = execution_trace.as_mut() {
342 execution_trace.set_path_outcome(
343 optimization,
344 rows_scanned,
345 rows_returned,
346 index_predicate_applied,
347 index_predicate_keys_rejected,
348 distinct_keys_deduped,
349 );
350 debug_assert_eq!(
351 execution_trace.keys_scanned,
352 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
353 "execution trace keys_scanned must match rows_scanned metrics input",
354 );
355 }
356 }
357
358 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
360 plan: &AccessPlannedQuery<E::Key>,
361 cursor_boundary: Option<&CursorBoundary>,
362 ) -> Result<(), InternalError> {
363 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
364 return Ok(());
365 }
366 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
367
368 Ok(())
369 }
370}