1use crate::{
2 Error, Key,
3 db::{
4 Db,
5 executor::{FilterEvaluator, plan_for},
6 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr, Order, SortExpr},
7 query::{LoadQuery, QueryPlan, QueryValidate},
8 response::Response,
9 },
10 obs::metrics,
11 traits::{EntityKind, FieldValue},
12};
13use std::{cmp::Ordering, marker::PhantomData};
14
15#[derive(Clone, Copy)]
20pub struct LoadExecutor<E: EntityKind> {
21 db: Db<E::Canister>,
22 debug: bool,
23 _marker: PhantomData<E>,
24}
25
26impl<E: EntityKind> LoadExecutor<E> {
27 #[must_use]
28 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
29 Self {
30 db,
31 debug,
32 _marker: PhantomData,
33 }
34 }
35
36 #[inline]
37 fn debug_log(&self, s: impl Into<String>) {
38 if self.debug {
39 println!("{}", s.into());
40 }
41 }
42
43 pub fn one_key(self, value: impl FieldValue) -> Result<Key, Error> {
49 self.one(value)?.try_key()
50 }
51
52 pub fn one_entity(self, value: impl FieldValue) -> Result<E, Error> {
54 self.one(value)?.try_entity()
55 }
56
57 pub fn one(&self, value: impl FieldValue) -> Result<Response<E>, Error> {
63 let query = LoadQuery::new().one::<E>(value);
64 self.execute(query)
65 }
66
67 pub fn only(&self) -> Result<Response<E>, Error> {
69 let query = LoadQuery::new().one::<E>(());
70 self.execute(query)
71 }
72
73 pub fn many(
75 &self,
76 values: impl IntoIterator<Item = impl FieldValue>,
77 ) -> Result<Response<E>, Error> {
78 let query = LoadQuery::new().many::<E>(values);
79 self.execute(query)
80 }
81
82 pub fn all(&self) -> Result<Response<E>, Error> {
84 let query = LoadQuery::new();
85 self.execute(query)
86 }
87
88 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
90 where
91 F: FnOnce(FilterDsl) -> I,
92 I: IntoFilterExpr,
93 {
94 let query = LoadQuery::new().filter(f);
95 self.execute(query)
96 }
97
98 pub fn count_all(self) -> Result<u32, Error> {
100 let query = LoadQuery::all();
101 self.count(query)
102 }
103
104 pub fn explain(self, query: LoadQuery) -> Result<QueryPlan, Error> {
111 QueryValidate::<E>::validate(&query)?;
112
113 Ok(plan_for::<E>(query.filter.as_ref()))
114 }
115
116 pub fn execute(&self, query: LoadQuery) -> Result<Response<E>, Error> {
118 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Load);
119 QueryValidate::<E>::validate(&query)?;
120
121 self.debug_log(format!("🧭 Executing query: {:?} on {}", query, E::PATH));
122
123 let ctx = self.db.context::<E>();
124 let plan = plan_for::<E>(query.filter.as_ref());
125
126 self.debug_log(format!("📄 Query plan: {plan:?}"));
127
128 let pre_paginated = query.filter.is_none() && query.sort.is_none() && query.limit.is_some();
130 let data_rows = if pre_paginated {
131 let lim = query.limit.as_ref().unwrap();
132 ctx.rows_from_plan_with_pagination(plan, lim.offset, lim.limit)?
133 } else {
134 ctx.rows_from_plan(plan)?
135 };
136
137 self.debug_log(format!(
138 "📦 Loaded {} data rows before deserialization",
139 data_rows.len()
140 ));
141
142 let mut rows: Vec<(Key, E)> = ctx.deserialize_rows(data_rows)?;
144 self.debug_log(format!(
145 "🧩 Deserialized {} entities before filtering",
146 rows.len()
147 ));
148
149 if let Some(f) = &query.filter {
151 let simplified = f.clone().simplify();
152 Self::apply_filter(&mut rows, &simplified);
153
154 self.debug_log(format!(
155 "🔎 Applied filter -> {} entities remaining",
156 rows.len()
157 ));
158 }
159
160 if let Some(sort) = &query.sort
162 && rows.len() > 1
163 {
164 Self::apply_sort(&mut rows, sort);
165 self.debug_log("↕️ Applied sort expression");
166 }
167
168 if let Some(lim) = &query.limit
170 && !pre_paginated
171 {
172 apply_pagination(&mut rows, lim.offset, lim.limit);
173 self.debug_log(format!(
174 "📏 Applied pagination (offset={}, limit={:?}) -> {} entities",
175 lim.offset,
176 lim.limit,
177 rows.len()
178 ));
179 }
180
181 crate::db::executor::set_rows_from_len(&mut span, rows.len());
182 self.debug_log(format!("✅ Query complete -> {} final rows", rows.len()));
183
184 Ok(Response(rows))
185 }
186
187 #[allow(clippy::cast_possible_truncation)]
190 pub fn count(self, query: LoadQuery) -> Result<u32, Error> {
191 let count = self.execute(query)?.count();
192
193 Ok(count)
194 }
195
196 fn apply_filter(rows: &mut Vec<(Key, E)>, filter: &FilterExpr) {
198 rows.retain(|(_, e)| FilterEvaluator::new(e).eval(filter));
199 }
200
201 fn apply_sort(rows: &mut [(Key, E)], sort_expr: &SortExpr) {
203 rows.sort_by(|(_, ea), (_, eb)| {
204 for (field, direction) in sort_expr.iter() {
205 let va = ea.get_value(field);
206 let vb = eb.get_value(field);
207
208 let ordering = match (va, vb) {
210 (None, None) => continue, (None, Some(_)) => Ordering::Less, (Some(_), None) => Ordering::Greater, (Some(va), Some(vb)) => match va.partial_cmp(&vb) {
214 Some(ord) => ord,
215 None => continue, },
217 };
218
219 let ordering = match direction {
221 Order::Asc => ordering,
222 Order::Desc => ordering.reverse(),
223 };
224
225 if ordering != Ordering::Equal {
226 return ordering;
227 }
228 }
229
230 Ordering::Equal
232 });
233 }
234}
235
/// Reduces `rows` in place to the window `[offset, offset + limit)`.
///
/// * `offset` — number of leading rows to skip; an offset at or past the end
///   leaves `rows` empty.
/// * `limit` — maximum number of rows to keep after the offset; `None` keeps
///   everything from `offset` to the end.
///
/// The window end is computed with saturating arithmetic so that a very large
/// `limit` cannot overflow `usize` on 32-bit targets (where the sum would panic
/// in debug builds or wrap to a tiny window in release builds).
fn apply_pagination<T>(rows: &mut Vec<T>, offset: u32, limit: Option<u32>) {
    let total = rows.len();

    // `u32 -> usize` is lossless on 32- and 64-bit targets.
    let start = (offset as usize).min(total);
    let end = limit.map_or(total, |l| start.saturating_add(l as usize).min(total));

    // Invariant: start <= end <= total. Drop the tail first so the subsequent
    // front-drain only has to shift the rows that are actually kept.
    rows.truncate(end);
    rows.drain(..start);
}
249
#[cfg(test)]
mod tests {
    use super::{LoadExecutor, apply_pagination};
    use crate::{
        IndexSpec, Key, Value,
        db::primitives::{Order, SortExpr},
        traits::{
            CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
            ValidateAuto, ValidateCustom, View, Visitable,
        },
    };
    use serde::{Deserialize, Serialize};

