1use crate::{
2 Error, Key,
3 db::{
4 Db,
5 executor::{FilterEvaluator, plan_for},
6 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr, Order, SortExpr},
7 query::{LoadQuery, QueryPlan, QueryValidate},
8 response::Response,
9 },
10 obs::metrics,
11 traits::{EntityKind, FieldValue},
12};
13use canic::log;
14use std::{cmp::Ordering, marker::PhantomData};
15
/// Executor for load (read) queries over entities of kind `E`.
///
/// Holds the database handle from which an execution context for `E` is obtained, plus a flag
/// that turns on debug-level logging of each execution stage.
///
/// Note: `derive(Clone, Copy)` places `E: Clone` / `E: Copy` bounds on the generated impls, so
/// the executor is only `Copy` for entity types that are themselves `Copy`.
#[derive(Clone, Copy)]
pub struct LoadExecutor<E: EntityKind> {
    /// Database handle parameterised by the canister type associated with `E`.
    db: Db<E::Canister>,
    /// When `true`, the executor emits debug log lines while running a query.
    debug: bool,
    /// Ties the executor to `E` without storing a value of that type.
    _marker: PhantomData<E>,
}
26
27impl<E: EntityKind> LoadExecutor<E> {
28 #[must_use]
29 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
30 Self {
31 db,
32 debug,
33 _marker: PhantomData,
34 }
35 }
36
37 #[inline]
38 fn debug_log(&self, s: impl Into<String>) {
39 if self.debug {
40 log!(Debug, "{}", s.into());
41 }
42 }
43
44 pub fn one_key(self, value: impl FieldValue) -> Result<Key, Error> {
49 self.one(value)?.try_key()
50 }
51
52 pub fn one_entity(self, value: impl FieldValue) -> Result<E, Error> {
53 self.one(value)?.try_entity()
54 }
55
56 pub fn one(&self, value: impl FieldValue) -> Result<Response<E>, Error> {
61 let query = LoadQuery::new().one::<E>(value);
62 self.execute(query)
63 }
64
65 pub fn only(&self) -> Result<Response<E>, Error> {
66 let query = LoadQuery::new().one::<E>(());
67 self.execute(query)
68 }
69
70 pub fn many(
71 &self,
72 values: impl IntoIterator<Item = impl FieldValue>,
73 ) -> Result<Response<E>, Error> {
74 let query = LoadQuery::new().many::<E>(values);
75 self.execute(query)
76 }
77
78 pub fn all(&self) -> Result<Response<E>, Error> {
79 let query = LoadQuery::new();
80 self.execute(query)
81 }
82
83 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
84 where
85 F: FnOnce(FilterDsl) -> I,
86 I: IntoFilterExpr,
87 {
88 let query = LoadQuery::new().filter(f);
89 self.execute(query)
90 }
91
92 pub fn count_all(self) -> Result<u32, Error> {
93 let query = LoadQuery::all();
94 self.count(query)
95 }
96
97 pub fn explain(self, query: LoadQuery) -> Result<QueryPlan, Error> {
103 QueryValidate::<E>::validate(&query)?;
104
105 Ok(plan_for::<E>(query.filter.as_ref()))
106 }
107
108 pub fn execute(&self, query: LoadQuery) -> Result<Response<E>, Error> {
110 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Load);
111 QueryValidate::<E>::validate(&query)?;
112
113 self.debug_log(format!("🧭 Executing query: {:?} on {}", query, E::PATH));
114
115 let ctx = self.db.context::<E>();
116 let plan = plan_for::<E>(query.filter.as_ref());
117
118 self.debug_log(format!("📄 Query plan: {plan:?}"));
119
120 let pre_paginated = query.filter.is_none() && query.sort.is_none() && query.limit.is_some();
122 let data_rows = if pre_paginated {
123 let lim = query.limit.as_ref().unwrap();
124 ctx.rows_from_plan_with_pagination(plan, lim.offset, lim.limit)?
125 } else {
126 ctx.rows_from_plan(plan)?
127 };
128
129 self.debug_log(format!(
130 "📦 Loaded {} data rows before deserialization",
131 data_rows.len()
132 ));
133
134 let mut rows: Vec<(Key, E)> = ctx.deserialize_rows(data_rows)?;
136 self.debug_log(format!(
137 "🧩 Deserialized {} entities before filtering",
138 rows.len()
139 ));
140
141 if let Some(f) = &query.filter {
143 let simplified = f.clone().simplify();
144 Self::apply_filter(&mut rows, &simplified);
145
146 self.debug_log(format!(
147 "🔎 Applied filter -> {} entities remaining",
148 rows.len()
149 ));
150 }
151
152 if let Some(sort) = &query.sort
154 && rows.len() > 1
155 {
156 Self::apply_sort(&mut rows, sort);
157 self.debug_log("↕️ Applied sort expression");
158 }
159
160 if let Some(lim) = &query.limit
162 && !pre_paginated
163 {
164 apply_pagination(&mut rows, lim.offset, lim.limit);
165 self.debug_log(format!(
166 "📏 Applied pagination (offset={}, limit={:?}) -> {} entities",
167 lim.offset,
168 lim.limit,
169 rows.len()
170 ));
171 }
172
173 crate::db::executor::set_rows_from_len(&mut span, rows.len());
174 self.debug_log(format!("✅ Query complete -> {} final rows", rows.len()));
175
176 Ok(Response(rows))
177 }
178
179 #[allow(clippy::cast_possible_truncation)]
182 pub fn count(self, query: LoadQuery) -> Result<u32, Error> {
183 let count = self.execute(query)?.count();
184
185 Ok(count)
186 }
187
188 fn apply_filter(rows: &mut Vec<(Key, E)>, filter: &FilterExpr) {
190 rows.retain(|(_, e)| FilterEvaluator::new(e).eval(filter));
191 }
192
193 fn apply_sort(rows: &mut [(Key, E)], sort_expr: &SortExpr) {
195 rows.sort_by(|(_, ea), (_, eb)| {
196 for (field, direction) in sort_expr.iter() {
197 let va = ea.get_value(field);
198 let vb = eb.get_value(field);
199
200 let ordering = match (va, vb) {
202 (None, None) => continue, (None, Some(_)) => Ordering::Less, (Some(_), None) => Ordering::Greater, (Some(va), Some(vb)) => match va.partial_cmp(&vb) {
206 Some(ord) => ord,
207 None => continue, },
209 };
210
211 let ordering = match direction {
213 Order::Asc => ordering,
214 Order::Desc => ordering.reverse(),
215 };
216
217 if ordering != Ordering::Equal {
218 return ordering;
219 }
220 }
221
222 Ordering::Equal
224 });
225 }
226}
227
/// Trims `rows` in place to the window that starts at `offset` and holds at most `limit` rows.
///
/// * `offset` — number of leading rows to discard. An offset at or beyond `rows.len()` leaves
///   `rows` empty.
/// * `limit` — maximum number of rows to keep after the offset; `None` keeps every row after
///   the offset.
///
/// All index arithmetic is clamped to `rows.len()` and saturating, so a large `offset + limit`
/// cannot overflow `usize` on targets where `usize` is 32 bits wide (e.g. wasm32).
pub fn apply_pagination<T>(rows: &mut Vec<T>, offset: u32, limit: Option<u32>) {
    let total = rows.len();

    // A value that does not fit in `usize` can only mean "at least `total`".
    let clamp = |n: u32| usize::try_from(n).map_or(total, |n| n.min(total));

    let start = clamp(offset);
    let end = limit.map_or(total, |l| start.saturating_add(clamp(l)).min(total));

    // `start <= end <= total` holds here. Dropping the tail first means the subsequent
    // front-drain only has to shift the rows that are actually kept.
    rows.truncate(end);
    rows.drain(..start);
}
241
#[cfg(test)]
mod tests {
    use super::{LoadExecutor, apply_pagination};
    use crate::{
        IndexSpec, Key, Value,
        db::primitives::{Order, SortExpr},
        traits::{
            CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
            ValidateAuto, ValidateCustom, View, Visitable,
        },
    };
    use serde::{Deserialize, Serialize};

