1mod execute;
2mod index_range_limit;
3mod page;
4mod pk_stream;
5mod route;
6mod secondary_index;
7
8use crate::{
9 db::{
10 Db,
11 executor::KeyOrderComparator,
12 executor::OrderedKeyStreamBox,
13 executor::plan::{record_plan_metrics, record_rows_scanned},
14 index::RawIndexKey,
15 query::plan::{
16 AccessPlan, AccessPlanProjection, CursorBoundary, Direction, ExecutablePlan,
17 LogicalPlan, OrderDirection, PlannedCursor, SlotSelectionPolicy, compute_page_window,
18 decode_pk_cursor_boundary, derive_scan_direction, project_access_plan,
19 validate::validate_executor_plan,
20 },
21 response::Response,
22 },
23 error::InternalError,
24 obs::sink::{ExecKind, Span},
25 traits::{EntityKind, EntityValue},
26 value::Value,
27};
28use std::{marker::PhantomData, ops::Bound};
29
30#[derive(Debug)]
38pub(crate) struct CursorPage<E: EntityKind> {
39 pub(crate) items: Response<E>,
40
41 pub(crate) next_cursor: Option<Vec<u8>>,
42}
43
44#[derive(Clone, Copy, Debug, Eq, PartialEq)]
50pub enum ExecutionAccessPathVariant {
51 ByKey,
52 ByKeys,
53 KeyRange,
54 IndexPrefix,
55 IndexRange,
56 FullScan,
57 Union,
58 Intersection,
59}
60
61#[derive(Clone, Copy, Debug, Eq, PartialEq)]
68pub enum ExecutionOptimization {
69 PrimaryKey,
70 SecondaryOrderPushdown,
71 IndexRangeLimitPushdown,
72}
73
74#[derive(Clone, Copy, Debug, Eq, PartialEq)]
82pub struct ExecutionTrace {
83 pub access_path_variant: ExecutionAccessPathVariant,
84 pub direction: OrderDirection,
85 pub optimization: Option<ExecutionOptimization>,
86 pub keys_scanned: u64,
87 pub rows_returned: u64,
88 pub continuation_applied: bool,
89}
90
91impl ExecutionTrace {
92 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
93 Self {
94 access_path_variant: access_path_variant(access),
95 direction: execution_order_direction(direction),
96 optimization: None,
97 keys_scanned: 0,
98 rows_returned: 0,
99 continuation_applied,
100 }
101 }
102
103 fn set_path_outcome(
104 &mut self,
105 optimization: Option<ExecutionOptimization>,
106 keys_scanned: usize,
107 rows_returned: usize,
108 ) {
109 self.optimization = optimization;
110 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
111 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
112 }
113}
114
115struct ExecutionAccessProjection;
116
117impl<K> AccessPlanProjection<K> for ExecutionAccessProjection {
118 type Output = ExecutionAccessPathVariant;
119
120 fn by_key(&mut self, _key: &K) -> Self::Output {
121 ExecutionAccessPathVariant::ByKey
122 }
123
124 fn by_keys(&mut self, _keys: &[K]) -> Self::Output {
125 ExecutionAccessPathVariant::ByKeys
126 }
127
128 fn key_range(&mut self, _start: &K, _end: &K) -> Self::Output {
129 ExecutionAccessPathVariant::KeyRange
130 }
131
132 fn index_prefix(
133 &mut self,
134 _index_name: &'static str,
135 _index_fields: &[&'static str],
136 _prefix_len: usize,
137 _values: &[Value],
138 ) -> Self::Output {
139 ExecutionAccessPathVariant::IndexPrefix
140 }
141
142 fn index_range(
143 &mut self,
144 _index_name: &'static str,
145 _index_fields: &[&'static str],
146 _prefix_len: usize,
147 _prefix: &[Value],
148 _lower: &Bound<Value>,
149 _upper: &Bound<Value>,
150 ) -> Self::Output {
151 ExecutionAccessPathVariant::IndexRange
152 }
153
154 fn full_scan(&mut self) -> Self::Output {
155 ExecutionAccessPathVariant::FullScan
156 }
157
158 fn union(&mut self, _children: Vec<Self::Output>) -> Self::Output {
159 ExecutionAccessPathVariant::Union
160 }
161
162 fn intersection(&mut self, _children: Vec<Self::Output>) -> Self::Output {
163 ExecutionAccessPathVariant::Intersection
164 }
165}
166
167fn access_path_variant<K>(access: &AccessPlan<K>) -> ExecutionAccessPathVariant {
168 let mut projection = ExecutionAccessProjection;
169 project_access_plan(access, &mut projection)
170}
171
172const fn execution_order_direction(direction: Direction) -> OrderDirection {
173 match direction {
174 Direction::Asc => OrderDirection::Asc,
175 Direction::Desc => OrderDirection::Desc,
176 }
177}
178
179fn key_stream_comparator_from_plan<K>(
180 plan: &LogicalPlan<K>,
181 fallback_direction: Direction,
182) -> KeyOrderComparator {
183 let derived_direction = plan.order.as_ref().map_or(fallback_direction, |order| {
184 derive_scan_direction(order, SlotSelectionPolicy::Last)
185 });
186
187 let comparator_direction = if derived_direction == fallback_direction {
190 derived_direction
191 } else {
192 fallback_direction
193 };
194
195 KeyOrderComparator::from_direction(comparator_direction)
196}
197
198struct FastPathKeyResult {
205 ordered_key_stream: OrderedKeyStreamBox,
206 rows_scanned: usize,
207 optimization: ExecutionOptimization,
208}
209
210struct IndexRangeLimitSpec {
218 fetch: usize,
219}
220
221#[derive(Clone)]
229pub(crate) struct LoadExecutor<E: EntityKind> {
230 db: Db<E::Canister>,
231 debug: bool,
232 _marker: PhantomData<E>,
233}
234
235impl<E> LoadExecutor<E>
236where
237 E: EntityKind + EntityValue,
238{
239 #[must_use]
240 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
241 Self {
242 db,
243 debug,
244 _marker: PhantomData,
245 }
246 }
247
248 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
249 self.execute_paged_with_cursor(plan, PlannedCursor::none())
250 .map(|page| page.items)
251 }
252
253 pub(in crate::db) fn execute_paged_with_cursor(
254 &self,
255 plan: ExecutablePlan<E>,
256 cursor: impl Into<PlannedCursor>,
257 ) -> Result<CursorPage<E>, InternalError> {
258 self.execute_paged_with_cursor_traced(plan, cursor)
259 .map(|(page, _)| page)
260 }
261
262 pub(in crate::db) fn execute_paged_with_cursor_traced(
263 &self,
264 plan: ExecutablePlan<E>,
265 cursor: impl Into<PlannedCursor>,
266 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
267 let cursor: PlannedCursor = plan.revalidate_planned_cursor(cursor.into())?;
268 let cursor_boundary = cursor.boundary().cloned();
269 let index_range_anchor = cursor.index_range_anchor().cloned();
270
271 if !plan.mode().is_load() {
272 return Err(InternalError::query_invariant(
273 "executor invariant violated: load executor requires load plans",
274 ));
275 }
276
277 let direction = plan.direction();
278 let continuation_signature = plan.continuation_signature();
279 let continuation_applied = cursor_boundary.is_some() || index_range_anchor.is_some();
280 let mut execution_trace = self
281 .debug
282 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
283
284 let result = (|| {
285 let mut span = Span::<E>::new(ExecKind::Load);
286 let plan = plan.into_inner();
287
288 validate_executor_plan::<E>(&plan)?;
289 let ctx = self.db.recovered_context::<E>()?;
290
291 record_plan_metrics(&plan.access);
292 let fast_path_plan = Self::build_fast_path_plan(
294 &plan,
295 cursor_boundary.as_ref(),
296 index_range_anchor.as_ref(),
297 )?;
298
299 if let Some(page) = Self::try_execute_fast_path_plan(
300 &ctx,
301 &plan,
302 &fast_path_plan,
303 cursor_boundary.as_ref(),
304 index_range_anchor.as_ref(),
305 direction,
306 continuation_signature,
307 &mut span,
308 &mut execution_trace,
309 )? {
310 return Ok(page);
311 }
312
313 let page = Self::execute_fallback_path(
314 &ctx,
315 &plan,
316 cursor_boundary.as_ref(),
317 index_range_anchor.as_ref(),
318 direction,
319 continuation_signature,
320 &mut span,
321 &mut execution_trace,
322 )?;
323 Ok(page)
324 })();
325
326 result.map(|page| (page, execution_trace))
327 }
328
329 fn finalize_path_outcome(
331 execution_trace: &mut Option<ExecutionTrace>,
332 optimization: Option<ExecutionOptimization>,
333 rows_scanned: usize,
334 rows_returned: usize,
335 ) {
336 record_rows_scanned::<E>(rows_scanned);
337 if let Some(execution_trace) = execution_trace.as_mut() {
338 execution_trace.set_path_outcome(optimization, rows_scanned, rows_returned);
339 debug_assert_eq!(
340 execution_trace.keys_scanned,
341 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
342 "execution trace keys_scanned must match rows_scanned metrics input",
343 );
344 }
345 }
346
347 fn validate_pk_fast_path_boundary_if_applicable(
349 plan: &LogicalPlan<E::Key>,
350 cursor_boundary: Option<&CursorBoundary>,
351 ) -> Result<(), InternalError> {
352 if !Self::is_pk_order_stream_eligible(plan) {
353 return Ok(());
354 }
355 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
356
357 Ok(())
358 }
359
360 fn assess_index_range_limit_pushdown(
361 plan: &LogicalPlan<E::Key>,
362 cursor_boundary: Option<&CursorBoundary>,
363 index_range_anchor: Option<&RawIndexKey>,
364 ) -> Option<IndexRangeLimitSpec> {
365 if !Self::is_index_range_limit_pushdown_shape_eligible(plan) {
366 return None;
367 }
368 if cursor_boundary.is_some() && index_range_anchor.is_none() {
369 return None;
370 }
371
372 let page = plan.page.as_ref()?;
373 let limit = page.limit?;
374 if limit == 0 {
375 return Some(IndexRangeLimitSpec { fetch: 0 });
376 }
377
378 let fetch = compute_page_window(page.offset, limit, true).fetch_count;
379
380 Some(IndexRangeLimitSpec { fetch })
381 }
382}