astroport_circular_buffer/
lib.rs1use std::collections::HashMap;
24use std::fmt::{Debug, Display, Formatter};
25use std::marker::PhantomData;
26
27use cosmwasm_schema::cw_serde;
28use cosmwasm_schema::serde::de::DeserializeOwned;
29use cosmwasm_schema::serde::Serialize;
30use cosmwasm_std::{StdError, Storage};
31use cw_storage_plus::{Item, Map};
32
33use crate::error::{BufferError, BufferResult};
34
35pub mod error;
36
37#[cw_serde]
38pub struct BufferState {
39 capacity: u32,
40 head: u32,
41}
42
43pub struct CircularBuffer<'a, V> {
44 state_key: &'a str,
45 array_namespace: &'a str,
46 data_type: PhantomData<V>,
47}
48
49impl<'a, V> CircularBuffer<'a, V> {
50 pub const fn new(state_key: &'a str, array_namespace: &'a str) -> Self {
51 Self {
52 state_key,
53 array_namespace,
54 data_type: PhantomData,
55 }
56 }
57
58 pub const fn state(&'a self) -> Item<BufferState> {
59 Item::new(self.state_key)
60 }
61
62 pub const fn array(&'a self) -> Map<u32, V> {
63 Map::new(self.array_namespace)
64 }
65}
66
67pub struct BufferManager<'a, V> {
68 state: BufferState,
69 store_iface: CircularBuffer<'a, V>,
70 precommit_buffer: HashMap<u32, &'a V>,
71}
72
73impl<'a, V> BufferManager<'a, V>
74where
75 V: Serialize + DeserializeOwned + 'a,
76{
77 pub fn init(
80 store: &mut dyn Storage,
81 store_iface: CircularBuffer<'a, V>,
82 capacity: u32,
83 ) -> BufferResult<()> {
84 let state_iface = store_iface.state();
85
86 if state_iface.may_load(store)?.is_some() {
87 return Err(BufferError::BufferAlreadyInitialized {});
88 }
89
90 state_iface.save(store, &BufferState { capacity, head: 0 })?;
91
92 Ok(())
93 }
94
95 pub fn new(store: &dyn Storage, store_iface: CircularBuffer<'a, V>) -> BufferResult<Self> {
98 Ok(Self {
99 state: store_iface.state().load(store).map_err(|err| {
100 if let StdError::NotFound { .. } = err {
101 BufferError::BufferNotInitialized {}
102 } else {
103 err.into()
104 }
105 })?,
106 store_iface,
107 precommit_buffer: HashMap::new(),
108 })
109 }
110
111 pub fn capacity(&self) -> u32 {
113 self.state.capacity
114 }
115
116 pub fn head(&self) -> u32 {
118 self.state.head
119 }
120
121 pub fn push(&mut self, value: &'a V) {
123 self.precommit_buffer.insert(self.state.head, value);
124 self.state.head = (self.state.head + 1) % self.state.capacity;
125 }
126
127 pub fn push_many(&mut self, values: &'a [V]) {
129 for value in values {
130 self.push(value);
131 }
132 }
133
134 pub fn instant_push(&mut self, store: &mut dyn Storage, value: &'a V) -> BufferResult<()> {
136 self.push(value);
137 self.commit(store)
138 }
139
140 pub fn commit(&mut self, store: &mut dyn Storage) -> BufferResult<()> {
142 let array_key = self.store_iface.array();
143 for (&key, value) in &self.precommit_buffer {
144 if key >= self.state.capacity {
145 return Err(BufferError::SaveValueError(key));
146 }
147 array_key.save(store, key, value)?;
148 }
149 self.precommit_buffer.clear();
150 self.store_iface.state().save(store, &self.state)?;
151
152 Ok(())
153 }
154
155 pub fn read(
176 &self,
177 store: &dyn Storage,
178 indexes: impl IntoIterator<Item = impl Into<u32> + Display>,
179 stop_if_empty: bool,
180 ) -> BufferResult<Vec<V>> {
181 let array_key = self.store_iface.array();
182 let mut values = vec![];
183 for index in indexes {
184 let ind = index.into();
185 if ind > self.state.capacity - 1 {
186 return Err(BufferError::ReadAheadError(ind));
187 } else {
188 let value = array_key.load(store, ind).map_err(|err| {
189 if let StdError::NotFound { .. } = err {
190 BufferError::IndexNotFound(ind)
191 } else {
192 err.into()
193 }
194 });
195 match value {
196 Ok(value) => values.push(value),
197 Err(BufferError::IndexNotFound(_)) if stop_if_empty => return Ok(values),
198 Err(err) => return Err(err),
199 }
200 }
201 }
202
203 Ok(values)
204 }
205
206 pub fn read_all(&self, store: &dyn Storage) -> BufferResult<Vec<V>> {
208 self.read(store, 0..self.state.capacity, true)
209 }
210
211 pub fn read_last(&self, store: &dyn Storage) -> BufferResult<Option<V>> {
213 self.read_single(
214 store,
215 (self.state.capacity + self.state.head - 1) % self.state.capacity,
216 )
217 }
218
219 pub fn read_single(
221 &self,
222 store: &dyn Storage,
223 index: impl Into<u32>,
224 ) -> BufferResult<Option<V>> {
225 let ind = index.into() % self.state.capacity;
226 let res = self.store_iface.array().load(store, ind);
227 if let Err(StdError::NotFound { .. }) = res {
228 Ok(None)
229 } else {
230 res.map(Some).map_err(Into::into)
231 }
232 }
233
234 pub fn clear_buffer(&self, store: &mut dyn Storage) {
236 let array_key = self.store_iface.array();
237 (0..self.state.capacity).for_each(|i| array_key.remove(store, i))
238 }
239
240 pub fn exists(&self, store: &dyn Storage, index: u32) -> bool {
242 self.store_iface
243 .array()
244 .has(store, index % self.state.capacity)
245 }
246}
247
248impl<V: Debug> Debug for BufferManager<'_, V> {
249 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
250 f.debug_struct("BufferManager")
251 .field("state", &self.state)
252 .field("precommit_buffer", &self.precommit_buffer)
253 .finish()
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use cosmwasm_std::testing::MockStorage;
260 use cosmwasm_std::Uint128;
261
262 use super::*;
263
264 type DataType = Uint128;
265 const CIRCULAR_BUFFER: CircularBuffer<DataType> = CircularBuffer::new("buffer_state", "buffer");
266
267 #[test]
268 fn test_single_push() {
269 let mut store = MockStorage::new();
270
271 BufferManager::init(&mut store, CIRCULAR_BUFFER, 10).unwrap();
272
273 let err = BufferManager::init(&mut store, CIRCULAR_BUFFER, 10).unwrap_err();
275 assert_eq!(err, BufferError::BufferAlreadyInitialized {});
276
277 let mut buffer = BufferManager::new(&store, CIRCULAR_BUFFER).unwrap();
278
279 assert_eq!(buffer.capacity(), 10);
280 assert_eq!(
281 format!("{:?}", &buffer),
282 "BufferManager { state: BufferState { capacity: 10, head: 0 }, precommit_buffer: {} }"
283 );
284
285 let data = (1..=15u8).map(DataType::from).collect::<Vec<_>>();
286 data.iter().for_each(|i| buffer.push(i));
287 buffer.commit(&mut store).unwrap();
288
289 let head = buffer.read_last(&store).unwrap().unwrap();
291 assert_eq!(head.u128(), 15);
292
293 let saved = buffer
294 .read(&store, 0u32..=9, true)
295 .unwrap()
296 .into_iter()
297 .map(|i| i.u128())
298 .collect::<Vec<_>>();
299 assert_eq!(saved, vec![11, 12, 13, 14, 15, 6, 7, 8, 9, 10]);
300
301 let val = DataType::from(16u128);
303 buffer.instant_push(&mut store, &val).unwrap();
304 let saved = buffer
305 .read(&store, 0u32..=9, true)
306 .unwrap()
307 .into_iter()
308 .map(|i| i.u128())
309 .collect::<Vec<_>>();
310 assert_eq!(saved, vec![11, 12, 13, 14, 15, 16, 7, 8, 9, 10]);
311
312 let err = buffer.read(&store, [10u32, 11u32], true).unwrap_err();
314 assert_eq!(err, BufferError::ReadAheadError(10));
315 }
316
317 #[test]
318 fn test_push_many() {
319 let mut store = MockStorage::new();
320
321 let err = BufferManager::new(&store, CIRCULAR_BUFFER).unwrap_err();
323 assert_eq!(err, BufferError::BufferNotInitialized {});
324
325 BufferManager::init(&mut store, CIRCULAR_BUFFER, 10).unwrap();
326
327 let mut buffer = BufferManager::new(&store, CIRCULAR_BUFFER).unwrap();
328
329 let err = buffer.read(&store, [0u8], false).unwrap_err();
331 assert_eq!(err, BufferError::IndexNotFound(0));
332
333 let data = (1..=15u8).map(DataType::from).collect::<Vec<_>>();
334 buffer.push_many(&data);
335 buffer.commit(&mut store).unwrap();
336
337 let saved = buffer
338 .read_all(&store)
339 .unwrap()
340 .into_iter()
341 .map(|i| i.u128())
342 .collect::<Vec<_>>();
343 assert_eq!(saved, vec![11, 12, 13, 14, 15, 6, 7, 8, 9, 10]);
344
345 let partial_read = buffer
346 .read(&store, (0u32..buffer.capacity()).step_by(2), true)
347 .unwrap()
348 .into_iter()
349 .map(|i| i.u128())
350 .collect::<Vec<_>>();
351 assert_eq!(partial_read, vec![11, 13, 15, 7, 9]);
352 }
353}