    // ---------------------------------------------------------------------
    // Fixtures: a minimal entity, store and canister used only by these tests.
    // ---------------------------------------------------------------------

    /// Test entity with two integer sort fields and one optional blob field.
    #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
    struct SortableEntity {
        id: u64,
        primary: i32,
        secondary: i32,
        optional_blob: Option<Vec<u8>>,
    }

    impl SortableEntity {
        fn new(id: u64, primary: i32, secondary: i32, optional_blob: Option<Vec<u8>>) -> Self {
            Self {
                id,
                primary,
                secondary,
                optional_blob,
            }
        }
    }

    struct SortableCanister;
    struct SortableStore;

    impl Path for SortableCanister {
        const PATH: &'static str = "test::canister";
    }

    impl CanisterKind for SortableCanister {}

    impl Path for SortableStore {
        const PATH: &'static str = "test::store";
    }

    impl StoreKind for SortableStore {
        type Canister = SortableCanister;
    }

    impl Path for SortableEntity {
        const PATH: &'static str = "test::sortable";
    }

    impl View for SortableEntity {
        type ViewType = Self;

        fn to_view(&self) -> Self::ViewType {
            self.clone()
        }

        fn from_view(view: Self::ViewType) -> Self {
            view
        }
    }

    impl SanitizeAuto for SortableEntity {}
    impl SanitizeCustom for SortableEntity {}
    impl ValidateAuto for SortableEntity {}
    impl ValidateCustom for SortableEntity {}
    impl Visitable for SortableEntity {}

