abstract_core/objects/
paged_map.rs

1use cosmwasm_std::{DepsMut, Order, StdError, StdResult, Storage};
2use cw_storage_plus::{Bound, Item, Map, Path};
3use serde::{de::DeserializeOwned, Deserialize, Serialize};
4
5use crate::{error::AbstractError, AbstractResult};
6
7const DEFAULT_LIMIT: u32 = 10;
8const MAX_LIMIT: u32 = 30;
9const MAX_MSG_LIMIT: u32 = 15;
10const PAGED_MAP: &str = "paged_map";
11
12pub type PaginationResult<Acum, PageResult> = AbstractResult<(Option<Acum>, PageResult)>;
13pub type PaginationAccumulatorFunction<T, Acum, C, FuncResult> =
14    fn(&[u8], &mut dyn Storage, T, &mut Acum, &C) -> AbstractResult<Option<FuncResult>>;
15pub type PaginationFunction<T, C, FuncResult> =
16    fn(&[u8], &mut dyn Storage, T, &C) -> AbstractResult<Option<FuncResult>>;
17/// Allows for multi-transaction computation on a dataset. Required for large datasets due to gas constraints.
18pub struct PagedMap<'a, T, Acum> {
19    /// Actual data store
20    data: Map<'a, &'a [u8], T>,
21    /// Pagination progress status
22    pub status: Item<'a, PaginationInfo<Acum>>,
23}
24
25#[derive(Clone, Debug, Serialize, Deserialize)]
26pub struct PaginationInfo<Acum> {
27    /// Prevents map manipulation during pagination
28    pub is_locked: bool,
29    /// Starting item for next iteration
30    pub last_processed_item: Option<Vec<u8>>,
31    /// Accumulator item available for use in pagination function
32    pub accumulator: Option<Acum>,
33}
34
35impl<'a, T, Acum> PagedMap<'a, T, Acum> {
36    pub const fn new(namespace: &'a str, status_namespace: &'a str) -> Self {
37        PagedMap {
38            data: Map::new(namespace),
39            status: Item::new(status_namespace),
40        }
41    }
42
43    pub fn instantiate(&self, store: &mut dyn Storage) -> Result<(), StdError>
44    where
45        T: Serialize + DeserializeOwned,
46        Acum: Serialize + DeserializeOwned + Default + Clone,
47    {
48        self.status.save(
49            store,
50            &PaginationInfo {
51                is_locked: false,
52                accumulator: None,
53                last_processed_item: None,
54            },
55        )
56    }
57
58    pub fn save(&self, store: &mut dyn Storage, key: &[u8], data: &T) -> AbstractResult<()>
59    where
60        T: Serialize + DeserializeOwned,
61        Acum: Serialize + DeserializeOwned + Default + Clone,
62    {
63        if self.status.load(store)?.is_locked {
64            return Err(AbstractError::Storage {
65                object: PAGED_MAP.into(),
66                msg: "Can not save to map while locked. Proceed with operation first.".into(),
67            });
68        }
69        self.data.save(store, key, data).map_err(Into::into)
70    }
71
72    /// **Warning**: This function circumvents the storage lock. You should only use this in a pagination function.
73    pub fn unsafe_save(&self, store: &mut dyn Storage, key: &[u8], data: &T) -> AbstractResult<()>
74    where
75        T: Serialize + DeserializeOwned,
76        Acum: Serialize + DeserializeOwned + Default + Clone,
77    {
78        self.data.save(store, key, data).map_err(Into::into)
79    }
80
81    // Returns the removed item after deleting it
82    pub fn remove(&self, store: &mut dyn Storage, key: &[u8]) -> AbstractResult<T>
83    where
84        T: Serialize + DeserializeOwned,
85        Acum: Serialize + DeserializeOwned + Default + Clone,
86    {
87        if self.status.load(store)?.is_locked {
88            return Err(AbstractError::Storage {
89                object: PAGED_MAP.into(),
90                msg: "Can not save to map while locked. Proceed with operation first.".into(),
91            });
92        }
93        let old_item = self.data.load(store, key)?;
94        self.data.remove(store, key);
95
96        Ok(old_item)
97    }
98
99    /// **Warning**: This function circumvents the storage lock. You should only use this in a pagination function.
100    /// Returns the removed item after deleting it
101    pub fn unsafe_remove(&self, store: &mut dyn Storage, key: &[u8]) -> AbstractResult<T>
102    where
103        T: Serialize + DeserializeOwned,
104        Acum: Serialize + DeserializeOwned + Default + Clone,
105    {
106        let old_item = self.data.load(store, key)?;
107        self.data.remove(store, key);
108
109        Ok(old_item)
110    }
111
112    pub fn load(&self, store: &dyn Storage, key: &[u8]) -> AbstractResult<T>
113    where
114        T: Serialize + DeserializeOwned,
115        Acum: Serialize + DeserializeOwned + Default + Clone,
116    {
117        self.data.load(store, key).map_err(Into::into)
118    }
119
120    pub fn has(&self, store: &dyn Storage, key: &[u8]) -> bool
121    where
122        T: Serialize + DeserializeOwned,
123        Acum: Serialize + DeserializeOwned + Default + Clone,
124    {
125        self.data.has(store, key)
126    }
127
128    pub fn may_load(&self, store: &dyn Storage, key: &[u8]) -> AbstractResult<Option<T>>
129    where
130        T: Serialize + DeserializeOwned,
131        Acum: Serialize + DeserializeOwned + Default + Clone,
132    {
133        self.data.may_load(store, key).map_err(Into::into)
134    }
135
136    pub fn load_status(&self, store: &dyn Storage) -> AbstractResult<PaginationInfo<Acum>>
137    where
138        T: Serialize + DeserializeOwned,
139        Acum: Serialize + DeserializeOwned + Default + Clone,
140    {
141        self.status.load(store).map_err(Into::into)
142    }
143
144    pub fn key(&self, key: &[u8]) -> Path<T>
145    where
146        T: Serialize + DeserializeOwned,
147    {
148        self.data.key(key)
149    }
150
151    /// Perform some operation on a page of the map.
152    /// Returns an optional result of that computation.
153    /// Repeat until state unlocks to page over the whole map
154    /// Omits errors from f()
155    pub fn page_with_accumulator<C, FuncResult>(
156        &self,
157        deps: DepsMut,
158        limit: Option<u32>,
159        context: &C,
160        f: PaginationAccumulatorFunction<T, Acum, C, FuncResult>,
161    ) -> PaginationResult<Acum, Vec<FuncResult>>
162    where
163        T: Serialize + DeserializeOwned,
164        Acum: Serialize + DeserializeOwned + Default + Clone,
165    {
166        let limit = limit.unwrap_or(DEFAULT_LIMIT).min(MAX_LIMIT) as usize;
167        let mut status = self.status.load(deps.storage)?;
168        if !status.is_locked {
169            status.is_locked = true;
170            status.accumulator = Some(Acum::default());
171            status.last_processed_item = None;
172        }
173
174        let start = status.last_processed_item.clone().map(Bound::ExclusiveRaw);
175
176        let result: Vec<(Vec<u8>, T)> = self
177            .data
178            .range(deps.storage, start, None, Order::Ascending)
179            .take(limit)
180            .collect::<StdResult<Vec<(Vec<u8>, T)>>>()?;
181
182        // If not all items processed, update last item
183        let return_accumulator = if !result.is_empty() {
184            let last_key = result.last().unwrap().0.clone();
185            status.last_processed_item = Some(last_key);
186            None
187        } else {
188            // Everything processed, set to None and return accumulator
189            let accumulator: Option<Acum> = status.accumulator.clone();
190            status.is_locked = false;
191            status.accumulator = None;
192            accumulator
193        };
194
195        let function_results = result
196            .into_iter()
197            .filter_map(|(key, element)| {
198                f(
199                    &key,
200                    deps.storage,
201                    element,
202                    status
203                        .accumulator
204                        .as_mut()
205                        .expect("accumulator contains some value"),
206                    context,
207                )
208                .ok()
209                .unwrap_or(None)
210            })
211            .collect::<Vec<FuncResult>>();
212
213        self.status.save(deps.storage, &status)?;
214
215        Ok((return_accumulator, function_results))
216    }
217
218    /// Will apply function on each element (key, value) of the map. Errors on function f() are neglected.
219    /// Will not lock the set as no accumulator is used so map state changes are allowed.
220    pub fn page_without_accumulator<C, FuncResult>(
221        &self,
222        deps: DepsMut,
223        limit: Option<u32>,
224        context: &C,
225        f: PaginationFunction<T, C, FuncResult>,
226    ) -> AbstractResult<Vec<FuncResult>>
227    where
228        T: Serialize + DeserializeOwned,
229        Acum: Serialize + DeserializeOwned + Default + Clone,
230    {
231        let limit = limit.unwrap_or(DEFAULT_LIMIT).min(MAX_MSG_LIMIT) as usize;
232        let mut status = self.status.load(deps.storage)?;
233
234        let start = status.last_processed_item.clone().map(Bound::ExclusiveRaw);
235        let result: Vec<(Vec<u8>, T)> = self
236            .data
237            .range(deps.storage, start, None, Order::Ascending)
238            .take(limit)
239            .collect::<StdResult<Vec<(Vec<u8>, T)>>>()?;
240
241        // If not all items processed, update last item
242        if !result.is_empty() {
243            let last_key = result.last().unwrap().0.clone();
244            status.last_processed_item = Some(last_key);
245        } else {
246            // Everything processed, unlock map
247            status.last_processed_item = None;
248        };
249
250        let function_results = result
251            .into_iter()
252            .filter_map(|(key, element)| {
253                f(&key, deps.storage, element, context).ok().unwrap_or(None)
254            })
255            .collect::<Vec<FuncResult>>();
256
257        self.status.save(deps.storage, &status)?;
258
259        Ok(function_results)
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use cosmwasm_std::testing::{mock_dependencies, MockStorage};
266    use serde::{Deserialize, Serialize};
267
268    use super::*;
269
270    #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
271    struct Data {
272        pub name: String,
273        pub balance: u32,
274    }
275
276    #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Default)]
277    struct IncomeAcc {
278        pub total: u32,
279    }
280
281    const USERS: PagedMap<Data, IncomeAcc> = PagedMap::new("people", "status");
282
283    #[test]
284    fn save_and_load() {
285        let mut store = MockStorage::new();
286
287        // save and load on one key
288        let john = USERS.key(b"john");
289        let data = Data {
290            name: "John".to_string(),
291            balance: 32,
292        };
293        assert_eq!(None, john.may_load(&store).unwrap());
294        john.save(&mut store, &data).unwrap();
295        assert_eq!(data, john.load(&store).unwrap());
296
297        // nothing on another key
298        assert_eq!(None, USERS.may_load(&store, b"jack").unwrap());
299
300        // same named path gets the data
301        assert_eq!(data, USERS.load(&store, b"john").unwrap());
302
303        // removing leaves us empty
304        john.remove(&mut store);
305        assert_eq!(None, john.may_load(&store).unwrap());
306    }
307
308    #[test]
309    fn page_with_accumulator() {
310        // Change balance to 0, add balance to total and return value if even
311        fn accumulate_and_subtract_balances(
312            key: &[u8],
313            store: &mut dyn Storage,
314            mut value: Data,
315            acc: &mut IncomeAcc,
316            #[allow(clippy::ptr_arg)] _context: &String,
317        ) -> AbstractResult<Option<u32>> {
318            let balance = value.balance;
319            value.balance = 0;
320            acc.total += balance;
321            USERS.unsafe_save(store, key, &value)?;
322
323            if balance % 2 == 0 {
324                Ok(Some(balance))
325            } else {
326                Ok(None)
327            }
328        }
329
330        let mut deps = mock_dependencies();
331        USERS.instantiate(&mut deps.storage).unwrap();
332        let mut total = 0;
333        let mut even_numbers = vec![];
334
335        for i in 0..100 {
336            let data = Data {
337                name: "IrrelevantName".to_string(),
338                balance: i,
339            };
340            total += data.balance;
341            USERS
342                .save(&mut deps.storage, &i.to_be_bytes(), &data)
343                .unwrap();
344            let stored_data = USERS.load(&deps.storage, &i.to_be_bytes()).unwrap();
345            if i % 2 == 0 {
346                even_numbers.push(i);
347            };
348            assert_eq!(stored_data, data);
349        }
350        let mut result_even_numbers = vec![];
351
352        // first call, external factor (like a time stamp) should determine when you can start the accumulator.
353        let (_, mut maybe_even_numbers) = USERS
354            .page_with_accumulator(
355                deps.as_mut(),
356                None,
357                &String::new(),
358                accumulate_and_subtract_balances,
359            )
360            .unwrap();
361
362        assert!(USERS
363            .status
364            .load(&deps.storage)
365            .unwrap()
366            .accumulator
367            .is_some());
368        assert!(USERS.status.load(&deps.storage).unwrap().is_locked);
369        // Keep track of the output
370        result_even_numbers.append(&mut maybe_even_numbers);
371
372        while USERS.status.load(&deps.storage).unwrap().is_locked {
373            let (maybe_accumulator, mut maybe_even_numbers) = USERS
374                .page_with_accumulator(
375                    deps.as_mut(),
376                    None,
377                    &String::new(),
378                    accumulate_and_subtract_balances,
379                )
380                .unwrap();
381
382            result_even_numbers.append(&mut maybe_even_numbers);
383
384            if let Some(acc) = maybe_accumulator {
385                // Accumulator should be done
386                assert_eq!(acc.total, total);
387                assert_eq!(result_even_numbers, even_numbers);
388            }
389        }
390        for i in 0..100u32 {
391            let stored_data = USERS.load(&deps.storage, &i.to_be_bytes()).unwrap();
392            assert_eq!(
393                stored_data,
394                Data {
395                    name: "IrrelevantName".to_string(),
396                    balance: 0,
397                }
398            );
399        }
400    }
401
402    #[test]
403    fn page_without_accumulator() {
404        // Change balance to 0, add balance to total and return value if even
405        fn subtract_balances(
406            key: &[u8],
407            store: &mut dyn Storage,
408            mut value: Data,
409            #[allow(clippy::ptr_arg)] _context: &String,
410        ) -> AbstractResult<Option<u32>> {
411            let balance = value.balance;
412            value.balance = 0;
413            USERS.unsafe_save(store, key, &value)?;
414
415            if balance % 2 == 0 {
416                Ok(Some(balance))
417            } else {
418                Ok(None)
419            }
420        }
421
422        let mut deps = mock_dependencies();
423        USERS.instantiate(&mut deps.storage).unwrap();
424        let mut even_numbers = vec![];
425
426        for i in 0..100 {
427            let data = Data {
428                name: "IrrelevantName".to_string(),
429                balance: i,
430            };
431            USERS
432                .save(&mut deps.storage, &i.to_be_bytes(), &data)
433                .unwrap();
434            let stored_data = USERS.load(&deps.storage, &i.to_be_bytes()).unwrap();
435            if i % 2 == 0 {
436                even_numbers.push(i);
437            };
438            assert_eq!(stored_data, data);
439        }
440        let mut result_even_numbers = vec![];
441
442        // first call, external factor (like a time stamp) should determine when you can start the accumulator.
443        let mut maybe_even_numbers = USERS
444            .page_without_accumulator(deps.as_mut(), None, &String::new(), subtract_balances)
445            .unwrap();
446
447        assert!(!USERS.status.load(&deps.storage).unwrap().is_locked);
448        // Keep track of the output
449        result_even_numbers.append(&mut maybe_even_numbers);
450
451        while USERS
452            .status
453            .load(&deps.storage)
454            .unwrap()
455            .last_processed_item
456            .is_some()
457        {
458            let mut maybe_even_numbers = USERS
459                .page_without_accumulator(deps.as_mut(), None, &String::new(), subtract_balances)
460                .unwrap();
461
462            result_even_numbers.append(&mut maybe_even_numbers);
463        }
464
465        assert_eq!(result_even_numbers, even_numbers);
466
467        for i in 0..100u32 {
468            let stored_data = USERS.load(&deps.storage, &i.to_be_bytes()).unwrap();
469            assert_eq!(
470                stored_data,
471                Data {
472                    name: "IrrelevantName".to_string(),
473                    balance: 0,
474                }
475            );
476        }
477    }
478}