icydb_core/db/executor/
load.rs

1use crate::{
2    Error, Key,
3    db::{
4        Db,
5        executor::{FilterEvaluator, plan_for},
6        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr, Order, SortExpr},
7        query::{LoadQuery, QueryPlan, QueryValidate},
8        response::Response,
9    },
10    obs::metrics,
11    traits::{EntityKind, FieldValue},
12};
13use std::{cmp::Ordering, marker::PhantomData};
14
15///
16/// LoadExecutor
17///
18
19#[derive(Clone, Copy)]
20pub struct LoadExecutor<E: EntityKind> {
21    db: Db<E::Canister>,
22    debug: bool,
23    _marker: PhantomData<E>,
24}
25
26impl<E: EntityKind> LoadExecutor<E> {
27    #[must_use]
28    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
29        Self {
30            db,
31            debug,
32            _marker: PhantomData,
33        }
34    }
35
36    #[inline]
37    fn debug_log(&self, s: impl Into<String>) {
38        if self.debug {
39            println!("{}", s.into());
40        }
41    }
42
43    ///
44    /// SHORTCUT METHODS
45    ///
46
47    /// Fetch a primary key for a single matching row.
48    pub fn one_key(self, value: impl FieldValue) -> Result<Key, Error> {
49        self.one(value)?.try_key()
50    }
51
52    /// Fetch a single entity by field value.
53    pub fn one_entity(self, value: impl FieldValue) -> Result<E, Error> {
54        self.one(value)?.try_entity()
55    }
56
57    ///
58    /// BUILDER METHODS
59    ///
60
61    /// Build and execute a query for a single matching row.
62    pub fn one(&self, value: impl FieldValue) -> Result<Response<E>, Error> {
63        let query = LoadQuery::new().one::<E>(value);
64        self.execute(query)
65    }
66
67    /// Build and execute a query for the unit primary key.
68    pub fn only(&self) -> Result<Response<E>, Error> {
69        let query = LoadQuery::new().one::<E>(());
70        self.execute(query)
71    }
72
73    /// Build and execute a query matching multiple primary keys.
74    pub fn many(
75        &self,
76        values: impl IntoIterator<Item = impl FieldValue>,
77    ) -> Result<Response<E>, Error> {
78        let query = LoadQuery::new().many::<E>(values);
79        self.execute(query)
80    }
81
82    /// Execute an unfiltered query for all rows.
83    pub fn all(&self) -> Result<Response<E>, Error> {
84        let query = LoadQuery::new();
85        self.execute(query)
86    }
87
88    /// Apply a filter builder and execute.
89    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
90    where
91        F: FnOnce(FilterDsl) -> I,
92        I: IntoFilterExpr,
93    {
94        let query = LoadQuery::new().filter(f);
95        self.execute(query)
96    }
97
98    /// Count all rows (executes the query plan).
99    pub fn count_all(self) -> Result<u32, Error> {
100        let query = LoadQuery::all();
101        self.count(query)
102    }
103
104    ///
105    /// EXECUTION METHODS
106    ///
107
108    // explain
109    /// Validate and return the query plan without executing.
110    pub fn explain(self, query: LoadQuery) -> Result<QueryPlan, Error> {
111        QueryValidate::<E>::validate(&query)?;
112
113        Ok(plan_for::<E>(query.filter.as_ref()))
114    }
115
116    /// Execute a full query and return a collection of entities.
117    pub fn execute(&self, query: LoadQuery) -> Result<Response<E>, Error> {
118        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Load);
119        QueryValidate::<E>::validate(&query)?;
120
121        self.debug_log(format!("🧭 Executing query: {:?} on {}", query, E::PATH));
122
123        let ctx = self.db.context::<E>();
124        let plan = plan_for::<E>(query.filter.as_ref());
125
126        self.debug_log(format!("📄 Query plan: {plan:?}"));
127
128        // Fast path: pre-pagination
129        let pre_paginated = query.filter.is_none() && query.sort.is_none() && query.limit.is_some();
130        let data_rows = if pre_paginated {
131            let lim = query.limit.as_ref().unwrap();
132            ctx.rows_from_plan_with_pagination(plan, lim.offset, lim.limit)?
133        } else {
134            ctx.rows_from_plan(plan)?
135        };
136
137        self.debug_log(format!(
138            "📦 Loaded {} data rows before deserialization",
139            data_rows.len()
140        ));
141
142        // Deserialize
143        let mut rows: Vec<(Key, E)> = ctx.deserialize_rows(data_rows)?;
144        self.debug_log(format!(
145            "🧩 Deserialized {} entities before filtering",
146            rows.len()
147        ));
148
149        // Filtering
150        if let Some(f) = &query.filter {
151            let simplified = f.clone().simplify();
152            Self::apply_filter(&mut rows, &simplified);
153
154            self.debug_log(format!(
155                "🔎 Applied filter -> {} entities remaining",
156                rows.len()
157            ));
158        }
159
160        // Sorting
161        if let Some(sort) = &query.sort
162            && rows.len() > 1
163        {
164            Self::apply_sort(&mut rows, sort);
165            self.debug_log("↕️ Applied sort expression");
166        }
167
168        // Pagination
169        if let Some(lim) = &query.limit
170            && !pre_paginated
171        {
172            apply_pagination(&mut rows, lim.offset, lim.limit);
173            self.debug_log(format!(
174                "📏 Applied pagination (offset={}, limit={:?}) -> {} entities",
175                lim.offset,
176                lim.limit,
177                rows.len()
178            ));
179        }
180
181        crate::db::executor::set_rows_from_len(&mut span, rows.len());
182        self.debug_log(format!("✅ Query complete -> {} final rows", rows.len()));
183
184        Ok(Response(rows))
185    }
186
187    /// currently just doing the same as execute()
188    /// keeping it separate in case we can optimise count queries in the future
189    #[allow(clippy::cast_possible_truncation)]
190    pub fn count(self, query: LoadQuery) -> Result<u32, Error> {
191        let count = self.execute(query)?.count();
192
193        Ok(count)
194    }
195
196    // apply_filter
197    fn apply_filter(rows: &mut Vec<(Key, E)>, filter: &FilterExpr) {
198        rows.retain(|(_, e)| FilterEvaluator::new(e).eval(filter));
199    }
200
201    // apply_sort
202    fn apply_sort(rows: &mut [(Key, E)], sort_expr: &SortExpr) {
203        rows.sort_by(|(_, ea), (_, eb)| {
204            for (field, direction) in sort_expr.iter() {
205                let va = ea.get_value(field);
206                let vb = eb.get_value(field);
207
208                // Define how to handle missing values (None)
209                let ordering = match (va, vb) {
210                    (None, None) => continue,             // both missing → move to next field
211                    (None, Some(_)) => Ordering::Less,    // None sorts before Some(_)
212                    (Some(_), None) => Ordering::Greater, // Some(_) sorts after None
213                    (Some(va), Some(vb)) => match va.partial_cmp(&vb) {
214                        Some(ord) => ord,
215                        None => continue, // incomparable values → move to next field
216                    },
217                };
218
219                // Apply direction (Asc/Desc)
220                let ordering = match direction {
221                    Order::Asc => ordering,
222                    Order::Desc => ordering.reverse(),
223                };
224
225                if ordering != Ordering::Equal {
226                    return ordering;
227                }
228            }
229
230            // all fields equal
231            Ordering::Equal
232        });
233    }
234}
235
/// Apply offset/limit pagination to an in-memory vector, in-place.
///
/// Keeps the window `[offset, offset + limit)`, clamped to the vector's
/// length. With `limit == None` everything from `offset` onward is kept.
/// An offset at or past the end, or a limit of zero, leaves `rows` empty.
fn apply_pagination<T>(rows: &mut Vec<T>, offset: u32, limit: Option<u32>) {
    let total = rows.len();
    // `u32 -> usize` is lossless on 32- and 64-bit targets.
    let start = (offset as usize).min(total);
    // Saturate instead of `+`: on 32-bit targets (e.g. wasm32) `start + limit`
    // can exceed `usize::MAX`, which would wrap and wrongly empty the result.
    let end = limit.map_or(total, |l| start.saturating_add(l as usize).min(total));

    // `start <= end <= total` holds here. Dropping the tail first means the
    // drain below only has to shift the rows that are actually kept.
    rows.truncate(end);
    rows.drain(..start);
}
249
250///
251/// TESTS
252///
253
#[cfg(test)]
mod tests {
    use super::{LoadExecutor, apply_pagination};
    use crate::{
        IndexSpec, Key, Value,
        db::primitives::{Order, SortExpr},
        traits::{
            CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
            ValidateAuto, ValidateCustom, View, Visitable,
        },
    };
    use serde::{Deserialize, Serialize};

