1use crate::{
2 Error, Key,
3 db::{
4 Db,
5 executor::{
6 FilterEvaluator,
7 plan::{plan_for, set_rows_from_len},
8 },
9 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr, Order, SortExpr},
10 query::{LoadQuery, QueryPlan, QueryValidate},
11 response::Response,
12 },
13 obs::metrics,
14 traits::{EntityKind, FieldValue},
15};
16use std::{cmp::Ordering, marker::PhantomData};
17
18#[derive(Clone, Copy)]
23pub struct LoadExecutor<E: EntityKind> {
24 db: Db<E::Canister>,
25 debug: bool,
26 _marker: PhantomData<E>,
27}
28
29impl<E: EntityKind> LoadExecutor<E> {
30 #[must_use]
31 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
32 Self {
33 db,
34 debug,
35 _marker: PhantomData,
36 }
37 }
38
39 #[inline]
40 fn debug_log(&self, s: impl Into<String>) {
41 if self.debug {
42 println!("{}", s.into());
43 }
44 }
45
46 pub fn one_key(self, value: impl FieldValue) -> Result<Key, Error> {
52 self.one(value)?.try_key()
53 }
54
55 pub fn one_entity(self, value: impl FieldValue) -> Result<E, Error> {
57 self.one(value)?.try_entity()
58 }
59
60 pub fn one(&self, value: impl FieldValue) -> Result<Response<E>, Error> {
66 let query = LoadQuery::new().one::<E>(value);
67 self.execute(query)
68 }
69
70 pub fn only(&self) -> Result<Response<E>, Error> {
72 let query = LoadQuery::new().one::<E>(());
73 self.execute(query)
74 }
75
76 pub fn many(
78 &self,
79 values: impl IntoIterator<Item = impl FieldValue>,
80 ) -> Result<Response<E>, Error> {
81 let query = LoadQuery::new().many::<E>(values);
82 self.execute(query)
83 }
84
85 pub fn all(&self) -> Result<Response<E>, Error> {
87 let query = LoadQuery::new();
88 self.execute(query)
89 }
90
91 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
93 where
94 F: FnOnce(FilterDsl) -> I,
95 I: IntoFilterExpr,
96 {
97 let query = LoadQuery::new().filter(f);
98 self.execute(query)
99 }
100
101 pub fn count_all(self) -> Result<u32, Error> {
103 let query = LoadQuery::all();
104 self.count(query)
105 }
106
107 pub fn explain(self, query: LoadQuery) -> Result<QueryPlan, Error> {
114 QueryValidate::<E>::validate(&query)?;
115
116 Ok(plan_for::<E>(query.filter.as_ref()))
117 }
118
119 pub fn execute(&self, query: LoadQuery) -> Result<Response<E>, Error> {
121 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Load);
122 QueryValidate::<E>::validate(&query)?;
123
124 self.debug_log(format!("🧭 Executing query: {:?} on {}", query, E::PATH));
125
126 let ctx = self.db.context::<E>();
127 let plan = plan_for::<E>(query.filter.as_ref());
128
129 self.debug_log(format!("📄 Query plan: {plan:?}"));
130
131 let pre_paginated = query.filter.is_none() && query.sort.is_none() && query.limit.is_some();
133 let mut rows: Vec<(Key, E)> = if pre_paginated {
134 let lim = query.limit.as_ref().unwrap();
135 let data_rows = ctx.rows_from_plan_with_pagination(plan, lim.offset, lim.limit)?;
136
137 self.debug_log(format!(
138 "📦 Scanned {} data rows before deserialization",
139 data_rows.len()
140 ));
141
142 let rows = ctx.deserialize_rows(data_rows)?;
143 self.debug_log(format!(
144 "🧩 Deserialized {} entities before filtering",
145 rows.len()
146 ));
147 rows
148 } else {
149 let data_rows = ctx.rows_from_plan(plan)?;
150 self.debug_log(format!(
151 "📦 Scanned {} data rows before deserialization",
152 data_rows.len()
153 ));
154
155 let rows = ctx.deserialize_rows(data_rows)?;
156 self.debug_log(format!(
157 "🧩 Deserialized {} entities before filtering",
158 rows.len()
159 ));
160
161 rows
162 };
163
164 if let Some(f) = &query.filter {
166 let simplified = f.clone().simplify();
167 Self::apply_filter(&mut rows, &simplified);
168
169 self.debug_log(format!(
170 "🔎 Applied filter -> {} entities remaining",
171 rows.len()
172 ));
173 }
174
175 if let Some(sort) = &query.sort
177 && rows.len() > 1
178 {
179 Self::apply_sort(&mut rows, sort);
180 self.debug_log("↕️ Applied sort expression");
181 }
182
183 if let Some(lim) = &query.limit
185 && !pre_paginated
186 {
187 apply_pagination(&mut rows, lim.offset, lim.limit);
188 self.debug_log(format!(
189 "📏 Applied pagination (offset={}, limit={:?}) -> {} entities",
190 lim.offset,
191 lim.limit,
192 rows.len()
193 ));
194 }
195
196 set_rows_from_len(&mut span, rows.len());
197 self.debug_log(format!("✅ Query complete -> {} final rows", rows.len()));
198
199 Ok(Response(rows))
200 }
201
202 #[allow(clippy::cast_possible_truncation)]
205 pub fn count(self, query: LoadQuery) -> Result<u32, Error> {
206 let count = self.execute(query)?.count();
207
208 Ok(count)
209 }
210
211 fn apply_filter(rows: &mut Vec<(Key, E)>, filter: &FilterExpr) {
213 rows.retain(|(_, e)| FilterEvaluator::new(e).eval(filter));
214 }
215
216 fn apply_sort(rows: &mut [(Key, E)], sort_expr: &SortExpr) {
218 rows.sort_by(|(_, ea), (_, eb)| {
219 for (field, direction) in sort_expr.iter() {
220 let va = ea.get_value(field);
221 let vb = eb.get_value(field);
222
223 let ordering = match (va, vb) {
225 (None, None) => continue, (None, Some(_)) => Ordering::Less, (Some(_), None) => Ordering::Greater, (Some(va), Some(vb)) => match va.partial_cmp(&vb) {
229 Some(ord) => ord,
230 None => continue, },
232 };
233
234 let ordering = match direction {
236 Order::Asc => ordering,
237 Order::Desc => ordering.reverse(),
238 };
239
240 if ordering != Ordering::Equal {
241 return ordering;
242 }
243 }
244
245 Ordering::Equal
247 });
248 }
249}
250
/// Trims `rows` in place to the pagination window starting at `offset` and
/// spanning at most `limit` elements.
///
/// * `offset` — number of leading elements to skip; an offset at or past the
///   end leaves `rows` empty.
/// * `limit` — maximum number of elements to keep after the offset; `None`
///   keeps everything from `offset` to the end.
///
/// The window bounds are clamped to `rows.len()`, and the end bound is
/// computed with saturating arithmetic so that a very large `limit` (for
/// example `u32::MAX`) cannot overflow `usize` on 32-bit targets.
fn apply_pagination<T>(rows: &mut Vec<T>, offset: u32, limit: Option<u32>) {
    let total = rows.len();

    // A `u32` that does not fit in `usize` is necessarily past the end of `rows`.
    let start = usize::try_from(offset).map_or(total, |offset| offset.min(total));
    let end = limit.map_or(total, |limit| {
        usize::try_from(limit).map_or(total, |limit| start.saturating_add(limit).min(total))
    });

    // `start <= end <= total` holds here. Dropping the tail first means the
    // head removal has fewer elements to shift down.
    rows.truncate(end);
    rows.drain(..start);
}
264
#[cfg(test)]
mod tests {
    use super::{LoadExecutor, apply_pagination};
    use crate::{
        IndexSpec, Key, Value,
        db::primitives::{Order, SortExpr},
        traits::{
            CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
            ValidateAuto, ValidateCustom, View, Visitable,
        },
    };
    use serde::{Deserialize, Serialize};

