1use crate::{
2 Error, Key,
3 db::{
4 Db,
5 executor::{FilterEvaluator, plan_for},
6 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr, Order, SortExpr},
7 query::{LoadQuery, QueryPlan, QueryValidate},
8 response::Response,
9 },
10 obs::metrics,
11 traits::{EntityKind, FieldValue},
12};
13use canic::log;
14use std::{cmp::Ordering, marker::PhantomData};
15
16#[derive(Clone, Copy)]
21pub struct LoadExecutor<E: EntityKind> {
22 db: Db<E::Canister>,
23 debug: bool,
24 _marker: PhantomData<E>,
25}
26
27impl<E: EntityKind> LoadExecutor<E> {
28 #[must_use]
29 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
30 Self {
31 db,
32 debug,
33 _marker: PhantomData,
34 }
35 }
36
37 #[inline]
38 fn debug_log(&self, s: impl Into<String>) {
39 if self.debug {
40 log!(Debug, "{}", s.into());
41 }
42 }
43
44 pub fn one_key(self, value: impl FieldValue) -> Result<Key, Error> {
50 self.one(value)?.try_key()
51 }
52
53 pub fn one_entity(self, value: impl FieldValue) -> Result<E, Error> {
55 self.one(value)?.try_entity()
56 }
57
58 pub fn one(&self, value: impl FieldValue) -> Result<Response<E>, Error> {
64 let query = LoadQuery::new().one::<E>(value);
65 self.execute(query)
66 }
67
68 pub fn only(&self) -> Result<Response<E>, Error> {
70 let query = LoadQuery::new().one::<E>(());
71 self.execute(query)
72 }
73
74 pub fn many(
76 &self,
77 values: impl IntoIterator<Item = impl FieldValue>,
78 ) -> Result<Response<E>, Error> {
79 let query = LoadQuery::new().many::<E>(values);
80 self.execute(query)
81 }
82
83 pub fn all(&self) -> Result<Response<E>, Error> {
85 let query = LoadQuery::new();
86 self.execute(query)
87 }
88
89 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
91 where
92 F: FnOnce(FilterDsl) -> I,
93 I: IntoFilterExpr,
94 {
95 let query = LoadQuery::new().filter(f);
96 self.execute(query)
97 }
98
99 pub fn count_all(self) -> Result<u32, Error> {
101 let query = LoadQuery::all();
102 self.count(query)
103 }
104
105 pub fn explain(self, query: LoadQuery) -> Result<QueryPlan, Error> {
112 QueryValidate::<E>::validate(&query)?;
113
114 Ok(plan_for::<E>(query.filter.as_ref()))
115 }
116
117 pub fn execute(&self, query: LoadQuery) -> Result<Response<E>, Error> {
119 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Load);
120 QueryValidate::<E>::validate(&query)?;
121
122 self.debug_log(format!("🧭 Executing query: {:?} on {}", query, E::PATH));
123
124 let ctx = self.db.context::<E>();
125 let plan = plan_for::<E>(query.filter.as_ref());
126
127 self.debug_log(format!("📄 Query plan: {plan:?}"));
128
129 let pre_paginated = query.filter.is_none() && query.sort.is_none() && query.limit.is_some();
131 let data_rows = if pre_paginated {
132 let lim = query.limit.as_ref().unwrap();
133 ctx.rows_from_plan_with_pagination(plan, lim.offset, lim.limit)?
134 } else {
135 ctx.rows_from_plan(plan)?
136 };
137
138 self.debug_log(format!(
139 "📦 Loaded {} data rows before deserialization",
140 data_rows.len()
141 ));
142
143 let mut rows: Vec<(Key, E)> = ctx.deserialize_rows(data_rows)?;
145 self.debug_log(format!(
146 "🧩 Deserialized {} entities before filtering",
147 rows.len()
148 ));
149
150 if let Some(f) = &query.filter {
152 let simplified = f.clone().simplify();
153 Self::apply_filter(&mut rows, &simplified);
154
155 self.debug_log(format!(
156 "🔎 Applied filter -> {} entities remaining",
157 rows.len()
158 ));
159 }
160
161 if let Some(sort) = &query.sort
163 && rows.len() > 1
164 {
165 Self::apply_sort(&mut rows, sort);
166 self.debug_log("↕️ Applied sort expression");
167 }
168
169 if let Some(lim) = &query.limit
171 && !pre_paginated
172 {
173 apply_pagination(&mut rows, lim.offset, lim.limit);
174 self.debug_log(format!(
175 "📏 Applied pagination (offset={}, limit={:?}) -> {} entities",
176 lim.offset,
177 lim.limit,
178 rows.len()
179 ));
180 }
181
182 crate::db::executor::set_rows_from_len(&mut span, rows.len());
183 self.debug_log(format!("✅ Query complete -> {} final rows", rows.len()));
184
185 Ok(Response(rows))
186 }
187
188 #[allow(clippy::cast_possible_truncation)]
191 pub fn count(self, query: LoadQuery) -> Result<u32, Error> {
192 let count = self.execute(query)?.count();
193
194 Ok(count)
195 }
196
197 fn apply_filter(rows: &mut Vec<(Key, E)>, filter: &FilterExpr) {
199 rows.retain(|(_, e)| FilterEvaluator::new(e).eval(filter));
200 }
201
202 fn apply_sort(rows: &mut [(Key, E)], sort_expr: &SortExpr) {
204 rows.sort_by(|(_, ea), (_, eb)| {
205 for (field, direction) in sort_expr.iter() {
206 let va = ea.get_value(field);
207 let vb = eb.get_value(field);
208
209 let ordering = match (va, vb) {
211 (None, None) => continue, (None, Some(_)) => Ordering::Less, (Some(_), None) => Ordering::Greater, (Some(va), Some(vb)) => match va.partial_cmp(&vb) {
215 Some(ord) => ord,
216 None => continue, },
218 };
219
220 let ordering = match direction {
222 Order::Asc => ordering,
223 Order::Desc => ordering.reverse(),
224 };
225
226 if ordering != Ordering::Equal {
227 return ordering;
228 }
229 }
230
231 Ordering::Equal
233 });
234 }
235}
236
237fn apply_pagination<T>(rows: &mut Vec<T>, offset: u32, limit: Option<u32>) {
239 let total = rows.len();
240 let start = usize::min(offset as usize, total);
241 let end = limit.map_or(total, |l| usize::min(start + l as usize, total));
242
243 if start >= end {
244 rows.clear();
245 } else {
246 rows.drain(..start);
247 rows.truncate(end - start);
248 }
249}
250
251#[cfg(test)]
256mod tests {
257 use super::{LoadExecutor, apply_pagination};
258 use crate::{
259 IndexSpec, Key, Value,
260 db::primitives::{Order, SortExpr},
261 traits::{
262 CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
263 ValidateAuto, ValidateCustom, View, Visitable,
264 },
265 };
266 use serde::{Deserialize, Serialize};
267
268 #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
269 struct SortableEntity {
270 id: u64,
271 primary: i32,
272 secondary: i32,
273 optional_blob: Option<Vec<u8>>,
274 }
275
276 impl SortableEntity {
277 fn new(id: u64, primary: i32, secondary: i32, optional_blob: Option<Vec<u8>>) -> Self {
278 Self {
279 id,
280 primary,
281 secondary,
282 optional_blob,
283 }
284 }
285 }
286
287 struct SortableCanister;
288 struct SortableStore;
289
290 impl Path for SortableCanister {
291 const PATH: &'static str = "test::canister";
292 }
293
294 impl CanisterKind for SortableCanister {}
295
296 impl Path for SortableStore {
297 const PATH: &'static str = "test::store";
298 }
299
300 impl StoreKind for SortableStore {
301 type Canister = SortableCanister;
302 }
303
304 impl Path for SortableEntity {
305 const PATH: &'static str = "test::sortable";
306 }
307
308 impl View for SortableEntity {
309 type ViewType = Self;
310
311 fn to_view(&self) -> Self::ViewType {
312 self.clone()
313 }
314
315 fn from_view(view: Self::ViewType) -> Self {
316 view
317 }
318 }
319
320 impl SanitizeAuto for SortableEntity {}
321 impl SanitizeCustom for SortableEntity {}
322 impl ValidateAuto for SortableEntity {}
323 impl ValidateCustom for SortableEntity {}
324 impl Visitable for SortableEntity {}
325
326 impl FieldValues for SortableEntity {
327 fn get_value(&self, field: &str) -> Option<Value> {
328 match field {
329 "id" => Some(Value::Uint(self.id)),
330 "primary" => Some(Value::Int(i64::from(self.primary))),
331 "secondary" => Some(Value::Int(i64::from(self.secondary))),
332 "optional_blob" => self.optional_blob.clone().map(Value::Blob),
333 _ => None,
334 }
335 }
336 }
337
338 impl EntityKind for SortableEntity {
339 type PrimaryKey = u64;
340 type Store = SortableStore;
341 type Canister = SortableCanister;
342
343 const ENTITY_ID: u64 = 99;
344 const PRIMARY_KEY: &'static str = "id";
345 const FIELDS: &'static [&'static str] = &["id", "primary", "secondary", "optional_blob"];
346 const INDEXES: &'static [&'static IndexSpec] = &[];
347
348 fn key(&self) -> Key {
349 self.id.into()
350 }
351
352 fn primary_key(&self) -> Self::PrimaryKey {
353 self.id
354 }
355 }
356
357 #[test]
358 fn pagination_empty_vec() {
359 let mut v: Vec<i32> = vec![];
360 apply_pagination(&mut v, 0, Some(10));
361 assert!(v.is_empty());
362 }
363
364 #[test]
365 fn pagination_offset_beyond_len_clears() {
366 let mut v = vec![1, 2, 3];
367 apply_pagination(&mut v, 10, Some(5));
368 assert!(v.is_empty());
369 }
370
371 #[test]
372 fn pagination_no_limit_from_offset() {
373 let mut v = vec![1, 2, 3, 4, 5];
374 apply_pagination(&mut v, 2, None);
375 assert_eq!(v, vec![3, 4, 5]);
376 }
377
378 #[test]
379 fn pagination_exact_window() {
380 let mut v = vec![10, 20, 30, 40, 50];
381 apply_pagination(&mut v, 1, Some(3));
383 assert_eq!(v, vec![20, 30, 40]);
384 }
385
386 #[test]
387 fn pagination_limit_exceeds_tail() {
388 let mut v = vec![10, 20, 30];
389 apply_pagination(&mut v, 1, Some(999));
391 assert_eq!(v, vec![20, 30]);
392 }
393
394 #[test]
395 fn apply_sort_orders_descending() {
396 let mut rows = vec![
397 (Key::from(1_u64), SortableEntity::new(1, 10, 1, None)),
398 (Key::from(2_u64), SortableEntity::new(2, 30, 2, None)),
399 (Key::from(3_u64), SortableEntity::new(3, 20, 3, None)),
400 ];
401 let sort_expr = SortExpr::from(vec![("primary".to_string(), Order::Desc)]);
402
403 LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);
404
405 let primary: Vec<i32> = rows.iter().map(|(_, e)| e.primary).collect();
406 assert_eq!(primary, vec![30, 20, 10]);
407 }
408
409 #[test]
410 fn apply_sort_uses_secondary_field_for_ties() {
411 let mut rows = vec![
412 (Key::from(1_u64), SortableEntity::new(1, 1, 5, None)),
413 (Key::from(2_u64), SortableEntity::new(2, 1, 8, None)),
414 (Key::from(3_u64), SortableEntity::new(3, 2, 3, None)),
415 ];
416 let sort_expr = SortExpr::from(vec![
417 ("primary".to_string(), Order::Asc),
418 ("secondary".to_string(), Order::Desc),
419 ]);
420
421 LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);
422
423 let ids: Vec<u64> = rows.iter().map(|(_, e)| e.id).collect();
424 assert_eq!(ids, vec![2, 1, 3]);
425 }
426
427 #[test]
428 fn apply_sort_places_none_before_some_and_falls_back() {
429 let mut rows = vec![
430 (
431 Key::from(3_u64),
432 SortableEntity::new(3, 0, 0, Some(vec![3, 4])),
433 ),
434 (Key::from(1_u64), SortableEntity::new(1, 0, 0, None)),
435 (
436 Key::from(2_u64),
437 SortableEntity::new(2, 0, 0, Some(vec![2])),
438 ),
439 ];
440 let sort_expr = SortExpr::from(vec![
441 ("optional_blob".to_string(), Order::Asc),
442 ("id".to_string(), Order::Asc),
443 ]);
444
445 LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);
446
447 let ids: Vec<u64> = rows.iter().map(|(_, e)| e.id).collect();
448 assert_eq!(ids, vec![1, 2, 3]);
449 }
450}