    // ---------------------------------------------------------------------
    // Fixture: a small entity with two integer fields and an optional blob
    // ---------------------------------------------------------------------

    /// Test entity used to drive `LoadExecutor::apply_sort`.
    #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
    struct SortableEntity {
        id: u64,
        primary: i32,
        secondary: i32,
        optional_blob: Option<Vec<u8>>,
    }

    impl SortableEntity {
        /// Builds an entity from its four field values.
        fn new(id: u64, primary: i32, secondary: i32, optional_blob: Option<Vec<u8>>) -> Self {
            Self {
                id,
                primary,
                secondary,
                optional_blob,
            }
        }
    }

    /// Canister marker the test entity belongs to.
    struct SortableCanister;

    /// Store marker the test entity belongs to.
    struct SortableStore;

    impl Path for SortableCanister {
        const PATH: &'static str = "test::canister";
    }

    impl Path for SortableStore {
        const PATH: &'static str = "test::store";
    }

    impl Path for SortableEntity {
        const PATH: &'static str = "test::sortable";
    }

    impl CanisterKind for SortableCanister {}

    impl StoreKind for SortableStore {
        type Canister = SortableCanister;
    }

    impl View for SortableEntity {
        type ViewType = Self;

        fn to_view(&self) -> Self::ViewType {
            self.clone()
        }

        fn from_view(view: Self::ViewType) -> Self {
            view
        }
    }

    impl SanitizeAuto for SortableEntity {}
    impl SanitizeCustom for SortableEntity {}
    impl ValidateAuto for SortableEntity {}
    impl ValidateCustom for SortableEntity {}
    impl Visitable for SortableEntity {}

