astroport_circular_buffer/
lib.rs

1//! Circular buffer which is built over [`Item`] and [`Map`].
2//! Might be useful to store time series data in contracts.
3//!
4//! # Example
5//! ```
6//! use cosmwasm_std::testing::MockStorage;
7//! use astroport_circular_buffer::{BufferManager, CircularBuffer};
8//!
9//! const CIRCULAR_BUFFER: CircularBuffer<u128> = CircularBuffer::new("buffer_state", "buffer");
10//!
11//! let mut store = MockStorage::new();
12//! BufferManager::init(&mut store, CIRCULAR_BUFFER, 10).unwrap();
13//! let mut buffer = BufferManager::new(&store, CIRCULAR_BUFFER).unwrap();
14//!
15//! let data = (1..=10u128).collect::<Vec<_>>();
16//! buffer.push_many(&data);
17//! buffer.commit(&mut store).unwrap();
18//!
19//! let values = buffer.read(&store, 0u32..=9, true).unwrap();
20//! let all_values = buffer.read_all(&store).unwrap();
21//! ```
22
23use std::collections::HashMap;
24use std::fmt::{Debug, Display, Formatter};
25use std::marker::PhantomData;
26
27use cosmwasm_schema::cw_serde;
28use cosmwasm_schema::serde::de::DeserializeOwned;
29use cosmwasm_schema::serde::Serialize;
30use cosmwasm_std::{StdError, Storage};
31use cw_storage_plus::{Item, Map};
32
33use crate::error::{BufferError, BufferResult};
34
35pub mod error;
36
37#[cw_serde]
38pub struct BufferState {
39    capacity: u32,
40    head: u32,
41}
42
/// Storage descriptor of a circular buffer holding values of type `V`.
///
/// It only carries the storage key of the buffer state and the namespace of
/// the value array; it owns no data itself.
pub struct CircularBuffer<'a, V> {
    /// Storage key under which the buffer state is kept.
    state_key: &'a str,
    /// Namespace of the map which holds the buffered values.
    array_namespace: &'a str,
    /// Ties the descriptor to the value type without storing a value.
    data_type: PhantomData<V>,
}
48
49impl<'a, V> CircularBuffer<'a, V> {
50    pub const fn new(state_key: &'a str, array_namespace: &'a str) -> Self {
51        Self {
52            state_key,
53            array_namespace,
54            data_type: PhantomData,
55        }
56    }
57
58    pub const fn state(&'a self) -> Item<BufferState> {
59        Item::new(self.state_key)
60    }
61
62    pub const fn array(&'a self) -> Map<u32, V> {
63        Map::new(self.array_namespace)
64    }
65}
66
67pub struct BufferManager<'a, V> {
68    state: BufferState,
69    store_iface: CircularBuffer<'a, V>,
70    precommit_buffer: HashMap<u32, &'a V>,
71}
72
73impl<'a, V> BufferManager<'a, V>
74where
75    V: Serialize + DeserializeOwned + 'a,
76{
77    /// Static function to initialize buffer in storage.
78    /// Intended to be called during contract initialization.
79    pub fn init(
80        store: &mut dyn Storage,
81        store_iface: CircularBuffer<'a, V>,
82        capacity: u32,
83    ) -> BufferResult<()> {
84        let state_iface = store_iface.state();
85
86        if state_iface.may_load(store)?.is_some() {
87            return Err(BufferError::BufferAlreadyInitialized {});
88        }
89
90        state_iface.save(store, &BufferState { capacity, head: 0 })?;
91
92        Ok(())
93    }
94
95    /// Initialize buffer manager.
96    /// In case buffer is not initialized it throws [`BufferError::BufferNotInitialized`] error.
97    pub fn new(store: &dyn Storage, store_iface: CircularBuffer<'a, V>) -> BufferResult<Self> {
98        Ok(Self {
99            state: store_iface.state().load(store).map_err(|err| {
100                if let StdError::NotFound { .. } = err {
101                    BufferError::BufferNotInitialized {}
102                } else {
103                    err.into()
104                }
105            })?,
106            store_iface,
107            precommit_buffer: HashMap::new(),
108        })
109    }
110
111    /// Returns current buffer capacity.
112    pub fn capacity(&self) -> u32 {
113        self.state.capacity
114    }
115
116    /// Returns current buffer head.
117    pub fn head(&self) -> u32 {
118        self.state.head
119    }
120
121    /// Push value to precommit buffer.
122    pub fn push(&mut self, value: &'a V) {
123        self.precommit_buffer.insert(self.state.head, value);
124        self.state.head = (self.state.head + 1) % self.state.capacity;
125    }
126
127    /// Push multiple values to precommit buffer.
128    pub fn push_many(&mut self, values: &'a [V]) {
129        for value in values {
130            self.push(value);
131        }
132    }
133
134    /// Push value to precommit buffer and commit it to storage.
135    pub fn instant_push(&mut self, store: &mut dyn Storage, value: &'a V) -> BufferResult<()> {
136        self.push(value);
137        self.commit(store)
138    }
139
140    /// Commit in storage current state and precommit buffer. Buffer is erased after commit.
141    pub fn commit(&mut self, store: &mut dyn Storage) -> BufferResult<()> {
142        let array_key = self.store_iface.array();
143        for (&key, value) in &self.precommit_buffer {
144            if key >= self.state.capacity {
145                return Err(BufferError::SaveValueError(key));
146            }
147            array_key.save(store, key, value)?;
148        }
149        self.precommit_buffer.clear();
150        self.store_iface.state().save(store, &self.state)?;
151
152        Ok(())
153    }
154
155    /// Read values from storage by indexes. If `stop_if_empty` is true,
156    /// reading will stop when first empty value is encountered.
157    /// Otherwise, [`BufferError::IndexNotFound`] error will be thrown.
158    ///
159    /// ## Examples:
160    /// ```
161    /// # use cosmwasm_std::{testing::MockStorage};
162    /// # use astroport_circular_buffer::{BufferManager, CircularBuffer};
163    /// # let mut store = MockStorage::new();
164    /// # const CIRCULAR_BUFFER: CircularBuffer<u128> = CircularBuffer::new("buffer_state", "buffer");
165    /// # BufferManager::init(&mut store, CIRCULAR_BUFFER, 10).unwrap();
166    /// # let mut buffer = BufferManager::new(&store, CIRCULAR_BUFFER).unwrap();
167    /// # let data = (1..=10u128).collect::<Vec<_>>();
168    /// # buffer.push_many(&data);
169    /// # buffer.commit(&mut store).unwrap();
170    ///
171    /// let values = buffer.read(&store, 0u32..=9, false).unwrap();
172    /// let values = buffer.read(&store, vec![0u32, 5, 7], false).unwrap();
173    /// let values = buffer.read(&store, (0u32..buffer.capacity()).step_by(2), false).unwrap();
174    /// ```
175    pub fn read(
176        &self,
177        store: &dyn Storage,
178        indexes: impl IntoIterator<Item = impl Into<u32> + Display>,
179        stop_if_empty: bool,
180    ) -> BufferResult<Vec<V>> {
181        let array_key = self.store_iface.array();
182        let mut values = vec![];
183        for index in indexes {
184            let ind = index.into();
185            if ind > self.state.capacity - 1 {
186                return Err(BufferError::ReadAheadError(ind));
187            } else {
188                let value = array_key.load(store, ind).map_err(|err| {
189                    if let StdError::NotFound { .. } = err {
190                        BufferError::IndexNotFound(ind)
191                    } else {
192                        err.into()
193                    }
194                });
195                match value {
196                    Ok(value) => values.push(value),
197                    Err(BufferError::IndexNotFound(_)) if stop_if_empty => return Ok(values),
198                    Err(err) => return Err(err),
199                }
200            }
201        }
202
203        Ok(values)
204    }
205
206    /// Read all available values from storage.
207    pub fn read_all(&self, store: &dyn Storage) -> BufferResult<Vec<V>> {
208        self.read(store, 0..self.state.capacity, true)
209    }
210
211    /// Read last saved value from storage. Returns None if buffer is empty.
212    pub fn read_last(&self, store: &dyn Storage) -> BufferResult<Option<V>> {
213        self.read_single(
214            store,
215            (self.state.capacity + self.state.head - 1) % self.state.capacity,
216        )
217    }
218
219    /// Looped read. Returns None if value in buffer does not exist.
220    pub fn read_single(
221        &self,
222        store: &dyn Storage,
223        index: impl Into<u32>,
224    ) -> BufferResult<Option<V>> {
225        let ind = index.into() % self.state.capacity;
226        let res = self.store_iface.array().load(store, ind);
227        if let Err(StdError::NotFound { .. }) = res {
228            Ok(None)
229        } else {
230            res.map(Some).map_err(Into::into)
231        }
232    }
233
234    /// This operation is gas consuming. However, it might be helpful in rare cases.
235    pub fn clear_buffer(&self, store: &mut dyn Storage) {
236        let array_key = self.store_iface.array();
237        (0..self.state.capacity).for_each(|i| array_key.remove(store, i))
238    }
239
240    /// Whether index exists in buffer.
241    pub fn exists(&self, store: &dyn Storage, index: u32) -> bool {
242        self.store_iface
243            .array()
244            .has(store, index % self.state.capacity)
245    }
246}
247
248impl<V: Debug> Debug for BufferManager<'_, V> {
249    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
250        f.debug_struct("BufferManager")
251            .field("state", &self.state)
252            .field("precommit_buffer", &self.precommit_buffer)
253            .finish()
254    }
255}
256
#[cfg(test)]
mod tests {
    use cosmwasm_std::testing::MockStorage;
    use cosmwasm_std::Uint128;

