icydb_core/db/executor/
load.rs

1use crate::{
2    Error, Key,
3    db::{
4        Db,
5        executor::{FilterEvaluator, plan_for},
6        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr, Order, SortExpr},
7        query::{LoadQuery, QueryPlan, QueryValidate},
8        response::Response,
9    },
10    obs::metrics,
11    traits::{EntityKind, FieldValue},
12};
13use canic::log;
14use std::{cmp::Ordering, marker::PhantomData};
15
16///
17/// LoadExecutor
18///
19
20#[derive(Clone, Copy)]
21pub struct LoadExecutor<E: EntityKind> {
22    db: Db<E::Canister>,
23    debug: bool,
24    _marker: PhantomData<E>,
25}
26
27impl<E: EntityKind> LoadExecutor<E> {
28    #[must_use]
29    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
30        Self {
31            db,
32            debug,
33            _marker: PhantomData,
34        }
35    }
36
37    #[inline]
38    fn debug_log(&self, s: impl Into<String>) {
39        if self.debug {
40            log!(Debug, "{}", s.into());
41        }
42    }
43
44    ///
45    /// SHORTCUT METHODS
46    ///
47
48    /// Fetch a primary key for a single matching row.
49    pub fn one_key(self, value: impl FieldValue) -> Result<Key, Error> {
50        self.one(value)?.try_key()
51    }
52
53    /// Fetch a single entity by field value.
54    pub fn one_entity(self, value: impl FieldValue) -> Result<E, Error> {
55        self.one(value)?.try_entity()
56    }
57
58    ///
59    /// BUILDER METHODS
60    ///
61
62    /// Build and execute a query for a single matching row.
63    pub fn one(&self, value: impl FieldValue) -> Result<Response<E>, Error> {
64        let query = LoadQuery::new().one::<E>(value);
65        self.execute(query)
66    }
67
68    /// Build and execute a query for the unit primary key.
69    pub fn only(&self) -> Result<Response<E>, Error> {
70        let query = LoadQuery::new().one::<E>(());
71        self.execute(query)
72    }
73
74    /// Build and execute a query matching multiple primary keys.
75    pub fn many(
76        &self,
77        values: impl IntoIterator<Item = impl FieldValue>,
78    ) -> Result<Response<E>, Error> {
79        let query = LoadQuery::new().many::<E>(values);
80        self.execute(query)
81    }
82
83    /// Execute an unfiltered query for all rows.
84    pub fn all(&self) -> Result<Response<E>, Error> {
85        let query = LoadQuery::new();
86        self.execute(query)
87    }
88
89    /// Apply a filter builder and execute.
90    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
91    where
92        F: FnOnce(FilterDsl) -> I,
93        I: IntoFilterExpr,
94    {
95        let query = LoadQuery::new().filter(f);
96        self.execute(query)
97    }
98
99    /// Count all rows (executes the query plan).
100    pub fn count_all(self) -> Result<u32, Error> {
101        let query = LoadQuery::all();
102        self.count(query)
103    }
104
105    ///
106    /// EXECUTION METHODS
107    ///
108
109    // explain
110    /// Validate and return the query plan without executing.
111    pub fn explain(self, query: LoadQuery) -> Result<QueryPlan, Error> {
112        QueryValidate::<E>::validate(&query)?;
113
114        Ok(plan_for::<E>(query.filter.as_ref()))
115    }
116
117    /// Execute a full query and return a collection of entities.
118    pub fn execute(&self, query: LoadQuery) -> Result<Response<E>, Error> {
119        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Load);
120        QueryValidate::<E>::validate(&query)?;
121
122        self.debug_log(format!("🧭 Executing query: {:?} on {}", query, E::PATH));
123
124        let ctx = self.db.context::<E>();
125        let plan = plan_for::<E>(query.filter.as_ref());
126
127        self.debug_log(format!("📄 Query plan: {plan:?}"));
128
129        // Fast path: pre-pagination
130        let pre_paginated = query.filter.is_none() && query.sort.is_none() && query.limit.is_some();
131        let data_rows = if pre_paginated {
132            let lim = query.limit.as_ref().unwrap();
133            ctx.rows_from_plan_with_pagination(plan, lim.offset, lim.limit)?
134        } else {
135            ctx.rows_from_plan(plan)?
136        };
137
138        self.debug_log(format!(
139            "📦 Loaded {} data rows before deserialization",
140            data_rows.len()
141        ));
142
143        // Deserialize
144        let mut rows: Vec<(Key, E)> = ctx.deserialize_rows(data_rows)?;
145        self.debug_log(format!(
146            "🧩 Deserialized {} entities before filtering",
147            rows.len()
148        ));
149
150        // Filtering
151        if let Some(f) = &query.filter {
152            let simplified = f.clone().simplify();
153            Self::apply_filter(&mut rows, &simplified);
154
155            self.debug_log(format!(
156                "🔎 Applied filter -> {} entities remaining",
157                rows.len()
158            ));
159        }
160
161        // Sorting
162        if let Some(sort) = &query.sort
163            && rows.len() > 1
164        {
165            Self::apply_sort(&mut rows, sort);
166            self.debug_log("↕️ Applied sort expression");
167        }
168
169        // Pagination
170        if let Some(lim) = &query.limit
171            && !pre_paginated
172        {
173            apply_pagination(&mut rows, lim.offset, lim.limit);
174            self.debug_log(format!(
175                "📏 Applied pagination (offset={}, limit={:?}) -> {} entities",
176                lim.offset,
177                lim.limit,
178                rows.len()
179            ));
180        }
181
182        crate::db::executor::set_rows_from_len(&mut span, rows.len());
183        self.debug_log(format!("✅ Query complete -> {} final rows", rows.len()));
184
185        Ok(Response(rows))
186    }
187
188    /// currently just doing the same as execute()
189    /// keeping it separate in case we can optimise count queries in the future
190    #[allow(clippy::cast_possible_truncation)]
191    pub fn count(self, query: LoadQuery) -> Result<u32, Error> {
192        let count = self.execute(query)?.count();
193
194        Ok(count)
195    }
196
197    // apply_filter
198    fn apply_filter(rows: &mut Vec<(Key, E)>, filter: &FilterExpr) {
199        rows.retain(|(_, e)| FilterEvaluator::new(e).eval(filter));
200    }
201
202    // apply_sort
203    fn apply_sort(rows: &mut [(Key, E)], sort_expr: &SortExpr) {
204        rows.sort_by(|(_, ea), (_, eb)| {
205            for (field, direction) in sort_expr.iter() {
206                let va = ea.get_value(field);
207                let vb = eb.get_value(field);
208
209                // Define how to handle missing values (None)
210                let ordering = match (va, vb) {
211                    (None, None) => continue,             // both missing → move to next field
212                    (None, Some(_)) => Ordering::Less,    // None sorts before Some(_)
213                    (Some(_), None) => Ordering::Greater, // Some(_) sorts after None
214                    (Some(va), Some(vb)) => match va.partial_cmp(&vb) {
215                        Some(ord) => ord,
216                        None => continue, // incomparable values → move to next field
217                    },
218                };
219
220                // Apply direction (Asc/Desc)
221                let ordering = match direction {
222                    Order::Asc => ordering,
223                    Order::Desc => ordering.reverse(),
224                };
225
226                if ordering != Ordering::Equal {
227                    return ordering;
228                }
229            }
230
231            // all fields equal
232            Ordering::Equal
233        });
234    }
235}
236
/// Apply offset/limit pagination to an in-memory vector, in-place.
///
/// Drops the first `offset` rows, then keeps at most `limit` of the rest
/// (`None` keeps everything after the offset). An offset at or past the end,
/// or a limit of zero, leaves `rows` empty.
///
/// The window is computed by clamping `limit` against the number of rows left
/// after the offset rather than by adding `offset + limit`, so a very large
/// limit cannot overflow `usize` on targets where it is 32 bits wide.
fn apply_pagination<T>(rows: &mut Vec<T>, offset: u32, limit: Option<u32>) {
    let total = rows.len();

    // An offset that does not fit in `usize` is necessarily past the end.
    let start = usize::try_from(offset).map_or(total, |o| o.min(total));
    let remaining = total - start;

    // A limit that does not fit in `usize` cannot be smaller than `remaining`.
    let keep = limit.map_or(remaining, |l| {
        usize::try_from(l).map_or(remaining, |l| l.min(remaining))
    });

    if keep == 0 {
        rows.clear();
    } else {
        rows.drain(..start);
        rows.truncate(keep);
    }
}
250
///
/// TESTS
///