    impl FieldValues for SortableEntity {
        /// Maps a field name to its value; unknown names and an unset blob yield `None`.
        fn get_value(&self, field: &str) -> Option<Value> {
            let value = match field {
                "id" => Value::Uint(self.id),
                "primary" => Value::Int(i64::from(self.primary)),
                "secondary" => Value::Int(i64::from(self.secondary)),
                "optional_blob" => return self.optional_blob.clone().map(Value::Blob),
                _ => return None,
            };

            Some(value)
        }
    }

    impl EntityKind for SortableEntity {
        type PrimaryKey = u64;
        type Store = SortableStore;
        type Canister = SortableCanister;

        const ENTITY_ID: u64 = 99;
        const PRIMARY_KEY: &'static str = "id";
        const FIELDS: &'static [&'static str] = &["id", "primary", "secondary", "optional_blob"];
        const INDEXES: &'static [&'static IndexSpec] = &[];

        fn key(&self) -> Key {
            self.id.into()
        }

        fn primary_key(&self) -> Self::PrimaryKey {
            self.id
        }
    }

    // ---------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------

    /// A keyed row as handled by the executor.
    type Row = (Key, SortableEntity);

    /// Builds a row whose key is derived from the same `id` stored in the entity.
    fn row(id: u64, primary: i32, secondary: i32, blob: Option<Vec<u8>>) -> Row {
        (
            Key::from(id),
            SortableEntity::new(id, primary, secondary, blob),
        )
    }

    /// Sorts `rows` with a sort expression built from `(field, direction)` pairs.
    fn sort_rows(rows: &mut [Row], keys: Vec<(&str, Order)>) {
        let spec: Vec<(String, Order)> = keys
            .into_iter()
            .map(|(field, direction)| (field.to_string(), direction))
            .collect();
        let sort_expr = SortExpr::from(spec);

        LoadExecutor::<SortableEntity>::apply_sort(rows, &sort_expr);
    }

    /// Collects the entity ids of `rows` in their current order.
    fn ids(rows: &[Row]) -> Vec<u64> {
        rows.iter().map(|(_, entity)| entity.id).collect()
    }

    /// Applies pagination to `items` and returns what is left.
    fn paginated(mut items: Vec<i32>, offset: u32, limit: Option<u32>) -> Vec<i32> {
        apply_pagination(&mut items, offset, limit);
        items
    }

    // ---------------------------------------------------------------------
    // apply_pagination
    // ---------------------------------------------------------------------

    #[test]
    fn pagination_empty_vec() {
        let remaining = paginated(Vec::new(), 0, Some(10));
        assert!(remaining.is_empty());
    }

    #[test]
    fn pagination_offset_beyond_len_clears() {
        let remaining = paginated(vec![1, 2, 3], 10, Some(5));
        assert!(remaining.is_empty());
    }

    #[test]
    fn pagination_no_limit_from_offset() {
        let remaining = paginated(vec![1, 2, 3, 4, 5], 2, None);
        assert_eq!(remaining, vec![3, 4, 5]);
    }

    #[test]
    fn pagination_exact_window() {
        let remaining = paginated(vec![10, 20, 30, 40, 50], 1, Some(3));
        assert_eq!(remaining, vec![20, 30, 40]);
    }

    #[test]
    fn pagination_limit_exceeds_tail() {
        let remaining = paginated(vec![10, 20, 30], 1, Some(999));
        assert_eq!(remaining, vec![20, 30]);
    }

    // ---------------------------------------------------------------------
    // apply_sort
    // ---------------------------------------------------------------------

    #[test]
    fn apply_sort_orders_descending() {
        let mut rows = vec![
            row(1, 10, 1, None),
            row(2, 30, 2, None),
            row(3, 20, 3, None),
        ];

        sort_rows(&mut rows, vec![("primary", Order::Desc)]);

        let primary: Vec<i32> = rows.iter().map(|(_, entity)| entity.primary).collect();
        assert_eq!(primary, vec![30, 20, 10]);
    }

    #[test]
    fn apply_sort_uses_secondary_field_for_ties() {
        let mut rows = vec![row(1, 1, 5, None), row(2, 1, 8, None), row(3, 2, 3, None)];

        sort_rows(
            &mut rows,
            vec![("primary", Order::Asc), ("secondary", Order::Desc)],
        );

        assert_eq!(ids(&rows), vec![2, 1, 3]);
    }

    #[test]
    fn apply_sort_places_none_before_some_and_falls_back() {
        let mut rows = vec![
            row(3, 0, 0, Some(vec![3, 4])),
            row(1, 0, 0, None),
            row(2, 0, 0, Some(vec![2])),
        ];

        sort_rows(
            &mut rows,
            vec![("optional_blob", Order::Asc), ("id", Order::Asc)],
        );

        assert_eq!(ids(&rows), vec![1, 2, 3]);
    }
}