    impl FieldValues for SortableEntity {
        // An absent blob and an unknown field name both report `None`.
        fn get_value(&self, field: &str) -> Option<Value> {
            match field {
                "id" => Some(Value::Uint(self.id)),
                "primary" => Some(Value::Int(i64::from(self.primary))),
                "secondary" => Some(Value::Int(i64::from(self.secondary))),
                "optional_blob" => self.optional_blob.clone().map(Value::Blob),
                _ => None,
            }
        }
    }

    impl EntityKind for SortableEntity {
        type PrimaryKey = u64;
        type Store = SortableStore;
        type Canister = SortableCanister;

        const ENTITY_ID: u64 = 99;
        const PRIMARY_KEY: &'static str = "id";
        const FIELDS: &'static [&'static str] = &["id", "primary", "secondary", "optional_blob"];
        const INDEXES: &'static [&'static IndexSpec] = &[];

        fn key(&self) -> Key {
            self.id.into()
        }

        fn primary_key(&self) -> Self::PrimaryKey {
            self.id
        }
    }

    // ---------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------

    /// Builds a `(Key, entity)` row whose key is derived from `id`.
    fn row(
        id: u64,
        primary: i32,
        secondary: i32,
        blob: Option<Vec<u8>>,
    ) -> (Key, SortableEntity) {
        (Key::from(id), SortableEntity::new(id, primary, secondary, blob))
    }

    /// Sorts `rows` with `apply_sort` using the given `(field, direction)` pairs.
    fn sort_rows(rows: &mut [(Key, SortableEntity)], fields: Vec<(&str, Order)>) {
        let spec: Vec<(String, Order)> = fields
            .into_iter()
            .map(|(name, order)| (name.to_string(), order))
            .collect();
        let sort_expr = SortExpr::from(spec);

        LoadExecutor::<SortableEntity>::apply_sort(rows, &sort_expr);
    }

    /// Applies pagination to `items` and hands back what is left.
    fn paginated(mut items: Vec<i32>, offset: u32, limit: Option<u32>) -> Vec<i32> {
        apply_pagination(&mut items, offset, limit);
        items
    }

    // ---------------------------------------------------------------------
    // Pagination
    // ---------------------------------------------------------------------

    #[test]
    fn pagination_empty_vec() {
        assert!(paginated(vec![], 0, Some(10)).is_empty());
    }

    #[test]
    fn pagination_offset_beyond_len_clears() {
        assert!(paginated(vec![1, 2, 3], 10, Some(5)).is_empty());
    }

    #[test]
    fn pagination_no_limit_from_offset() {
        assert_eq!(paginated(vec![1, 2, 3, 4, 5], 2, None), vec![3, 4, 5]);
    }

    #[test]
    fn pagination_exact_window() {
        assert_eq!(
            paginated(vec![10, 20, 30, 40, 50], 1, Some(3)),
            vec![20, 30, 40]
        );
    }

    #[test]
    fn pagination_limit_exceeds_tail() {
        assert_eq!(paginated(vec![10, 20, 30], 1, Some(999)), vec![20, 30]);
    }

    // ---------------------------------------------------------------------
    // Sorting
    // ---------------------------------------------------------------------

    #[test]
    fn apply_sort_orders_descending() {
        let mut rows = vec![
            row(1, 10, 1, None),
            row(2, 30, 2, None),
            row(3, 20, 3, None),
        ];

        sort_rows(&mut rows, vec![("primary", Order::Desc)]);

        let primary: Vec<i32> = rows.iter().map(|(_, e)| e.primary).collect();
        assert_eq!(primary, vec![30, 20, 10]);
    }

    #[test]
    fn apply_sort_uses_secondary_field_for_ties() {
        let mut rows = vec![
            row(1, 1, 5, None),
            row(2, 1, 8, None),
            row(3, 2, 3, None),
        ];

        sort_rows(
            &mut rows,
            vec![("primary", Order::Asc), ("secondary", Order::Desc)],
        );

        let ids: Vec<u64> = rows.iter().map(|(_, e)| e.id).collect();
        assert_eq!(ids, vec![2, 1, 3]);
    }

    #[test]
    fn apply_sort_places_none_before_some_and_falls_back() {
        let mut rows = vec![
            row(3, 0, 0, Some(vec![3, 4])),
            row(1, 0, 0, None),
            row(2, 0, 0, Some(vec![2])),
        ];

        sort_rows(
            &mut rows,
            vec![("optional_blob", Order::Asc), ("id", Order::Asc)],
        );

        let ids: Vec<u64> = rows.iter().map(|(_, e)| e.id).collect();
        assert_eq!(ids, vec![1, 2, 3]);
    }
}