1mod aggregate;
2mod aggregate_field;
3mod aggregate_guard;
4mod execute;
5mod index_range_limit;
6mod page;
7mod pk_stream;
8mod route;
9mod secondary_index;
10mod trace;
11
12use self::{
13 execute::ExecutionInputs,
14 trace::{access_path_variant, execution_order_direction},
15};
16use crate::{
17 db::{
18 Db,
19 executor::{
20 AccessStreamBindings, KeyOrderComparator, OrderedKeyStreamBox,
21 plan::{record_plan_metrics, record_rows_scanned},
22 },
23 query::plan::{
24 AccessPlan, CursorBoundary, Direction, ExecutablePlan, LogicalPlan, OrderDirection,
25 PlannedCursor, SlotSelectionPolicy, decode_pk_cursor_boundary, derive_scan_direction,
26 validate::validate_executor_plan,
27 },
28 response::Response,
29 },
30 error::InternalError,
31 obs::sink::{ExecKind, Span},
32 traits::{EntityKind, EntityValue},
33};
34use std::marker::PhantomData;
35
36#[derive(Debug)]
44pub(crate) struct CursorPage<E: EntityKind> {
45 pub(crate) items: Response<E>,
46
47 pub(crate) next_cursor: Option<Vec<u8>>,
48}
49
/// Coarse label for the access-path shape of an executed plan.
///
/// Stored in `ExecutionTrace::access_path_variant`; the value is computed by
/// `trace::access_path_variant` from the plan's `AccessPlan`.
///
/// Variant order is part of the type's observable behavior (discriminant
/// values), so new variants should be appended rather than inserted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
66
/// Label for the optimization an execution path reported using.
///
/// Stored as `Option<ExecutionOptimization>` in `ExecutionTrace`; `None`
/// there means no optimization was reported for the run.
///
/// Variant order determines discriminant values, so keep it stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
79
80#[derive(Clone, Copy, Debug, Eq, PartialEq)]
88pub struct ExecutionTrace {
89 pub access_path_variant: ExecutionAccessPathVariant,
90 pub direction: OrderDirection,
91 pub optimization: Option<ExecutionOptimization>,
92 pub keys_scanned: u64,
93 pub rows_returned: u64,
94 pub continuation_applied: bool,
95 pub index_predicate_applied: bool,
96 pub index_predicate_keys_rejected: u64,
97 pub distinct_keys_deduped: u64,
98}
99
100impl ExecutionTrace {
101 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
102 Self {
103 access_path_variant: access_path_variant(access),
104 direction: execution_order_direction(direction),
105 optimization: None,
106 keys_scanned: 0,
107 rows_returned: 0,
108 continuation_applied,
109 index_predicate_applied: false,
110 index_predicate_keys_rejected: 0,
111 distinct_keys_deduped: 0,
112 }
113 }
114
115 fn set_path_outcome(
116 &mut self,
117 optimization: Option<ExecutionOptimization>,
118 keys_scanned: usize,
119 rows_returned: usize,
120 index_predicate_applied: bool,
121 index_predicate_keys_rejected: u64,
122 distinct_keys_deduped: u64,
123 ) {
124 self.optimization = optimization;
125 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
126 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
127 self.index_predicate_applied = index_predicate_applied;
128 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
129 self.distinct_keys_deduped = distinct_keys_deduped;
130 }
131}
132
133fn key_stream_comparator_from_plan<K>(
134 plan: &LogicalPlan<K>,
135 fallback_direction: Direction,
136) -> KeyOrderComparator {
137 let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
138 derive_scan_direction(order, SlotSelectionPolicy::Last)
139 });
140
141 let comparator_direction = if derived_direction == fallback_direction {
144 derived_direction
145 } else {
146 fallback_direction
147 };
148
149 KeyOrderComparator::from_direction(comparator_direction)
150}
151
152struct FastPathKeyResult {
159 ordered_key_stream: OrderedKeyStreamBox,
160 rows_scanned: usize,
161 optimization: ExecutionOptimization,
162}
163
/// Fetch size specification for an index-range limit.
// NOTE(review): the code that interprets `fetch` is not visible here;
// presumably it is the number of entries to pull from the index range —
// confirm against the index-range limit module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct IndexRangeLimitSpec {
    /// Number of entries to fetch.
    fetch: usize,
}
175
176#[derive(Clone)]
184pub(crate) struct LoadExecutor<E: EntityKind> {
185 db: Db<E::Canister>,
186 debug: bool,
187 _marker: PhantomData<E>,
188}
189
190impl<E> LoadExecutor<E>
191where
192 E: EntityKind + EntityValue,
193{
194 #[must_use]
195 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
196 Self {
197 db,
198 debug,
199 _marker: PhantomData,
200 }
201 }
202
203 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
204 self.execute_paged_with_cursor(plan, PlannedCursor::none())
205 .map(|page| page.items)
206 }
207
208 pub(in crate::db) fn execute_paged_with_cursor(
209 &self,
210 plan: ExecutablePlan<E>,
211 cursor: impl Into<PlannedCursor>,
212 ) -> Result<CursorPage<E>, InternalError> {
213 self.execute_paged_with_cursor_traced(plan, cursor)
214 .map(|(page, _)| page)
215 }
216
217 pub(in crate::db) fn execute_paged_with_cursor_traced(
218 &self,
219 plan: ExecutablePlan<E>,
220 cursor: impl Into<PlannedCursor>,
221 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
222 let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
223 let cursor_boundary = cursor.boundary().cloned();
224 let index_range_anchor = cursor.index_range_anchor().cloned();
225
226 if !plan.mode().is_load() {
227 return Err(InternalError::query_executor_invariant(
228 "load executor requires load plans",
229 ));
230 }
231
232 let direction = plan.direction();
233 let continuation_signature = plan.continuation_signature();
234 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
235 let index_range_specs = plan.index_range_specs()?.to_vec();
236 let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
237 let mut execution_trace = self
238 .debug
239 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
240 let (plan, predicate_slots) = plan.into_parts();
241
242 let result = (|| {
243 let mut span = Span::<E>::new(ExecKind::Load);
244
245 validate_executor_plan::<E>(&plan)?;
246 let ctx = self.db.recovered_context::<E>()?;
247 let execution_inputs = ExecutionInputs {
248 ctx: &ctx,
249 plan: &plan,
250 stream_bindings: AccessStreamBindings {
251 index_prefix_specs: index_prefix_specs.as_slice(),
252 index_range_specs: index_range_specs.as_slice(),
253 index_range_anchor: index_range_anchor.as_ref(),
254 direction,
255 },
256 predicate_slots: predicate_slots.as_ref(),
257 };
258
259 record_plan_metrics(&plan.access);
260 let route_plan = Self::build_execution_route_plan_for_load(
262 &plan,
263 cursor_boundary.as_ref(),
264 index_range_anchor.as_ref(),
265 None,
266 direction,
267 )?;
268
269 let mut resolved = Self::resolve_execution_key_stream(&execution_inputs, &route_plan)?;
271 let (page, keys_scanned, post_access_rows) = Self::materialize_key_stream_into_page(
272 &ctx,
273 &plan,
274 predicate_slots.as_ref(),
275 resolved.key_stream.as_mut(),
276 route_plan.scan_hints.load_scan_budget_hint,
277 route_plan.streaming_access_shape_safe(),
278 cursor_boundary.as_ref(),
279 direction,
280 continuation_signature,
281 )?;
282 let rows_scanned = resolved.rows_scanned_override.unwrap_or(keys_scanned);
283 let distinct_keys_deduped = resolved
284 .distinct_keys_deduped_counter
285 .as_ref()
286 .map_or(0, |counter| counter.get());
287
288 Ok(Self::finalize_execution(
289 page,
290 resolved.optimization,
291 rows_scanned,
292 post_access_rows,
293 resolved.index_predicate_applied,
294 resolved.index_predicate_keys_rejected,
295 distinct_keys_deduped,
296 &mut span,
297 &mut execution_trace,
298 ))
299 })();
300
301 result.map(|page| (page, execution_trace))
302 }
303
304 fn finalize_path_outcome(
306 execution_trace: &mut Option<ExecutionTrace>,
307 optimization: Option<ExecutionOptimization>,
308 rows_scanned: usize,
309 rows_returned: usize,
310 index_predicate_applied: bool,
311 index_predicate_keys_rejected: u64,
312 distinct_keys_deduped: u64,
313 ) {
314 record_rows_scanned::<E>(rows_scanned);
315 if let Some(execution_trace) = execution_trace.as_mut() {
316 execution_trace.set_path_outcome(
317 optimization,
318 rows_scanned,
319 rows_returned,
320 index_predicate_applied,
321 index_predicate_keys_rejected,
322 distinct_keys_deduped,
323 );
324 debug_assert_eq!(
325 execution_trace.keys_scanned,
326 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
327 "execution trace keys_scanned must match rows_scanned metrics input",
328 );
329 }
330 }
331
332 fn validate_pk_fast_path_boundary_if_applicable(
334 plan: &LogicalPlan<E::Key>,
335 cursor_boundary: Option<&CursorBoundary>,
336 ) -> Result<(), InternalError> {
337 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
338 return Ok(());
339 }
340 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
341
342 Ok(())
343 }
344}