    // ---------------------------------------------------------------------
    // Fixture: a minimal entity with two integer fields and one optional
    // blob, wired up just far enough to satisfy `EntityKind`.
    // ---------------------------------------------------------------------

    /// Test entity whose fields exercise integer and optional-value sorting.
    #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
    struct SortableEntity {
        id: u64,
        primary: i32,
        secondary: i32,
        optional_blob: Option<Vec<u8>>,
    }

    impl SortableEntity {
        /// Builds an entity from its four field values.
        fn new(id: u64, primary: i32, secondary: i32, optional_blob: Option<Vec<u8>>) -> Self {
            Self {
                id,
                primary,
                secondary,
                optional_blob,
            }
        }
    }

    /// Canister marker type for the fixture.
    struct SortableCanister;
    /// Store marker type for the fixture.
    struct SortableStore;

    impl Path for SortableCanister {
        const PATH: &'static str = "test::canister";
    }

    impl CanisterKind for SortableCanister {}

    impl Path for SortableStore {
        const PATH: &'static str = "test::store";
    }

    impl StoreKind for SortableStore {
        type Canister = SortableCanister;
    }

    impl Path for SortableEntity {
        const PATH: &'static str = "test::sortable";
    }

    impl View for SortableEntity {
        type ViewType = Self;

        fn to_view(&self) -> Self::ViewType {
            self.clone()
        }

        fn from_view(view: Self::ViewType) -> Self {
            view
        }
    }

    impl SanitizeAuto for SortableEntity {}
    impl SanitizeCustom for SortableEntity {}
    impl ValidateAuto for SortableEntity {}
    impl ValidateCustom for SortableEntity {}
    impl Visitable for SortableEntity {}

