// icydb_core/db/executor/load.rs

1use crate::{
2    Error, Key,
3    db::{
4        Db,
5        executor::{
6            FilterEvaluator,
7            plan::{plan_for, scan_plan, set_rows_from_len},
8        },
9        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr, Order, SortExpr},
10        query::{LoadQuery, QueryPlan, QueryValidate},
11        response::{Response, ResponseError},
12        store::DataRow,
13    },
14    obs::metrics,
15    traits::{EntityKind, FieldValue},
16};
17use std::{cmp::Ordering, collections::HashMap, hash::Hash, marker::PhantomData, ops::ControlFlow};
18
19///
20/// LoadExecutor
21///
22
23#[derive(Clone, Copy)]
24pub struct LoadExecutor<E: EntityKind> {
25    db: Db<E::Canister>,
26    debug: bool,
27    _marker: PhantomData<E>,
28}
29
30impl<E: EntityKind> LoadExecutor<E> {
31    // ======================================================================
32    // Construction & diagnostics
33    // ======================================================================
34
35    #[must_use]
36    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
37        Self {
38            db,
39            debug,
40            _marker: PhantomData,
41        }
42    }
43
44    fn debug_log(&self, s: impl Into<String>) {
45        if self.debug {
46            println!("{}", s.into());
47        }
48    }
49
50    // ======================================================================
51    // Query builders (execute and return Response)
52    // ======================================================================
53
54    /// Execute a query for a single primary key.
55    pub fn one(&self, value: impl FieldValue) -> Result<Response<E>, Error> {
56        self.execute(LoadQuery::new().one::<E>(value))
57    }
58
59    /// Execute a query for the unit primary key.
60    pub fn only(&self) -> Result<Response<E>, Error> {
61        self.execute(LoadQuery::new().one::<E>(()))
62    }
63
64    /// Execute a query matching multiple primary keys.
65    pub fn many<I, V>(&self, values: I) -> Result<Response<E>, Error>
66    where
67        I: IntoIterator<Item = V>,
68        V: FieldValue,
69    {
70        let query = LoadQuery::new().many_by_field(E::PRIMARY_KEY, values);
71        self.execute(query)
72    }
73
74    /// Execute an unfiltered query for all rows.
75    pub fn all(&self) -> Result<Response<E>, Error> {
76        self.execute(LoadQuery::new())
77    }
78
79    /// Execute a query built from a filter.
80    pub fn filter<F, I>(&self, f: F) -> Result<Response<E>, Error>
81    where
82        F: FnOnce(FilterDsl) -> I,
83        I: IntoFilterExpr,
84    {
85        self.execute(LoadQuery::new().filter(f))
86    }
87
88    // ======================================================================
89    // Cardinality guards (delegated to Response)
90    // ======================================================================
91
92    /// Execute a query and require exactly one row.
93    pub fn require_one(&self, query: LoadQuery) -> Result<(), Error> {
94        self.execute(query)?.require_one()
95    }
96
97    /// Require exactly one row by primary key.
98    pub fn require_one_pk(&self, value: impl FieldValue) -> Result<(), Error> {
99        self.require_one(LoadQuery::new().one::<E>(value))
100    }
101
102    /// Require exactly one row from a filter.
103    pub fn require_one_filter<F, I>(&self, f: F) -> Result<(), Error>
104    where
105        F: FnOnce(FilterDsl) -> I,
106        I: IntoFilterExpr,
107    {
108        self.require_one(LoadQuery::new().filter(f))
109    }
110
111    // ======================================================================
112    // Existence checks (≥1 semantics, intentionally weaker)
113    // ======================================================================
114
115    /// Check whether at least one row matches the query.
116    pub fn exists(&self, query: LoadQuery) -> Result<bool, Error> {
117        let query = query.limit_1();
118        QueryValidate::<E>::validate(&query)?;
119
120        let plan = plan_for::<E>(query.filter.as_ref());
121        let filter = query.filter.map(FilterExpr::simplify);
122        let mut found = false;
123
124        scan_plan::<E, _>(&self.db, plan, |_, entity| {
125            let matches = filter
126                .as_ref()
127                .is_none_or(|f| FilterEvaluator::new(&entity).eval(f));
128
129            if matches {
130                found = true;
131                ControlFlow::Break(())
132            } else {
133                ControlFlow::Continue(())
134            }
135        })?;
136
137        Ok(found)
138    }
139
140    /// Check existence by primary key.
141    pub fn exists_one(&self, value: impl FieldValue) -> Result<bool, Error> {
142        self.exists(LoadQuery::new().one::<E>(value))
143    }
144
145    /// Check existence with a filter.
146    pub fn exists_filter<F, I>(&self, f: F) -> Result<bool, Error>
147    where
148        F: FnOnce(FilterDsl) -> I,
149        I: IntoFilterExpr,
150    {
151        self.exists(LoadQuery::new().filter(f))
152    }
153
154    /// Check whether the table contains any rows.
155    pub fn exists_any(&self) -> Result<bool, Error> {
156        self.exists(LoadQuery::new())
157    }
158
159    // ======================================================================
160    // Existence checks with not-found errors (fast path, no deserialization)
161    // ======================================================================
162
163    /// Require at least one row by primary key.
164    pub fn ensure_exists_one(&self, value: impl FieldValue) -> Result<(), Error> {
165        if self.exists_one(value)? {
166            Ok(())
167        } else {
168            Err(ResponseError::NotFound { entity: E::PATH }.into())
169        }
170    }
171
172    /// Require that all provided primary keys exist.
173    #[allow(clippy::cast_possible_truncation)]
174    pub fn ensure_exists_many<I, V>(&self, values: I) -> Result<(), Error>
175    where
176        I: IntoIterator<Item = V>,
177        V: FieldValue,
178    {
179        let pks: Vec<_> = values.into_iter().collect();
180
181        let expected = pks.len() as u32;
182        if expected == 0 {
183            return Ok(());
184        }
185
186        let res = self.many(pks)?;
187        res.require_len(expected)?;
188
189        Ok(())
190    }
191
192    /// Require at least one row from a filter.
193    pub fn ensure_exists_filter<F, I>(&self, f: F) -> Result<(), Error>
194    where
195        F: FnOnce(FilterDsl) -> I,
196        I: IntoFilterExpr,
197    {
198        if self.exists_filter(f)? {
199            Ok(())
200        } else {
201            Err(ResponseError::NotFound { entity: E::PATH }.into())
202        }
203    }
204
205    // ======================================================================
206    // Execution & planning
207    // ======================================================================
208
209    /// Validate and return the query plan without executing.
210    pub fn explain(self, query: LoadQuery) -> Result<QueryPlan, Error> {
211        QueryValidate::<E>::validate(&query)?;
212
213        Ok(plan_for::<E>(query.filter.as_ref()))
214    }
215
216    fn execute_raw(&self, query: &LoadQuery) -> Result<Vec<DataRow>, Error> {
217        QueryValidate::<E>::validate(query)?;
218
219        let ctx = self.db.context::<E>();
220        let plan = plan_for::<E>(query.filter.as_ref());
221
222        if let Some(lim) = &query.limit {
223            Ok(ctx.rows_from_plan_with_pagination(plan, lim.offset, lim.limit)?)
224        } else {
225            Ok(ctx.rows_from_plan(plan)?)
226        }
227    }
228
229    /// Execute a full query and return a collection of entities.
230    ///
231    /// Note: index-backed loads are best-effort. If index entries point to missing
232    /// or malformed rows, those candidates are skipped. Use explicit strict APIs
233    /// when corruption must surface as an error.
234    pub fn execute(&self, query: LoadQuery) -> Result<Response<E>, Error> {
235        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Load);
236        QueryValidate::<E>::validate(&query)?;
237
238        self.debug_log(format!("🧭 Executing query: {:?} on {}", query, E::PATH));
239
240        let ctx = self.db.context::<E>();
241        let plan = plan_for::<E>(query.filter.as_ref());
242
243        self.debug_log(format!("📄 Query plan: {plan:?}"));
244
245        // Fast path: pre-pagination
246        let pre_paginated = query.filter.is_none() && query.sort.is_none() && query.limit.is_some();
247        let mut rows: Vec<(Key, E)> = if pre_paginated {
248            let data_rows = self.execute_raw(&query)?;
249
250            self.debug_log(format!(
251                "📦 Scanned {} data rows before deserialization",
252                data_rows.len()
253            ));
254
255            let rows = ctx.deserialize_rows(data_rows)?;
256            self.debug_log(format!(
257                "🧩 Deserialized {} entities before filtering",
258                rows.len()
259            ));
260            rows
261        } else {
262            let data_rows = ctx.rows_from_plan(plan)?;
263            self.debug_log(format!(
264                "📦 Scanned {} data rows before deserialization",
265                data_rows.len()
266            ));
267
268            let rows = ctx.deserialize_rows(data_rows)?;
269            self.debug_log(format!(
270                "🧩 Deserialized {} entities before filtering",
271                rows.len()
272            ));
273
274            rows
275        };
276
277        // Filtering
278        if let Some(f) = &query.filter {
279            let simplified = f.clone().simplify();
280            Self::apply_filter(&mut rows, &simplified);
281
282            self.debug_log(format!(
283                "🔎 Applied filter -> {} entities remaining",
284                rows.len()
285            ));
286        }
287
288        // Sorting
289        if let Some(sort) = &query.sort
290            && rows.len() > 1
291        {
292            Self::apply_sort(&mut rows, sort);
293            self.debug_log("↕️ Applied sort expression");
294        }
295
296        // Pagination
297        if let Some(lim) = &query.limit
298            && !pre_paginated
299        {
300            apply_pagination(&mut rows, lim.offset, lim.limit);
301            self.debug_log(format!(
302                "📏 Applied pagination (offset={}, limit={:?}) -> {} entities",
303                lim.offset,
304                lim.limit,
305                rows.len()
306            ));
307        }
308
309        set_rows_from_len(&mut span, rows.len());
310        self.debug_log(format!("✅ Query complete -> {} final rows", rows.len()));
311
312        Ok(Response(rows))
313    }
314
315    /// Count rows matching a query.
316    pub fn count(&self, query: LoadQuery) -> Result<u32, Error> {
317        Ok(self.execute(query)?.count())
318    }
319
320    pub fn count_all(&self) -> Result<u32, Error> {
321        self.count(LoadQuery::new())
322    }
323
324    // ======================================================================
325    // Aggregations
326    // ======================================================================
327
328    /// Group rows matching a query and count them by a derived key.
329    ///
330    /// This is intentionally implemented on the executor (not Response)
331    /// so it can later avoid full deserialization.
332    pub fn group_count_by<K, F>(
333        &self,
334        query: LoadQuery,
335        key_fn: F,
336    ) -> Result<HashMap<K, u32>, Error>
337    where
338        K: Eq + Hash,
339        F: Fn(&E) -> K,
340    {
341        let entities = self.execute(query)?.entities();
342
343        let mut counts = HashMap::new();
344        for e in entities {
345            *counts.entry(key_fn(&e)).or_insert(0) += 1;
346        }
347
348        Ok(counts)
349    }
350
351    // ======================================================================
352    // Private Helpers
353    // ======================================================================
354
355    // apply_filter
356    fn apply_filter(rows: &mut Vec<(Key, E)>, filter: &FilterExpr) {
357        rows.retain(|(_, e)| FilterEvaluator::new(e).eval(filter));
358    }
359
360    // apply_sort
361    fn apply_sort(rows: &mut [(Key, E)], sort_expr: &SortExpr) {
362        rows.sort_by(|(_, ea), (_, eb)| {
363            for (field, direction) in sort_expr.iter() {
364                let va = ea.get_value(field);
365                let vb = eb.get_value(field);
366
367                // Define how to handle missing values (None)
368                let ordering = match (va, vb) {
369                    (None, None) => continue,             // both missing → move to next field
370                    (None, Some(_)) => Ordering::Less,    // None sorts before Some(_)
371                    (Some(_), None) => Ordering::Greater, // Some(_) sorts after None
372                    (Some(va), Some(vb)) => match va.partial_cmp(&vb) {
373                        Some(ord) => ord,
374                        None => continue, // incomparable values → move to next field
375                    },
376                };
377
378                // Apply direction (Asc/Desc)
379                let ordering = match direction {
380                    Order::Asc => ordering,
381                    Order::Desc => ordering.reverse(),
382                };
383
384                if ordering != Ordering::Equal {
385                    return ordering;
386                }
387            }
388
389            // all fields equal
390            Ordering::Equal
391        });
392    }
393}
394
/// Apply offset/limit pagination to an in-memory vector, in place.
///
/// Keeps the elements in `offset..offset + limit` (clamped to the vector's
/// length); `limit = None` keeps everything from `offset` onwards. An offset
/// past the end leaves the vector empty.
fn apply_pagination<T>(rows: &mut Vec<T>, offset: u32, limit: Option<u32>) {
    let total = rows.len();
    let start = (offset as usize).min(total);
    // `saturating_add`: on 32-bit targets (e.g. wasm32) `start + limit` can exceed
    // `usize::MAX`. That would panic in debug builds, or wrap and drop every row in release.
    let end = limit.map_or(total, |l| start.saturating_add(l as usize).min(total));

    // Drop the tail first so that draining the head only shifts the kept window.
    // `start <= end` always holds, so the drain range is in bounds.
    rows.truncate(end);
    rows.drain(..start);
}