    /// Minimal entity used to drive `apply_sort`: two integer sort fields plus
    /// an optional field so that missing values can be exercised.
    #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
    struct SortableEntity {
        id: u64,
        primary: i32,
        secondary: i32,
        optional_blob: Option<Vec<u8>>,
    }

    impl SortableEntity {
        /// Positional constructor to keep the test tables compact.
        fn new(id: u64, primary: i32, secondary: i32, optional_blob: Option<Vec<u8>>) -> Self {
            Self {
                id,
                primary,
                secondary,
                optional_blob,
            }
        }
    }

    // Marker types that satisfy the canister/store associated types of the entity.
    struct SortableCanister;
    struct SortableStore;

    impl Path for SortableCanister {
        const PATH: &'static str = "test::canister";
    }

    impl CanisterKind for SortableCanister {}

    impl Path for SortableStore {
        const PATH: &'static str = "test::store";
    }

    impl StoreKind for SortableStore {
        type Canister = SortableCanister;
    }

    impl Path for SortableEntity {
        const PATH: &'static str = "test::sortable";
    }

    // The entity is its own view type.
    impl View for SortableEntity {
        type ViewType = Self;

        fn to_view(&self) -> Self::ViewType {
            self.clone()
        }

        fn from_view(view: Self::ViewType) -> Self {
            view
        }
    }

