icydb_core/db/executor/
context.rs

1use crate::{
2    Error, Key,
3    db::{
4        Db,
5        query::QueryPlan,
6        store::{DataKey, DataRow, DataStore},
7    },
8    deserialize,
9    traits::{EntityKind, Path},
10};
11use std::{marker::PhantomData, ops::Bound};
12
13///
14/// Context
15///
16
17pub struct Context<'a, E: EntityKind> {
18    pub db: &'a Db<E::Canister>,
19    _marker: PhantomData<E>,
20}
21
22impl<'a, E> Context<'a, E>
23where
24    E: EntityKind,
25{
26    #[must_use]
27    pub const fn new(db: &'a Db<E::Canister>) -> Self {
28        Self {
29            db,
30            _marker: PhantomData,
31        }
32    }
33
34    pub fn with_store<R>(&self, f: impl FnOnce(&DataStore) -> R) -> Result<R, Error> {
35        self.db.with_data(|reg| reg.with_store(E::Store::PATH, f))
36    }
37
38    pub fn with_store_mut<R>(&self, f: impl FnOnce(&mut DataStore) -> R) -> Result<R, Error> {
39        self.db
40            .with_data(|reg| reg.with_store_mut(E::Store::PATH, f))
41    }
42
43    ///
44    /// Analyze Plan
45    ///
46
47    pub fn candidates_from_plan(&self, plan: QueryPlan) -> Result<Vec<DataKey>, Error> {
48        let candidates = match plan {
49            QueryPlan::Keys(keys) => Self::to_data_keys(keys),
50
51            QueryPlan::Range(start, end) => self.with_store(|s| {
52                let start = Self::to_data_key(start);
53                let end = Self::to_data_key(end);
54
55                s.range((Bound::Included(start), Bound::Included(end)))
56                    .map(|e| e.key().clone())
57                    .collect()
58            })?,
59
60            QueryPlan::FullScan => self.with_store(|s| {
61                let start = DataKey::lower_bound::<E>();
62                let end = DataKey::upper_bound::<E>();
63
64                s.range((Bound::Included(start), Bound::Included(end)))
65                    .map(|entry| entry.key().clone())
66                    .collect()
67            })?,
68
69            QueryPlan::Index(index_plan) => {
70                let index_store = self
71                    .db
72                    .with_index(|reg| reg.try_get_store(index_plan.index.store))?;
73
74                index_store.with_borrow(|istore| {
75                    istore.resolve_data_values::<E>(index_plan.index, &index_plan.values)
76                })
77            }
78        };
79
80        Ok(candidates)
81    }
82
83    pub fn rows_from_plan(&self, plan: QueryPlan) -> Result<Vec<DataRow>, Error> {
84        match plan {
85            QueryPlan::Keys(keys) => {
86                let data_keys = Self::to_data_keys(keys);
87                self.load_many(&data_keys)
88            }
89            QueryPlan::Range(start, end) => {
90                let start = Self::to_data_key(start);
91                let end = Self::to_data_key(end);
92                self.load_range(start, end)
93            }
94            QueryPlan::FullScan => self.with_store(|s| {
95                let start = DataKey::lower_bound::<E>();
96                let end = DataKey::upper_bound::<E>();
97
98                s.range((Bound::Included(start), Bound::Included(end)))
99                    .map(|entry| (entry.key().clone(), entry.value()))
100                    .collect()
101            }),
102            QueryPlan::Index(_) => {
103                let data_keys = self.candidates_from_plan(plan)?;
104                self.load_many(&data_keys)
105            }
106        }
107    }
108
109    /// Fetch rows with pagination applied as early as possible (pre-deserialization),
110    /// only when no additional filtering or sorting is required by the executor.
111    pub fn rows_from_plan_with_pagination(
112        &self,
113        plan: QueryPlan,
114        offset: u32,
115        limit: Option<u32>,
116    ) -> Result<Vec<DataRow>, Error> {
117        let skip = offset as usize;
118        let take = limit.map(|l| l as usize);
119
120        match plan {
121            QueryPlan::Keys(keys) => {
122                // Apply pagination to keys before loading
123                let mut keys = keys;
124                let total = keys.len();
125                let (start, end) = Self::slice_bounds(total, offset, limit);
126
127                if start >= end {
128                    return Ok(Vec::new());
129                }
130
131                let paged = keys.drain(start..end).collect::<Vec<_>>();
132                let data_keys = Self::to_data_keys(paged);
133
134                self.load_many(&data_keys)
135            }
136
137            QueryPlan::Range(start, end) => {
138                let start = Self::to_data_key(start);
139                let end = Self::to_data_key(end);
140
141                self.with_store(|s| {
142                    let base = s.range((Bound::Included(start), Bound::Included(end)));
143                    let cap = take.unwrap_or(0);
144                    let mut out = Vec::with_capacity(cap);
145                    match take {
146                        Some(t) => {
147                            for entry in base.skip(skip).take(t) {
148                                out.push((entry.key().clone(), entry.value()));
149                            }
150                        }
151                        None => {
152                            for entry in base.skip(skip) {
153                                out.push((entry.key().clone(), entry.value()));
154                            }
155                        }
156                    }
157                    out
158                })
159            }
160
161            QueryPlan::FullScan => self.with_store(|s| {
162                let start = DataKey::lower_bound::<E>();
163                let end = DataKey::upper_bound::<E>();
164
165                let base = s.range((Bound::Included(start), Bound::Included(end)));
166                let cap = take.unwrap_or(0);
167                let mut out = Vec::with_capacity(cap);
168                match take {
169                    Some(t) => {
170                        for entry in base.skip(skip).take(t) {
171                            out.push((entry.key().clone(), entry.value()));
172                        }
173                    }
174                    None => {
175                        for entry in base.skip(skip) {
176                            out.push((entry.key().clone(), entry.value()));
177                        }
178                    }
179                }
180                out
181            }),
182
183            QueryPlan::Index(_) => {
184                // Resolve candidate keys from index, then paginate before loading
185                let mut data_keys = self.candidates_from_plan(plan)?;
186                let total = data_keys.len();
187                let (start, end) = Self::slice_bounds(total, offset, limit);
188
189                if start >= end {
190                    return Ok(Vec::new());
191                }
192
193                let paged = data_keys.drain(start..end).collect::<Vec<_>>();
194
195                self.load_many(&paged)
196            }
197        }
198    }
199
200    ///
201    /// Load Helpers
202    ///
203
204    fn to_data_key(key: Key) -> DataKey {
205        DataKey::new::<E>(key)
206    }
207
208    fn to_data_keys(keys: Vec<Key>) -> Vec<DataKey> {
209        keys.into_iter().map(Self::to_data_key).collect()
210    }
211
212    #[inline]
213    fn slice_bounds(total: usize, offset: u32, limit: Option<u32>) -> (usize, usize) {
214        let start = (offset as usize).min(total);
215        let end = match limit {
216            Some(l) => start.saturating_add(l as usize).min(total),
217            None => total,
218        };
219
220        (start, end)
221    }
222
223    fn load_many(&self, keys: &[DataKey]) -> Result<Vec<DataRow>, Error> {
224        self.with_store(|s| {
225            keys.iter()
226                .filter_map(|k| s.get(k).map(|entry| (k.clone(), entry)))
227                .collect()
228        })
229    }
230
231    fn load_range(&self, start: DataKey, end: DataKey) -> Result<Vec<DataRow>, Error> {
232        self.with_store(|s| {
233            s.range((Bound::Included(start), Bound::Included(end)))
234                .map(|e| (e.key().clone(), e.value()))
235                .collect()
236        })
237    }
238
239    /// Deserialize raw data rows into typed entity rows, mapping `DataKey` → `(Key, E)`.
240    pub fn deserialize_rows(&self, rows: Vec<DataRow>) -> Result<Vec<(Key, E)>, Error> {
241        rows.into_iter()
242            .map(|(k, v)| deserialize::<E>(&v).map(|entry| (k.key(), entry)))
243            .collect()
244    }
245}