    use super::*;

    type DataType = Uint128;
    const CIRCULAR_BUFFER: CircularBuffer<DataType> = CircularBuffer::new("buffer_state", "buffer");

    /// Converts values read from the buffer into plain integers for comparison.
    fn to_u128s(values: Vec<DataType>) -> Vec<u128> {
        values.into_iter().map(|value| value.u128()).collect()
    }

    #[test]
    fn test_single_push() {
        let mut store = MockStorage::new();

        BufferManager::init(&mut store, CIRCULAR_BUFFER, 10).unwrap();

        // A second initialization of the same buffer must be rejected
        let err = BufferManager::init(&mut store, CIRCULAR_BUFFER, 10).unwrap_err();
        assert_eq!(err, BufferError::BufferAlreadyInitialized {});

        let mut buffer = BufferManager::new(&store, CIRCULAR_BUFFER).unwrap();

        assert_eq!(buffer.capacity(), 10);
        assert_eq!(
            format!("{:?}", &buffer),
            "BufferManager { state: BufferState { capacity: 10, head: 0 }, precommit_buffer: {} }"
        );

        // 15 values into 10 slots: the first five slots get overwritten
        let data = (1..=15u8).map(DataType::from).collect::<Vec<_>>();
        for item in &data {
            buffer.push(item);
        }
        buffer.commit(&mut store).unwrap();

        // the most recently saved value
        let last = buffer.read_last(&store).unwrap().unwrap();
        assert_eq!(last.u128(), 15);

        let stored = to_u128s(buffer.read(&store, 0u32..=9, true).unwrap());
        assert_eq!(stored, vec![11, 12, 13, 14, 15, 6, 7, 8, 9, 10]);

        // instant push stages and commits in one call
        let val = DataType::from(16u128);
        buffer.instant_push(&mut store, &val).unwrap();
        let stored = to_u128s(buffer.read(&store, 0u32..=9, true).unwrap());
        assert_eq!(stored, vec![11, 12, 13, 14, 15, 16, 7, 8, 9, 10]);

        // indexes at or beyond the capacity are rejected
        let err = buffer.read(&store, [10u32, 11u32], true).unwrap_err();
        assert_eq!(err, BufferError::ReadAheadError(10));
    }

    #[test]
    fn test_push_many() {
        let mut store = MockStorage::new();

        // A manager cannot be created before the buffer is initialized
        let err = BufferManager::new(&store, CIRCULAR_BUFFER).unwrap_err();
        assert_eq!(err, BufferError::BufferNotInitialized {});

        BufferManager::init(&mut store, CIRCULAR_BUFFER, 10).unwrap();

        let mut buffer = BufferManager::new(&store, CIRCULAR_BUFFER).unwrap();

        // strict read of an empty buffer fails on the first slot
        let err = buffer.read(&store, [0u8], false).unwrap_err();
        assert_eq!(err, BufferError::IndexNotFound(0));

        let data = (1..=15u8).map(DataType::from).collect::<Vec<_>>();
        buffer.push_many(&data);
        buffer.commit(&mut store).unwrap();

        let stored = to_u128s(buffer.read_all(&store).unwrap());
        assert_eq!(stored, vec![11, 12, 13, 14, 15, 6, 7, 8, 9, 10]);

        // every second slot
        let every_other = (0u32..buffer.capacity()).step_by(2);
        let partial_read = to_u128s(buffer.read(&store, every_other, true).unwrap());
        assert_eq!(partial_read, vec![11, 13, 15, 7, 9]);
    }
}
353}