#[cfg(test)]
mod tests {
    use super::{LoadExecutor, apply_pagination};
    use crate::{
        IndexSpec, Key, Value,
        db::primitives::{Order, SortExpr},
        traits::{
            CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
            ValidateAuto, ValidateCustom, View, Visitable,
        },
    };
    use serde::{Deserialize, Serialize};

    // Minimal entity used to drive `apply_sort`: two integer fields for
    // primary/secondary ordering and one optional field for the None cases.
    #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
    struct SortableEntity {
        id: u64,
        primary: i32,
        secondary: i32,
        optional_blob: Option<Vec<u8>>,
    }

    impl SortableEntity {
        fn new(id: u64, primary: i32, secondary: i32, optional_blob: Option<Vec<u8>>) -> Self {
            Self {
                id,
                primary,
                secondary,
                optional_blob,
            }
        }
    }

    // Marker types that satisfy the canister/store associated types.
    struct SortableCanister;
    struct SortableStore;

    impl Path for SortableCanister {
        const PATH: &'static str = "test::canister";
    }

    impl CanisterKind for SortableCanister {}

    impl Path for SortableStore {
        const PATH: &'static str = "test::store";
    }

    impl StoreKind for SortableStore {
        type Canister = SortableCanister;
    }

    impl Path for SortableEntity {
        const PATH: &'static str = "test::sortable";
    }

    // The entity is its own view; both conversions are the identity.
    impl View for SortableEntity {
        type ViewType = Self;