    impl SanitizeAuto for SortableEntity {}
    impl SanitizeCustom for SortableEntity {}
    impl ValidateAuto for SortableEntity {}
    impl ValidateCustom for SortableEntity {}
    impl Visitable for SortableEntity {}

    impl FieldValues for SortableEntity {
        /// Expose each field as a `Value`; an absent blob and any unknown
        /// field name both yield `None`.
        fn get_value(&self, field: &str) -> Option<Value> {
            match field {
                "id" => Some(Value::Uint(self.id)),
                "primary" => Some(Value::Int(i64::from(self.primary))),
                "secondary" => Some(Value::Int(i64::from(self.secondary))),
                "optional_blob" => self.optional_blob.clone().map(Value::Blob),
                _ => None,
            }
        }
    }

    impl EntityKind for SortableEntity {
        type PrimaryKey = u64;
        type Store = SortableStore;
        type Canister = SortableCanister;

        const ENTITY_ID: u64 = 99;
        const PRIMARY_KEY: &'static str = "id";
        const FIELDS: &'static [&'static str] = &["id", "primary", "secondary", "optional_blob"];
        const INDEXES: &'static [&'static IndexSpec] = &[];

        fn key(&self) -> Key {
            self.id.into()
        }

        fn primary_key(&self) -> Self::PrimaryKey {
            self.id
        }
    }

    #[test]
    fn pagination_empty_vec() {
        let mut v: Vec<i32> = vec![];
        apply_pagination(&mut v, 0, Some(10));
        assert!(v.is_empty());
    }

    #[test]
    fn pagination_offset_beyond_len_clears() {
        let mut v = vec![1, 2, 3];
        apply_pagination(&mut v, 10, Some(5));
        assert!(v.is_empty());
    }

    #[test]
    fn pagination_zero_limit_clears() {
        let mut v = vec![1, 2, 3];
        // a window of width zero keeps nothing, wherever it starts
        apply_pagination(&mut v, 1, Some(0));
        assert!(v.is_empty());
    }

    #[test]
    fn pagination_no_limit_from_offset() {
        let mut v = vec![1, 2, 3, 4, 5];
        apply_pagination(&mut v, 2, None);
        assert_eq!(v, vec![3, 4, 5]);
    }

    #[test]
    fn pagination_exact_window() {
        let mut v = vec![10, 20, 30, 40, 50];
        // offset 1, limit 3 -> elements [20,30,40]
        apply_pagination(&mut v, 1, Some(3));
        assert_eq!(v, vec![20, 30, 40]);
    }

    #[test]
    fn pagination_limit_exceeds_tail() {
        let mut v = vec![10, 20, 30];
        // offset 1, limit large -> [20,30]
        apply_pagination(&mut v, 1, Some(999));
        assert_eq!(v, vec![20, 30]);
    }

    #[test]
    fn apply_sort_orders_descending() {
        let mut rows = vec![
            (Key::from(1_u64), SortableEntity::new(1, 10, 1, None)),
            (Key::from(2_u64), SortableEntity::new(2, 30, 2, None)),
            (Key::from(3_u64), SortableEntity::new(3, 20, 3, None)),
        ];
        let sort_expr = SortExpr::from(vec![("primary".to_string(), Order::Desc)]);

        LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);

        let primary: Vec<i32> = rows.iter().map(|(_, e)| e.primary).collect();
        assert_eq!(primary, vec![30, 20, 10]);
    }

    #[test]
    fn apply_sort_uses_secondary_field_for_ties() {
        let mut rows = vec![
            (Key::from(1_u64), SortableEntity::new(1, 1, 5, None)),
            (Key::from(2_u64), SortableEntity::new(2, 1, 8, None)),
            (Key::from(3_u64), SortableEntity::new(3, 2, 3, None)),
        ];
        let sort_expr = SortExpr::from(vec![
            ("primary".to_string(), Order::Asc),
            ("secondary".to_string(), Order::Desc),
        ]);

        LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);

        let ids: Vec<u64> = rows.iter().map(|(_, e)| e.id).collect();
        assert_eq!(ids, vec![2, 1, 3]);
    }

    #[test]
    fn apply_sort_places_none_before_some_and_falls_back() {
        // Two rows have no blob, so the first sort field cannot separate them
        // and the comparison must fall back to `id`. Row 4 is listed before
        // row 1 so that a stable sort without the fallback would fail.
        let mut rows = vec![
            (
                Key::from(3_u64),
                SortableEntity::new(3, 0, 0, Some(vec![3, 4])),
            ),
            (Key::from(4_u64), SortableEntity::new(4, 0, 0, None)),
            (Key::from(1_u64), SortableEntity::new(1, 0, 0, None)),
            (
                Key::from(2_u64),
                SortableEntity::new(2, 0, 0, Some(vec![2])),
            ),
        ];
        let sort_expr = SortExpr::from(vec![
            ("optional_blob".to_string(), Order::Asc),
            ("id".to_string(), Order::Asc),
        ]);

        LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);

        let ids: Vec<u64> = rows.iter().map(|(_, e)| e.id).collect();
        assert_eq!(ids, vec![1, 4, 2, 3]);
    }
}
449}