abstract_os/objects/
paged_map.rs

1use crate::{error::AbstractOsError, AbstractResult};
2use cosmwasm_std::{DepsMut, Order, StdError, StdResult, Storage};
3use cw_storage_plus::{Bound, Item, Map, Path};
4use serde::{de::DeserializeOwned, Deserialize, Serialize};
5
/// Page size used by both paging functions when the caller passes no limit.
const DEFAULT_LIMIT: u32 = 10;
/// Largest page size accepted by `PagedMap::page_with_accumulator`.
const MAX_LIMIT: u32 = 30;
/// Largest page size accepted by `PagedMap::page_without_accumulator`.
const MAX_MSG_LIMIT: u32 = 15;
/// Object name reported in the storage errors raised by `PagedMap`.
const PAGED_MAP: &str = "paged_map";
10
11pub type PaginationResult<Acum, PageResult> = AbstractResult<(Option<Acum>, PageResult)>;
12pub type PaginationAccumulatorFunction<T, Acum, C, FuncResult> =
13    fn(&[u8], &mut dyn Storage, T, &mut Acum, &C) -> AbstractResult<Option<FuncResult>>;
14pub type PaginationFunction<T, C, FuncResult> =
15    fn(&[u8], &mut dyn Storage, T, &C) -> AbstractResult<Option<FuncResult>>;
16/// Allows for multi-transaction computation on a dataset. Required for large datasets due to gas constraints.
17pub struct PagedMap<'a, T, Acum> {
18    /// Actual data store
19    data: Map<'a, &'a [u8], T>,
20    /// Pagination progress status
21    pub status: Item<'a, PaginationInfo<Acum>>,
22}
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
25pub struct PaginationInfo<Acum> {
26    /// Prevents map manipulation during pagination
27    pub is_locked: bool,
28    /// Starting item for next iteration
29    pub last_processed_item: Option<Vec<u8>>,
30    /// Accumulator item available for use in pagination function
31    pub accumulator: Option<Acum>,
32}
33
34impl<'a, T, Acum> PagedMap<'a, T, Acum> {
35    pub const fn new(namespace: &'a str, status_namespace: &'a str) -> Self {
36        PagedMap {
37            data: Map::new(namespace),
38            status: Item::new(status_namespace),
39        }
40    }
41
42    pub fn instantiate(&self, store: &mut dyn Storage) -> Result<(), StdError>
43    where
44        T: Serialize + DeserializeOwned,
45        Acum: Serialize + DeserializeOwned + Default + Clone,
46    {
47        self.status.save(
48            store,
49            &PaginationInfo {
50                is_locked: false,
51                accumulator: None,
52                last_processed_item: None,
53            },
54        )
55    }
56
57    pub fn save(&self, store: &mut dyn Storage, key: &[u8], data: &T) -> AbstractResult<()>
58    where
59        T: Serialize + DeserializeOwned,
60        Acum: Serialize + DeserializeOwned + Default + Clone,
61    {
62        if self.status.load(store)?.is_locked {
63            return Err(AbstractOsError::Storage {
64                object: PAGED_MAP.into(),
65                msg: "Can not save to map while locked. Proceed with operation first.".into(),
66            });
67        }
68        self.data.save(store, key, data).map_err(Into::into)
69    }
70
71    /// **Warning**: This function circumvents the storage lock. You should only use this in a pagination function.
72    pub fn unsafe_save(&self, store: &mut dyn Storage, key: &[u8], data: &T) -> AbstractResult<()>
73    where
74        T: Serialize + DeserializeOwned,
75        Acum: Serialize + DeserializeOwned + Default + Clone,
76    {
77        self.data.save(store, key, data).map_err(Into::into)
78    }
79
80    // Returns the removed item after deleting it
81    pub fn remove(&self, store: &mut dyn Storage, key: &[u8]) -> AbstractResult<T>
82    where
83        T: Serialize + DeserializeOwned,
84        Acum: Serialize + DeserializeOwned + Default + Clone,
85    {
86        if self.status.load(store)?.is_locked {
87            return Err(AbstractOsError::Storage {
88                object: PAGED_MAP.into(),
89                msg: "Can not save to map while locked. Proceed with operation first.".into(),
90            });
91        }
92        let old_item = self.data.load(store, key)?;
93        self.data.remove(store, key);
94
95        Ok(old_item)
96    }
97
98    /// **Warning**: This function circumvents the storage lock. You should only use this in a pagination function.
99    /// Returns the removed item after deleting it
100    pub fn unsafe_remove(&self, store: &mut dyn Storage, key: &[u8]) -> AbstractResult<T>
101    where
102        T: Serialize + DeserializeOwned,
103        Acum: Serialize + DeserializeOwned + Default + Clone,
104    {
105        let old_item = self.data.load(store, key)?;
106        self.data.remove(store, key);
107
108        Ok(old_item)
109    }
110
111    pub fn load(&self, store: &dyn Storage, key: &[u8]) -> AbstractResult<T>
112    where
113        T: Serialize + DeserializeOwned,
114        Acum: Serialize + DeserializeOwned + Default + Clone,
115    {
116        self.data.load(store, key).map_err(Into::into)
117    }
118
119    pub fn has(&self, store: &dyn Storage, key: &[u8]) -> bool
120    where
121        T: Serialize + DeserializeOwned,
122        Acum: Serialize + DeserializeOwned + Default + Clone,
123    {
124        self.data.has(store, key)
125    }
126
127    pub fn may_load(&self, store: &dyn Storage, key: &[u8]) -> AbstractResult<Option<T>>
128    where
129        T: Serialize + DeserializeOwned,
130        Acum: Serialize + DeserializeOwned + Default + Clone,
131    {
132        self.data.may_load(store, key).map_err(Into::into)
133    }
134
135    pub fn load_status(&self, store: &dyn Storage) -> AbstractResult<PaginationInfo<Acum>>
136    where
137        T: Serialize + DeserializeOwned,
138        Acum: Serialize + DeserializeOwned + Default + Clone,
139    {
140        self.status.load(store).map_err(Into::into)
141    }
142
143    pub fn key(&self, key: &[u8]) -> Path<T>
144    where
145        T: Serialize + DeserializeOwned,
146    {
147        self.data.key(key)
148    }
149
150    /// Perform some operation on a page of the map.
151    /// Returns an optional result of that computation.
152    /// Repeat until state unlocks to page over the whole map
153    /// Omits errors from f()
154    pub fn page_with_accumulator<C, FuncResult>(
155        &self,
156        deps: DepsMut,
157        limit: Option<u32>,
158        context: &C,
159        f: PaginationAccumulatorFunction<T, Acum, C, FuncResult>,
160    ) -> PaginationResult<Acum, Vec<FuncResult>>
161    where
162        T: Serialize + DeserializeOwned,
163        Acum: Serialize + DeserializeOwned + Default + Clone,
164    {
165        let limit = limit.unwrap_or(DEFAULT_LIMIT).min(MAX_LIMIT) as usize;
166        let mut status = self.status.load(deps.storage)?;
167        if !status.is_locked {
168            status.is_locked = true;
169            status.accumulator = Some(Acum::default());
170            status.last_processed_item = None;
171        }
172
173        let start = status.last_processed_item.clone().map(Bound::ExclusiveRaw);
174
175        let result: Vec<(Vec<u8>, T)> = self
176            .data
177            .range(deps.storage, start, None, Order::Ascending)
178            .take(limit)
179            .collect::<StdResult<Vec<(Vec<u8>, T)>>>()?;
180
181        // If not all items processed, update last item
182        let return_accumulator = if !result.is_empty() {
183            let last_key = result.last().unwrap().0.clone();
184            status.last_processed_item = Some(last_key);
185            None
186        } else {
187            // Everything processed, set to None and return accumulator
188            let accumulator: Option<Acum> = status.accumulator.clone();
189            status.is_locked = false;
190            status.accumulator = None;
191            accumulator
192        };
193
194        let function_results = result
195            .into_iter()
196            .filter_map(|(key, element)| {
197                f(
198                    &key,
199                    deps.storage,
200                    element,
201                    status
202                        .accumulator
203                        .as_mut()
204                        .expect("accumulator contains some value"),
205                    context,
206                )
207                .ok()
208                .unwrap_or(None)
209            })
210            .collect::<Vec<FuncResult>>();
211
212        self.status.save(deps.storage, &status)?;
213
214        Ok((return_accumulator, function_results))
215    }
216
217    /// Will apply function on each element (key, value) of the map. Errors on function f() are neglected.
218    /// Will not lock the set as no accumulator is used so map state changes are allowed.
219    pub fn page_without_accumulator<C, FuncResult>(
220        &self,
221        deps: DepsMut,
222        limit: Option<u32>,
223        context: &C,
224        f: PaginationFunction<T, C, FuncResult>,
225    ) -> AbstractResult<Vec<FuncResult>>
226    where
227        T: Serialize + DeserializeOwned,
228        Acum: Serialize + DeserializeOwned + Default + Clone,
229    {
230        let limit = limit.unwrap_or(DEFAULT_LIMIT).min(MAX_MSG_LIMIT) as usize;
231        let mut status = self.status.load(deps.storage)?;
232
233        let start = status.last_processed_item.clone().map(Bound::ExclusiveRaw);
234        let result: Vec<(Vec<u8>, T)> = self
235            .data
236            .range(deps.storage, start, None, Order::Ascending)
237            .take(limit)
238            .collect::<StdResult<Vec<(Vec<u8>, T)>>>()?;
239
240        // If not all items processed, update last item
241        if !result.is_empty() {
242            let last_key = result.last().unwrap().0.clone();
243            status.last_processed_item = Some(last_key);
244        } else {
245            // Everything processed, unlock map
246            status.last_processed_item = None;
247        };
248
249        let function_results = result
250            .into_iter()
251            .filter_map(|(key, element)| {
252                f(&key, deps.storage, element, context).ok().unwrap_or(None)
253            })
254            .collect::<Vec<FuncResult>>();
255
256        self.status.save(deps.storage, &status)?;
257
258        Ok(function_results)
259    }
260}
261
262#[cfg(test)]
263mod tests {
264    use super::*;
265    use serde::{Deserialize, Serialize};
266
267    use crate::objects::core::OsId;
268    use cosmwasm_std::testing::{mock_dependencies, MockStorage};
269
270    #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
271    struct Data {
272        pub name: String,
273        pub balance: u32,
274    }
275
276    #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Default)]
277    struct IncomeAcc {
278        pub total: u32,
279    }
280
281    const USERS: PagedMap<Data, IncomeAcc> = PagedMap::new("people", "status");
282
283    #[test]
284    fn save_and_load() {
285        let mut store = MockStorage::new();
286
287        // save and load on one key
288        let john = USERS.key(b"john");
289        let data = Data {
290            name: "John".to_string(),
291            balance: 32,
292        };
293        assert_eq!(None, john.may_load(&store).unwrap());
294        john.save(&mut store, &data).unwrap();
295        assert_eq!(data, john.load(&store).unwrap());
296
297        // nothing on another key
298        assert_eq!(None, USERS.may_load(&store, b"jack").unwrap());
299
300        // same named path gets the data
301        assert_eq!(data, USERS.load(&store, b"john").unwrap());
302
303        // removing leaves us empty
304        john.remove(&mut store);
305        assert_eq!(None, john.may_load(&store).unwrap());
306    }
307
308    #[test]
309    fn page_with_accumulator() {
310        // Change balance to 0, add balance to total and return value if even
311        fn accumulate_and_subtract_balances(
312            key: &[u8],
313            store: &mut dyn Storage,
314            mut value: Data,
315            acc: &mut IncomeAcc,
316            _context: &String,
317        ) -> AbstractResult<Option<u32>> {
318            let balance = value.balance;
319            value.balance = 0;
320            acc.total += balance;
321            USERS.unsafe_save(store, key, &value)?;
322
323            if balance % 2 == 0 {
324                Ok(Some(balance))
325            } else {
326                Ok(None)
327            }
328        }
329
330        let mut deps = mock_dependencies();
331        USERS.instantiate(&mut deps.storage).unwrap();
332        let mut total = 0;
333        let mut even_numbers = vec![];
334
335        for i in 0..100 {
336            let data = Data {
337                name: "IrrelevantName".to_string(),
338                balance: i,
339            };
340            total += data.balance;
341            USERS
342                .save(&mut deps.storage, &i.to_be_bytes(), &data)
343                .unwrap();
344            let stored_data = USERS.load(&deps.storage, &i.to_be_bytes()).unwrap();
345            if i % 2 == 0 {
346                even_numbers.push(i);
347            };
348            assert_eq!(stored_data, data);
349        }
350        let mut result_even_numbers = vec![];
351
352        // first call, external factor (like a time stamp) should determine when you can start the accumulator.
353        let (_, mut maybe_even_numbers) = USERS
354            .page_with_accumulator(
355                deps.as_mut(),
356                None,
357                &String::new(),
358                accumulate_and_subtract_balances,
359            )
360            .unwrap();
361
362        assert!(USERS
363            .status
364            .load(&deps.storage)
365            .unwrap()
366            .accumulator
367            .is_some());
368        assert!(USERS.status.load(&deps.storage).unwrap().is_locked);
369        // Keep track of the output
370        result_even_numbers.append(&mut maybe_even_numbers);
371
372        while USERS.status.load(&deps.storage).unwrap().is_locked {
373            let (maybe_accumulator, mut maybe_even_numbers) = USERS
374                .page_with_accumulator(
375                    deps.as_mut(),
376                    None,
377                    &String::new(),
378                    accumulate_and_subtract_balances,
379                )
380                .unwrap();
381
382            result_even_numbers.append(&mut maybe_even_numbers);
383
384            if let Some(acc) = maybe_accumulator {
385                // Accumulator should be done
386                assert_eq!(acc.total, total);
387                assert_eq!(result_even_numbers, even_numbers);
388            }
389        }
390        for i in 0..100u32 {
391            let stored_data = USERS.load(&deps.storage, &i.to_be_bytes()).unwrap();
392            assert_eq!(
393                stored_data,
394                Data {
395                    name: "IrrelevantName".to_string(),
396                    balance: 0,
397                }
398            );
399        }
400    }
401
402    #[test]
403    fn page_without_accumulator() {
404        // Change balance to 0, add balance to total and return value if even
405        fn subtract_balances(
406            key: &[u8],
407            store: &mut dyn Storage,
408            mut value: Data,
409            _context: &String,
410        ) -> AbstractResult<Option<OsId>> {
411            let balance = value.balance;
412            value.balance = 0;
413            USERS.unsafe_save(store, key, &value)?;
414
415            if balance % 2 == 0 {
416                Ok(Some(balance))
417            } else {
418                Ok(None)
419            }
420        }
421
422        let mut deps = mock_dependencies();
423        USERS.instantiate(&mut deps.storage).unwrap();
424        let mut even_numbers = vec![];
425
426        for i in 0..100 {
427            let data = Data {
428                name: "IrrelevantName".to_string(),
429                balance: i,
430            };
431            USERS
432                .save(&mut deps.storage, &i.to_be_bytes(), &data)
433                .unwrap();
434            let stored_data = USERS.load(&deps.storage, &i.to_be_bytes()).unwrap();
435            if i % 2 == 0 {
436                even_numbers.push(i);
437            };
438            assert_eq!(stored_data, data);
439        }
440        let mut result_even_numbers = vec![];
441
442        // first call, external factor (like a time stamp) should determine when you can start the accumulator.
443        let mut maybe_even_numbers = USERS
444            .page_without_accumulator(deps.as_mut(), None, &String::new(), subtract_balances)
445            .unwrap();
446
447        assert!(!USERS.status.load(&deps.storage).unwrap().is_locked);
448        // Keep track of the output
449        result_even_numbers.append(&mut maybe_even_numbers);
450
451        while USERS
452            .status
453            .load(&deps.storage)
454            .unwrap()
455            .last_processed_item
456            .is_some()
457        {
458            let mut maybe_even_numbers = USERS
459                .page_without_accumulator(deps.as_mut(), None, &String::new(), subtract_balances)
460                .unwrap();
461
462            result_even_numbers.append(&mut maybe_even_numbers);
463        }
464
465        assert_eq!(result_even_numbers, even_numbers);
466
467        for i in 0..100u32 {
468            let stored_data = USERS.load(&deps.storage, &i.to_be_bytes()).unwrap();
469            assert_eq!(
470                stored_data,
471                Data {
472                    name: "IrrelevantName".to_string(),
473                    balance: 0,
474                }
475            );
476        }
477    }
478}