        fn to_view(&self) -> Self::ViewType {
            self.clone()
        }

        fn from_view(view: Self::ViewType) -> Self {
            view
        }
    }

    impl SanitizeAuto for SortableEntity {}
    impl SanitizeCustom for SortableEntity {}
    impl ValidateAuto for SortableEntity {}
    impl ValidateCustom for SortableEntity {}
    impl Visitable for SortableEntity {}

    // Field lookup by name; `optional_blob` yields `None` when unset, which
    // is what the sort tests rely on.
    impl FieldValues for SortableEntity {
        fn get_value(&self, field: &str) -> Option<Value> {
            match field {
                "id" => Some(Value::Uint(self.id)),
                "primary" => Some(Value::Int(i64::from(self.primary))),
                "secondary" => Some(Value::Int(i64::from(self.secondary))),
                "optional_blob" => self.optional_blob.clone().map(Value::Blob),
                _ => None,
            }
        }
    }

    impl EntityKind for SortableEntity {
        type PrimaryKey = u64;
        type Store = SortableStore;
        type Canister = SortableCanister;

        const ENTITY_ID: u64 = 99;
        const PRIMARY_KEY: &'static str = "id";
        const FIELDS: &'static [&'static str] = &["id", "primary", "secondary", "optional_blob"];
        const INDEXES: &'static [&'static IndexSpec] = &[];

        fn key(&self) -> Key {
            self.id.into()
        }

        fn primary_key(&self) -> Self::PrimaryKey {
            self.id
        }
    }

    // Build one `(Key, entity)` row whose key is derived from `id`.
    fn row(
        id: u64,
        primary: i32,
        secondary: i32,
        blob: Option<Vec<u8>>,
    ) -> (Key, SortableEntity) {
        (Key::from(id), SortableEntity::new(id, primary, secondary, blob))
    }

    // Sort `rows` through the executor using `(field, direction)` pairs.
    fn sort_rows(rows: &mut [(Key, SortableEntity)], spec: Vec<(&str, Order)>) {
        let fields: Vec<(String, Order)> = spec
            .into_iter()
            .map(|(name, direction)| (name.to_string(), direction))
            .collect();
        let sort_expr = SortExpr::from(fields);

        LoadExecutor::<SortableEntity>::apply_sort(rows, &sort_expr);
    }

    // Run `apply_pagination` on an owned vector and hand the result back.
    fn paginated(mut items: Vec<i32>, offset: u32, limit: Option<u32>) -> Vec<i32> {
        apply_pagination(&mut items, offset, limit);
        items
    }

    #[test]
    fn pagination_empty_vec() {
        let page = paginated(Vec::new(), 0, Some(10));
        assert!(page.is_empty());
    }

    #[test]
    fn pagination_offset_beyond_len_clears() {
        let page = paginated(vec![1, 2, 3], 10, Some(5));
        assert!(page.is_empty());
    }

    #[test]
    fn pagination_no_limit_from_offset() {
        let page = paginated(vec![1, 2, 3, 4, 5], 2, None);
        assert_eq!(page, vec![3, 4, 5]);
    }

    #[test]
    fn pagination_exact_window() {
        // skip one, take three -> the middle of the list
        let page = paginated(vec![10, 20, 30, 40, 50], 1, Some(3));
        assert_eq!(page, vec![20, 30, 40]);
    }

    #[test]
    fn pagination_limit_exceeds_tail() {
        // a limit larger than what is left just returns the remainder
        let page = paginated(vec![10, 20, 30], 1, Some(999));
        assert_eq!(page, vec![20, 30]);
    }

    #[test]
    fn apply_sort_orders_descending() {
        let mut rows = vec![
            row(1, 10, 1, None),
            row(2, 30, 2, None),
            row(3, 20, 3, None),
        ];

        sort_rows(&mut rows, vec![("primary", Order::Desc)]);

        let primary: Vec<i32> = rows.iter().map(|(_, entity)| entity.primary).collect();
        assert_eq!(primary, vec![30, 20, 10]);
    }

    #[test]
    fn apply_sort_uses_secondary_field_for_ties() {
        let mut rows = vec![
            row(1, 1, 5, None),
            row(2, 1, 8, None),
            row(3, 2, 3, None),
        ];

        sort_rows(
            &mut rows,
            vec![("primary", Order::Asc), ("secondary", Order::Desc)],
        );

        let ids: Vec<u64> = rows.iter().map(|(_, entity)| entity.id).collect();
        assert_eq!(ids, vec![2, 1, 3]);
    }

    #[test]
    fn apply_sort_places_none_before_some_and_falls_back() {
        let mut rows = vec![
            row(3, 0, 0, Some(vec![3, 4])),
            row(1, 0, 0, None),
            row(2, 0, 0, Some(vec![2])),
        ];

        sort_rows(
            &mut rows,
            vec![("optional_blob", Order::Asc), ("id", Order::Asc)],
        );

        let ids: Vec<u64> = rows.iter().map(|(_, entity)| entity.id).collect();
        assert_eq!(ids, vec![1, 2, 3]);
    }
}