    impl FieldValues for SortableEntity {
        fn get_value(&self, field: &str) -> Option<Value> {
            let value = match field {
                "id" => Value::Uint(self.id),
                "primary" => Value::Int(i64::from(self.primary)),
                "secondary" => Value::Int(i64::from(self.secondary)),
                // The blob is the only field that can legitimately be absent.
                "optional_blob" => return self.optional_blob.clone().map(Value::Blob),
                _ => return None,
            };

            Some(value)
        }
    }

    impl EntityKind for SortableEntity {
        type PrimaryKey = u64;
        type Store = SortableStore;
        type Canister = SortableCanister;

        const ENTITY_ID: u64 = 99;
        const PRIMARY_KEY: &'static str = "id";
        const FIELDS: &'static [&'static str] = &["id", "primary", "secondary", "optional_blob"];
        const INDEXES: &'static [&'static IndexSpec] = &[];

        fn key(&self) -> Key {
            self.id.into()
        }

        fn primary_key(&self) -> Self::PrimaryKey {
            self.id
        }
    }

    // ---------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------

    /// A keyed row as handled by `LoadExecutor::apply_sort`.
    type Row = (Key, SortableEntity);

    /// Builds a row whose key is derived from the entity's `id`.
    fn row(id: u64, primary: i32, secondary: i32, optional_blob: Option<Vec<u8>>) -> Row {
        (
            Key::from(id),
            SortableEntity::new(id, primary, secondary, optional_blob),
        )
    }

    /// Sorts `rows` by the given `(field, direction)` pairs and returns the
    /// entities in their resulting order.
    fn sort_rows(mut rows: Vec<Row>, spec: Vec<(&str, Order)>) -> Vec<SortableEntity> {
        let fields: Vec<(String, Order)> = spec
            .into_iter()
            .map(|(field, direction)| (field.to_string(), direction))
            .collect();
        let sort_expr = SortExpr::from(fields);

        LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);

        rows.into_iter().map(|(_, entity)| entity).collect()
    }

    /// Extracts the `id` of every entity, preserving order.
    fn ids_of(entities: &[SortableEntity]) -> Vec<u64> {
        entities.iter().map(|entity| entity.id).collect()
    }

    /// Applies pagination to `items` and hands back the trimmed vector.
    fn paginated(mut items: Vec<i32>, offset: u32, limit: Option<u32>) -> Vec<i32> {
        apply_pagination(&mut items, offset, limit);
        items
    }

    // ---------------------------------------------------------------------
    // Pagination
    // ---------------------------------------------------------------------

    #[test]
    fn pagination_empty_vec() {
        assert!(paginated(vec![], 0, Some(10)).is_empty());
    }

    #[test]
    fn pagination_offset_beyond_len_clears() {
        assert!(paginated(vec![1, 2, 3], 10, Some(5)).is_empty());
    }

    #[test]
    fn pagination_no_limit_from_offset() {
        assert_eq!(paginated(vec![1, 2, 3, 4, 5], 2, None), vec![3, 4, 5]);
    }

    #[test]
    fn pagination_exact_window() {
        assert_eq!(
            paginated(vec![10, 20, 30, 40, 50], 1, Some(3)),
            vec![20, 30, 40]
        );
    }

    #[test]
    fn pagination_limit_exceeds_tail() {
        assert_eq!(paginated(vec![10, 20, 30], 1, Some(999)), vec![20, 30]);
    }

    // ---------------------------------------------------------------------
    // Sorting
    // ---------------------------------------------------------------------

    #[test]
    fn apply_sort_orders_descending() {
        let rows = vec![
            row(1, 10, 1, None),
            row(2, 30, 2, None),
            row(3, 20, 3, None),
        ];

        let sorted = sort_rows(rows, vec![("primary", Order::Desc)]);

        let primary: Vec<i32> = sorted.iter().map(|entity| entity.primary).collect();
        assert_eq!(primary, vec![30, 20, 10]);
    }

    #[test]
    fn apply_sort_uses_secondary_field_for_ties() {
        let rows = vec![
            row(1, 1, 5, None),
            row(2, 1, 8, None),
            row(3, 2, 3, None),
        ];

        let sorted = sort_rows(
            rows,
            vec![("primary", Order::Asc), ("secondary", Order::Desc)],
        );

        assert_eq!(ids_of(&sorted), vec![2, 1, 3]);
    }

    #[test]
    fn apply_sort_places_none_before_some_and_falls_back() {
        let rows = vec![
            row(3, 0, 0, Some(vec![3, 4])),
            row(1, 0, 0, None),
            row(2, 0, 0, Some(vec![2])),
        ];

        let sorted = sort_rows(
            rows,
            vec![("optional_blob", Order::Asc), ("id", Order::Asc)],
        );

        assert_eq!(ids_of(&sorted), vec![1, 2, 3]);
    }
}