icydb_core/db/executor/
load.rs

1use crate::{
2    Error, Key,
3    db::{
4        Db,
5        executor::{
6            FilterEvaluator,
7            plan::{plan_for, set_rows_from_len},
8        },
9        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr, Order, SortExpr},
10        query::{LoadQuery, QueryPlan, QueryValidate},
11        response::Response,
12    },
13    obs::metrics,
14    traits::{EntityKind, FieldValue},
15};
16use std::{cmp::Ordering, marker::PhantomData};
17
18///
19/// LoadExecutor
20///
21
22#[derive(Clone, Copy)]
23pub struct LoadExecutor<E: EntityKind> {
24    db: Db<E::Canister>,
25    debug: bool,
26    _marker: PhantomData<E>,
27}
28
29impl<E: EntityKind> LoadExecutor<E> {
30    #[must_use]
31    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
32        Self {
33            db,
34            debug,
35            _marker: PhantomData,
36        }
37    }
38
39    #[inline]
40    fn debug_log(&self, s: impl Into<String>) {
41        if self.debug {
42            println!("{}", s.into());
43        }
44    }
45
46    ///
47    /// SHORTCUT METHODS
48    ///
49
50    /// Fetch a primary key for a single matching row.
51    pub fn one_key(self, value: impl FieldValue) -> Result<Key, Error> {
52        self.one(value)?.try_key()
53    }
54
55    /// Fetch a single entity by field value.
56    pub fn one_entity(self, value: impl FieldValue) -> Result<E, Error> {
57        self.one(value)?.try_entity()
58    }
59
60    ///
61    /// BUILDER METHODS
62    ///
63
64    /// Build and execute a query for a single matching row.
65    pub fn one(&self, value: impl FieldValue) -> Result<Response<E>, Error> {
66        let query = LoadQuery::new().one::<E>(value);
67        self.execute(query)
68    }
69
70    /// Build and execute a query for the unit primary key.
71    pub fn only(&self) -> Result<Response<E>, Error> {
72        let query = LoadQuery::new().one::<E>(());
73        self.execute(query)
74    }
75
76    /// Build and execute a query matching multiple primary keys.
77    pub fn many(
78        &self,
79        values: impl IntoIterator<Item = impl FieldValue>,
80    ) -> Result<Response<E>, Error> {
81        let query = LoadQuery::new().many::<E>(values);
82        self.execute(query)
83    }
84
85    /// Execute an unfiltered query for all rows.
86    pub fn all(&self) -> Result<Response<E>, Error> {
87        let query = LoadQuery::new();
88        self.execute(query)
89    }
90
91    /// Apply a filter builder and execute.
92    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
93    where
94        F: FnOnce(FilterDsl) -> I,
95        I: IntoFilterExpr,
96    {
97        let query = LoadQuery::new().filter(f);
98        self.execute(query)
99    }
100
101    /// Count all rows (executes the query plan).
102    pub fn count_all(self) -> Result<u32, Error> {
103        let query = LoadQuery::all();
104        self.count(query)
105    }
106
107    ///
108    /// EXECUTION METHODS
109    ///
110
111    // explain
112    /// Validate and return the query plan without executing.
113    pub fn explain(self, query: LoadQuery) -> Result<QueryPlan, Error> {
114        QueryValidate::<E>::validate(&query)?;
115
116        Ok(plan_for::<E>(query.filter.as_ref()))
117    }
118
119    /// Execute a full query and return a collection of entities.
120    pub fn execute(&self, query: LoadQuery) -> Result<Response<E>, Error> {
121        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Load);
122        QueryValidate::<E>::validate(&query)?;
123
124        self.debug_log(format!("🧭 Executing query: {:?} on {}", query, E::PATH));
125
126        let ctx = self.db.context::<E>();
127        let plan = plan_for::<E>(query.filter.as_ref());
128
129        self.debug_log(format!("📄 Query plan: {plan:?}"));
130
131        // Fast path: pre-pagination
132        let pre_paginated = query.filter.is_none() && query.sort.is_none() && query.limit.is_some();
133        let mut rows: Vec<(Key, E)> = if pre_paginated {
134            let lim = query.limit.as_ref().unwrap();
135            let data_rows = ctx.rows_from_plan_with_pagination(plan, lim.offset, lim.limit)?;
136
137            self.debug_log(format!(
138                "📦 Scanned {} data rows before deserialization",
139                data_rows.len()
140            ));
141
142            let rows = ctx.deserialize_rows(data_rows)?;
143            self.debug_log(format!(
144                "🧩 Deserialized {} entities before filtering",
145                rows.len()
146            ));
147            rows
148        } else {
149            let data_rows = ctx.rows_from_plan(plan)?;
150            self.debug_log(format!(
151                "📦 Scanned {} data rows before deserialization",
152                data_rows.len()
153            ));
154
155            let rows = ctx.deserialize_rows(data_rows)?;
156            self.debug_log(format!(
157                "🧩 Deserialized {} entities before filtering",
158                rows.len()
159            ));
160
161            rows
162        };
163
164        // Filtering
165        if let Some(f) = &query.filter {
166            let simplified = f.clone().simplify();
167            Self::apply_filter(&mut rows, &simplified);
168
169            self.debug_log(format!(
170                "🔎 Applied filter -> {} entities remaining",
171                rows.len()
172            ));
173        }
174
175        // Sorting
176        if let Some(sort) = &query.sort
177            && rows.len() > 1
178        {
179            Self::apply_sort(&mut rows, sort);
180            self.debug_log("↕️ Applied sort expression");
181        }
182
183        // Pagination
184        if let Some(lim) = &query.limit
185            && !pre_paginated
186        {
187            apply_pagination(&mut rows, lim.offset, lim.limit);
188            self.debug_log(format!(
189                "📏 Applied pagination (offset={}, limit={:?}) -> {} entities",
190                lim.offset,
191                lim.limit,
192                rows.len()
193            ));
194        }
195
196        set_rows_from_len(&mut span, rows.len());
197        self.debug_log(format!("✅ Query complete -> {} final rows", rows.len()));
198
199        Ok(Response(rows))
200    }
201
202    /// currently just doing the same as execute()
203    /// keeping it separate in case we can optimise count queries in the future
204    #[allow(clippy::cast_possible_truncation)]
205    pub fn count(self, query: LoadQuery) -> Result<u32, Error> {
206        let count = self.execute(query)?.count();
207
208        Ok(count)
209    }
210
211    // apply_filter
212    fn apply_filter(rows: &mut Vec<(Key, E)>, filter: &FilterExpr) {
213        rows.retain(|(_, e)| FilterEvaluator::new(e).eval(filter));
214    }
215
216    // apply_sort
217    fn apply_sort(rows: &mut [(Key, E)], sort_expr: &SortExpr) {
218        rows.sort_by(|(_, ea), (_, eb)| {
219            for (field, direction) in sort_expr.iter() {
220                let va = ea.get_value(field);
221                let vb = eb.get_value(field);
222
223                // Define how to handle missing values (None)
224                let ordering = match (va, vb) {
225                    (None, None) => continue,             // both missing → move to next field
226                    (None, Some(_)) => Ordering::Less,    // None sorts before Some(_)
227                    (Some(_), None) => Ordering::Greater, // Some(_) sorts after None
228                    (Some(va), Some(vb)) => match va.partial_cmp(&vb) {
229                        Some(ord) => ord,
230                        None => continue, // incomparable values → move to next field
231                    },
232                };
233
234                // Apply direction (Asc/Desc)
235                let ordering = match direction {
236                    Order::Asc => ordering,
237                    Order::Desc => ordering.reverse(),
238                };
239
240                if ordering != Ordering::Equal {
241                    return ordering;
242                }
243            }
244
245            // all fields equal
246            Ordering::Equal
247        });
248    }
249}
250
251/// Apply offset/limit pagination to an in-memory vector, in-place.
252fn apply_pagination<T>(rows: &mut Vec<T>, offset: u32, limit: Option<u32>) {
253    let total = rows.len();
254    let start = usize::min(offset as usize, total);
255    let end = limit.map_or(total, |l| usize::min(start + l as usize, total));
256
257    if start >= end {
258        rows.clear();
259    } else {
260        rows.drain(..start);
261        rows.truncate(end - start);
262    }
263}
264
265///
266/// TESTS
267///
268
269#[cfg(test)]
270mod tests {
271    use super::{LoadExecutor, apply_pagination};
272    use crate::{
273        IndexSpec, Key, Value,
274        db::primitives::{Order, SortExpr},
275        traits::{
276            CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
277            ValidateAuto, ValidateCustom, View, Visitable,
278        },
279    };
280    use serde::{Deserialize, Serialize};
281
282    #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
283    struct SortableEntity {
284        id: u64,
285        primary: i32,
286        secondary: i32,
287        optional_blob: Option<Vec<u8>>,
288    }
289
290    impl SortableEntity {
291        fn new(id: u64, primary: i32, secondary: i32, optional_blob: Option<Vec<u8>>) -> Self {
292            Self {
293                id,
294                primary,
295                secondary,
296                optional_blob,
297            }
298        }
299    }
300
301    struct SortableCanister;
302    struct SortableStore;
303
304    impl Path for SortableCanister {
305        const PATH: &'static str = "test::canister";
306    }
307
308    impl CanisterKind for SortableCanister {}
309
310    impl Path for SortableStore {
311        const PATH: &'static str = "test::store";
312    }
313
314    impl StoreKind for SortableStore {
315        type Canister = SortableCanister;
316    }
317
318    impl Path for SortableEntity {
319        const PATH: &'static str = "test::sortable";
320    }
321
322    impl View for SortableEntity {
323        type ViewType = Self;
324
325        fn to_view(&self) -> Self::ViewType {
326            self.clone()
327        }
328
329        fn from_view(view: Self::ViewType) -> Self {
330            view
331        }
332    }
333
334    impl SanitizeAuto for SortableEntity {}
335    impl SanitizeCustom for SortableEntity {}
336    impl ValidateAuto for SortableEntity {}
337    impl ValidateCustom for SortableEntity {}
338    impl Visitable for SortableEntity {}
339
340    impl FieldValues for SortableEntity {
341        fn get_value(&self, field: &str) -> Option<Value> {
342            match field {
343                "id" => Some(Value::Uint(self.id)),
344                "primary" => Some(Value::Int(i64::from(self.primary))),
345                "secondary" => Some(Value::Int(i64::from(self.secondary))),
346                "optional_blob" => self.optional_blob.clone().map(Value::Blob),
347                _ => None,
348            }
349        }
350    }
351
352    impl EntityKind for SortableEntity {
353        type PrimaryKey = u64;
354        type Store = SortableStore;
355        type Canister = SortableCanister;
356
357        const ENTITY_ID: u64 = 99;
358        const PRIMARY_KEY: &'static str = "id";
359        const FIELDS: &'static [&'static str] = &["id", "primary", "secondary", "optional_blob"];
360        const INDEXES: &'static [&'static IndexSpec] = &[];
361
362        fn key(&self) -> Key {
363            self.id.into()
364        }
365
366        fn primary_key(&self) -> Self::PrimaryKey {
367            self.id
368        }
369    }
370
371    #[test]
372    fn pagination_empty_vec() {
373        let mut v: Vec<i32> = vec![];
374        apply_pagination(&mut v, 0, Some(10));
375        assert!(v.is_empty());
376    }
377
378    #[test]
379    fn pagination_offset_beyond_len_clears() {
380        let mut v = vec![1, 2, 3];
381        apply_pagination(&mut v, 10, Some(5));
382        assert!(v.is_empty());
383    }
384
385    #[test]
386    fn pagination_no_limit_from_offset() {
387        let mut v = vec![1, 2, 3, 4, 5];
388        apply_pagination(&mut v, 2, None);
389        assert_eq!(v, vec![3, 4, 5]);
390    }
391
392    #[test]
393    fn pagination_exact_window() {
394        let mut v = vec![10, 20, 30, 40, 50];
395        // offset 1, limit 3 -> elements [20,30,40]
396        apply_pagination(&mut v, 1, Some(3));
397        assert_eq!(v, vec![20, 30, 40]);
398    }
399
400    #[test]
401    fn pagination_limit_exceeds_tail() {
402        let mut v = vec![10, 20, 30];
403        // offset 1, limit large -> [20,30]
404        apply_pagination(&mut v, 1, Some(999));
405        assert_eq!(v, vec![20, 30]);
406    }
407
408    #[test]
409    fn apply_sort_orders_descending() {
410        let mut rows = vec![
411            (Key::from(1_u64), SortableEntity::new(1, 10, 1, None)),
412            (Key::from(2_u64), SortableEntity::new(2, 30, 2, None)),
413            (Key::from(3_u64), SortableEntity::new(3, 20, 3, None)),
414        ];
415        let sort_expr = SortExpr::from(vec![("primary".to_string(), Order::Desc)]);
416
417        LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);
418
419        let primary: Vec<i32> = rows.iter().map(|(_, e)| e.primary).collect();
420        assert_eq!(primary, vec![30, 20, 10]);
421    }
422
423    #[test]
424    fn apply_sort_uses_secondary_field_for_ties() {
425        let mut rows = vec![
426            (Key::from(1_u64), SortableEntity::new(1, 1, 5, None)),
427            (Key::from(2_u64), SortableEntity::new(2, 1, 8, None)),
428            (Key::from(3_u64), SortableEntity::new(3, 2, 3, None)),
429        ];
430        let sort_expr = SortExpr::from(vec![
431            ("primary".to_string(), Order::Asc),
432            ("secondary".to_string(), Order::Desc),
433        ]);
434
435        LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);
436
437        let ids: Vec<u64> = rows.iter().map(|(_, e)| e.id).collect();
438        assert_eq!(ids, vec![2, 1, 3]);
439    }
440
441    #[test]
442    fn apply_sort_places_none_before_some_and_falls_back() {
443        let mut rows = vec![
444            (
445                Key::from(3_u64),
446                SortableEntity::new(3, 0, 0, Some(vec![3, 4])),
447            ),
448            (Key::from(1_u64), SortableEntity::new(1, 0, 0, None)),
449            (
450                Key::from(2_u64),
451                SortableEntity::new(2, 0, 0, Some(vec![2])),
452            ),
453        ];
454        let sort_expr = SortExpr::from(vec![
455            ("optional_blob".to_string(), Order::Asc),
456            ("id".to_string(), Order::Asc),
457        ]);
458
459        LoadExecutor::<SortableEntity>::apply_sort(rows.as_mut_slice(), &sort_expr);
460
461        let ids: Vec<u64> = rows.iter().map(|(_, e)| e.id).collect();
462        assert_eq!(ids, vec![1, 2, 3]);
463    }
464}