icydb_core/db/executor/load.rs

1use crate::{
2    Error, Key,
3    db::{
4        Db,
5        executor::{FilterEvaluator, plan_for},
6        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr, Order, SortExpr},
7        query::{LoadQuery, QueryPlan, QueryValidate},
8        response::Response,
9    },
10    obs::metrics,
11    traits::{EntityKind, FieldValue},
12};
13use canic::log;
14use std::{cmp::Ordering, marker::PhantomData};
15
16///
17/// LoadExecutor
18///
19
20#[derive(Clone, Copy)]
21pub struct LoadExecutor<E: EntityKind> {
22    db: Db<E::Canister>,
23    debug: bool,
24    _marker: PhantomData<E>,
25}
26
27impl<E: EntityKind> LoadExecutor<E> {
28    #[must_use]
29    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
30        Self {
31            db,
32            debug,
33            _marker: PhantomData,
34        }
35    }
36
37    #[inline]
38    fn debug_log(&self, s: impl Into<String>) {
39        if self.debug {
40            log!(Debug, "{}", s.into());
41        }
42    }
43
44    ///
45    /// SHORTCUT METHODS
46    ///
47
48    pub fn one_key(self, value: impl FieldValue) -> Result<Key, Error> {
49        self.one(value)?.try_key()
50    }
51
52    pub fn one_entity(self, value: impl FieldValue) -> Result<E, Error> {
53        self.one(value)?.try_entity()
54    }
55
56    ///
57    /// BUILDER METHODS
58    ///
59
60    pub fn one(&self, value: impl FieldValue) -> Result<Response<E>, Error> {
61        let query = LoadQuery::new().one::<E>(value);
62        self.execute(query)
63    }
64
65    pub fn only(&self) -> Result<Response<E>, Error> {
66        let query = LoadQuery::new().one::<E>(());
67        self.execute(query)
68    }
69
70    pub fn many(
71        &self,
72        values: impl IntoIterator<Item = impl FieldValue>,
73    ) -> Result<Response<E>, Error> {
74        let query = LoadQuery::new().many::<E>(values);
75        self.execute(query)
76    }
77
78    pub fn all(&self) -> Result<Response<E>, Error> {
79        let query = LoadQuery::new();
80        self.execute(query)
81    }
82
83    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
84    where
85        F: FnOnce(FilterDsl) -> I,
86        I: IntoFilterExpr,
87    {
88        let query = LoadQuery::new().filter(f);
89        self.execute(query)
90    }
91
92    pub fn count_all(self) -> Result<u32, Error> {
93        let query = LoadQuery::all();
94        self.count(query)
95    }
96
97    ///
98    /// EXECUTION METHODS
99    ///
100
101    // explain
102    pub fn explain(self, query: LoadQuery) -> Result<QueryPlan, Error> {
103        QueryValidate::<E>::validate(&query)?;
104
105        Ok(plan_for::<E>(query.filter.as_ref()))
106    }
107
108    /// Execute a full query and return a collection of entities.
109    pub fn execute(&self, query: LoadQuery) -> Result<Response<E>, Error> {
110        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Load);
111        QueryValidate::<E>::validate(&query)?;
112
113        self.debug_log(format!("🧭 Executing query: {:?} on {}", query, E::PATH));
114
115        let ctx = self.db.context::<E>();
116        let plan = plan_for::<E>(query.filter.as_ref());
117
118        self.debug_log(format!("📄 Query plan: {plan:?}"));
119
120        // Fast path: pre-pagination
121        let pre_paginated = query.filter.is_none() && query.sort.is_none() && query.limit.is_some();
122        let data_rows = if pre_paginated {
123            let lim = query.limit.as_ref().unwrap();
124            ctx.rows_from_plan_with_pagination(plan, lim.offset, lim.limit)?
125        } else {
126            ctx.rows_from_plan(plan)?
127        };
128
129        self.debug_log(format!(
130            "📦 Loaded {} data rows before deserialization",
131            data_rows.len()
132        ));
133
134        // Deserialize
135        let mut rows: Vec<(Key, E)> = ctx.deserialize_rows(data_rows)?;
136        self.debug_log(format!(
137            "🧩 Deserialized {} entities before filtering",
138            rows.len()
139        ));
140
141        // Filtering
142        if let Some(f) = &query.filter {
143            let simplified = f.clone().simplify();
144            Self::apply_filter(&mut rows, &simplified);
145
146            self.debug_log(format!(
147                "🔎 Applied filter -> {} entities remaining",
148                rows.len()
149            ));
150        }
151
152        // Sorting
153        if let Some(sort) = &query.sort
154            && rows.len() > 1
155        {
156            Self::apply_sort(&mut rows, sort);
157            self.debug_log("↕️ Applied sort expression");
158        }
159
160        // Pagination
161        if let Some(lim) = &query.limit
162            && !pre_paginated
163        {
164            apply_pagination(&mut rows, lim.offset, lim.limit);
165            self.debug_log(format!(
166                "📏 Applied pagination (offset={}, limit={:?}) -> {} entities",
167                lim.offset,
168                lim.limit,
169                rows.len()
170            ));
171        }
172
173        crate::db::executor::set_rows_from_len(&mut span, rows.len());
174        self.debug_log(format!("✅ Query complete -> {} final rows", rows.len()));
175
176        Ok(Response(rows))
177    }
178
179    /// currently just doing the same as execute()
180    /// keeping it separate in case we can optimise count queries in the future
181    #[allow(clippy::cast_possible_truncation)]
182    pub fn count(self, query: LoadQuery) -> Result<u32, Error> {
183        let count = self.execute(query)?.count();
184
185        Ok(count)
186    }
187
188    // apply_filter
189    fn apply_filter(rows: &mut Vec<(Key, E)>, filter: &FilterExpr) {
190        rows.retain(|(_, e)| FilterEvaluator::new(e).eval(filter));
191    }
192
193    // apply_sort
194    fn apply_sort(rows: &mut [(Key, E)], sort_expr: &SortExpr) {
195        rows.sort_by(|(_, ea), (_, eb)| {
196            for (field, direction) in sort_expr.iter() {
197                let va = ea.get_value(field);
198                let vb = eb.get_value(field);
199
200                // Define how to handle missing values (None)
201                let ordering = match (va, vb) {
202                    (None, None) => continue,             // both missing → move to next field
203                    (None, Some(_)) => Ordering::Less,    // None sorts before Some(_)
204                    (Some(_), None) => Ordering::Greater, // Some(_) sorts after None
205                    (Some(va), Some(vb)) => match va.partial_cmp(&vb) {
206                        Some(ord) => ord,
207                        None => continue, // incomparable values → move to next field
208                    },
209                };
210
211                // Apply direction (Asc/Desc)
212                let ordering = match direction {
213                    Order::Asc => ordering,
214                    Order::Desc => ordering.reverse(),
215                };
216
217                if ordering != Ordering::Equal {
218                    return ordering;
219                }
220            }
221
222            // all fields equal
223            Ordering::Equal
224        });
225    }
226}
227
/// Apply offset/limit pagination to an in-memory vector, in-place.
///
/// Skips the first `offset` elements, then keeps at most `limit` of the
/// remaining ones. `None` keeps everything after the offset. An offset at or
/// past the end leaves the vector empty.
///
/// The window end is computed with saturating arithmetic. On 32-bit targets
/// (e.g. `wasm32`), `offset + limit` can exceed `usize::MAX` for large limits
/// such as `u32::MAX`; plain addition would overflow there.
pub fn apply_pagination<T>(rows: &mut Vec<T>, offset: u32, limit: Option<u32>) {
    let total = rows.len();
    // `u32 -> usize` can only fail on 16-bit targets; saturate so the clamp still applies.
    let start = usize::try_from(offset).unwrap_or(usize::MAX).min(total);
    let end = limit.map_or(total, |l| {
        let l = usize::try_from(l).unwrap_or(usize::MAX);
        start.saturating_add(l).min(total)
    });

    // Invariant: start <= end <= total. Drop the tail first so `drain` does not
    // shift elements that are about to be discarded anyway.
    rows.truncate(end);
    rows.drain(..start);
}
241
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::{LoadExecutor, apply_pagination};
    use crate::{
        IndexSpec, Key, Value,
        db::primitives::{Order, SortExpr},
        traits::{
            CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
            ValidateAuto, ValidateCustom, View, Visitable,
        },
    };
    use serde::{Deserialize, Serialize};

    /// Minimal entity used to exercise `apply_sort`.
    #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
    struct SortableEntity {
        id: u64,
        primary: i32,
        secondary: i32,
        optional_blob: Option<Vec<u8>>,
    }

    impl SortableEntity {
        fn new(id: u64, primary: i32, secondary: i32, optional_blob: Option<Vec<u8>>) -> Self {
            Self {
                id,
                primary,
                secondary,
                optional_blob,
            }
        }
    }

    struct SortableCanister;
    struct SortableStore;

    impl Path for SortableCanister {
        const PATH: &'static str = "test::canister";
    }

    impl CanisterKind for SortableCanister {}

    impl Path for SortableStore {
        const PATH: &'static str = "test::store";
    }

    impl StoreKind for SortableStore {
        type Canister = SortableCanister;
    }

    impl Path for SortableEntity {
        const PATH: &'static str = "test::sortable";
    }

    impl View for SortableEntity {
        type ViewType = Self;

        fn to_view(&self) -> Self::ViewType {
            self.clone()
        }

        fn from_view(view: Self::ViewType) -> Self {
            view
        }
    }

    impl SanitizeAuto for SortableEntity {}
    impl SanitizeCustom for SortableEntity {}
    impl ValidateAuto for SortableEntity {}
    impl ValidateCustom for SortableEntity {}
    impl Visitable for SortableEntity {}

    impl FieldValues for SortableEntity {
        fn get_value(&self, field: &str) -> Option<Value> {
            match field {
                "id" => Some(Value::Uint(self.id)),
                "primary" => Some(Value::Int(i64::from(self.primary))),
                "secondary" => Some(Value::Int(i64::from(self.secondary))),
                "optional_blob" => self.optional_blob.clone().map(Value::Blob),
                _ => None,
            }
        }
    }

    impl EntityKind for SortableEntity {
        type PrimaryKey = u64;
        type Store = SortableStore;
        type Canister = SortableCanister;

        const ENTITY_ID: u64 = 99;
        const PRIMARY_KEY: &'static str = "id";
        const FIELDS: &'static [&'static str] = &["id", "primary", "secondary", "optional_blob"];
        const INDEXES: &'static [&'static IndexSpec] = &[];

        fn key(&self) -> Key {
            self.id.into()
        }

        fn primary_key(&self) -> Self::PrimaryKey {
            self.id
        }
    }

    #[test]
    fn pagination_empty_vec() {
        let mut v: Vec<i32> = vec![];
        apply_pagination(&mut v, 0, Some(10));
        assert!(v.is_empty());
    }

    #[test]
    fn pagination_offset_beyond_len_clears() {
        let mut v = vec![1, 2, 3];
        apply_pagination(&mut v, 10, Some(5));
        assert!(v.is_empty());
    }

    #[test]
    fn pagination_no_limit_from_offset() {
        let mut v = vec![1, 2, 3, 4, 5];
        apply_pagination(&mut v, 2, None);
        assert_eq!(v, vec![3, 4, 5]);
    }

    #[test]
    fn pagination_exact_window() {
        let mut v = vec![10, 20, 30, 40, 50];
        // offset 1, limit 3 -> elements [20,30,40]
        apply_pagination(&mut v, 1, Some(3));
        assert_eq!(v, vec![20, 30, 40]);
    }

    #[test]
    fn pagination_limit_exceeds_tail() {
        let mut v = vec![10, 20, 30];
        // offset 1, limit large -> [20,30]
        apply_pagination(&mut v, 1, Some(999));
        assert_eq!(v, vec![20, 30]);
    }

    #[test]
    fn pagination_max_limit_does_not_overflow() {
        // offset + u32::MAX exceeds usize::MAX on 32-bit targets (e.g. wasm32);
        // the window end must saturate rather than overflow.
        let mut v = vec![1, 2, 3];
        apply_pagination(&mut v, 1, Some(u32::MAX));
        assert_eq!(v, vec![2, 3]);
    }

    #[test]
    fn pagination_zero_limit_clears() {
        let mut v = vec![1, 2, 3];
        apply_pagination(&mut v, 0, Some(0));
        assert!(v.is_empty());
    }

    #[test]
    fn apply_sort_orders_descending() {
        let mut rows = vec![
            (Key::from(1_u64), SortableEntity::new(1, 10, 1, None)),
            (Key::from(2_u64), SortableEntity::new(2, 30, 2, None)),
            (Key::from(3_u64), SortableEntity::new(3, 20, 3, None)),
        ];
        let sort_expr = SortExpr::from(vec![("primary".to_string(), Order::Desc)]);

        LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);

        let primary: Vec<i32> = rows.iter().map(|(_, e)| e.primary).collect();
        assert_eq!(primary, vec![30, 20, 10]);
    }

    #[test]
    fn apply_sort_uses_secondary_field_for_ties() {
        let mut rows = vec![
            (Key::from(1_u64), SortableEntity::new(1, 1, 5, None)),
            (Key::from(2_u64), SortableEntity::new(2, 1, 8, None)),
            (Key::from(3_u64), SortableEntity::new(3, 2, 3, None)),
        ];
        let sort_expr = SortExpr::from(vec![
            ("primary".to_string(), Order::Asc),
            ("secondary".to_string(), Order::Desc),
        ]);

        LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);

        let ids: Vec<u64> = rows.iter().map(|(_, e)| e.id).collect();
        assert_eq!(ids, vec![2, 1, 3]);
    }

    #[test]
    fn apply_sort_places_none_before_some_and_falls_back() {
        let mut rows = vec![
            (
                Key::from(3_u64),
                SortableEntity::new(3, 0, 0, Some(vec![3, 4])),
            ),
            (Key::from(1_u64), SortableEntity::new(1, 0, 0, None)),
            (
                Key::from(2_u64),
                SortableEntity::new(2, 0, 0, Some(vec![2])),
            ),
        ];
        let sort_expr = SortExpr::from(vec![
            ("optional_blob".to_string(), Order::Asc),
            ("id".to_string(), Order::Asc),
        ]);

        LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);

        let ids: Vec<u64> = rows.iter().map(|(_, e)| e.id).collect();
        assert_eq!(ids, vec![1, 2, 3]);
    }

    #[test]
    fn apply_sort_descending_places_some_before_none() {
        // The direction reverses the whole comparison, including the
        // missing-value rule: in Desc, present values come before None.
        let mut rows = vec![
            (Key::from(1_u64), SortableEntity::new(1, 0, 0, None)),
            (
                Key::from(2_u64),
                SortableEntity::new(2, 0, 0, Some(vec![7])),
            ),
        ];
        let sort_expr = SortExpr::from(vec![("optional_blob".to_string(), Order::Desc)]);

        LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);

        let ids: Vec<u64> = rows.iter().map(|(_, e)| e.id).collect();
        assert_eq!(ids, vec![2, 1]);
    }
}