Skip to main content

ave_actors_store/
database.rs

1//! Storage backend traits: [`DbManager`], [`Collection`], and [`State`].
2
3use crate::error::Error;
4
5/// Key-value pair yielded by a [`Collection`] iterator.
6pub type CollectionEntry = (String, Vec<u8>);
7
8/// Result item yielded by a [`Collection`] iterator.
9pub type CollectionEntryResult = Result<CollectionEntry, Error>;
10
11/// Boxed iterator returned by [`Collection::iter`].
12pub type CollectionIter<'a> =
13    Box<dyn Iterator<Item = CollectionEntryResult> + 'a>;
14
15/// Factory for creating [`Collection`] and [`State`] storage backends.
16///
17/// Implement this trait to plug in a custom database (SQLite, RocksDB, etc.).
18/// The type parameters `C` and `S` are the concrete collection and state types
19/// your backend produces.
20pub trait DbManager<C, S>: Sync + Send + Clone
21where
22    C: Collection + 'static,
23    S: State + 'static,
24{
25    /// Creates a new ordered key-value collection, typically used to store events.
26    ///
27    /// `name` identifies the table or column family; `prefix` scopes all keys so
28    /// multiple actors can share the same physical table without key collisions.
29    /// Returns an error if the backend cannot allocate the collection.
30    fn create_collection(&self, name: &str, prefix: &str) -> Result<C, Error>;
31
32    /// Creates a single-value state store, typically used to store actor snapshots.
33    ///
34    /// `name` identifies the table or column family; `prefix` scopes the stored
35    /// value within it. Returns an error if the backend cannot allocate the storage.
36    fn create_state(&self, name: &str, prefix: &str) -> Result<S, Error>;
37
38    /// Optional cleanup hook called when the database manager should shut down.
39    ///
40    /// Backends that need to flush WAL buffers or close connections should override
41    /// this. The default implementation is a no-op and always returns `Ok(())`.
42    fn stop(&mut self) -> Result<(), Error> {
43        Ok(())
44    }
45}
46
47/// Single-value storage used to persist actor state snapshots.
48///
49/// Unlike [`Collection`], a `State` holds only the most recent value.
50/// Implementations must be `Send + Sync + 'static` to be used across async tasks.
51pub trait State: Sync + Send + 'static {
52    /// Returns the name identifier of this state storage unit.
53    fn name(&self) -> &str;
54
55    /// Returns the currently stored bytes.
56    ///
57    /// Returns [`Error::EntryNotFound`] if no value has been stored yet.
58    fn get(&self) -> Result<Vec<u8>, Error>;
59
60    /// Stores `data` as the current value, replacing any previous one.
61    fn put(&mut self, data: &[u8]) -> Result<(), Error>;
62
63    /// Deletes the current value.
64    ///
65    /// Returns [`Error::EntryNotFound`] if there is nothing to delete.
66    fn del(&mut self) -> Result<(), Error>;
67
68    /// Removes all data from this state store. Succeeds silently if the store is already empty.
69    fn purge(&mut self) -> Result<(), Error>;
70}
71
72/// Ordered key-value storage used to persist event sequences.
73///
74/// Keys are typically zero-padded sequence numbers (e.g. `"00000000000000000042"`),
75/// which makes the last-entry and range queries efficient. Implementations must
76/// be `Send + Sync + 'static`.
77pub trait Collection: Sync + Send + 'static {
78    /// Returns the name identifier of this collection.
79    fn name(&self) -> &str;
80
81    /// Returns the value stored under `key`.
82    ///
83    /// Returns [`Error::EntryNotFound`] if the key does not exist.
84    fn get(&self, key: &str) -> Result<Vec<u8>, Error>;
85
86    /// Associates `data` with `key`, inserting or replacing any previous value.
87    fn put(&mut self, key: &str, data: &[u8]) -> Result<(), Error>;
88
89    /// Removes the entry for `key`.
90    ///
91    /// Returns [`Error::EntryNotFound`] if the key does not exist.
92    fn del(&mut self, key: &str) -> Result<(), Error>;
93
94    /// Returns the last key-value pair in insertion/sort order, or `None` if the collection is empty.
95    fn last(&self) -> Result<Option<(String, Vec<u8>)>, Error>;
96
97    /// Removes all entries from the collection.
98    fn purge(&mut self) -> Result<(), Error>;
99
100    /// Returns an iterator over all key-value pairs.
101    ///
102    /// Pass `reverse = true` to iterate in descending key order.
103    /// Returns an error if the backend cannot acquire the necessary locks to start
104    /// iteration; individual items in the iterator may also yield errors.
105    fn iter<'a>(&'a self, reverse: bool) -> Result<CollectionIter<'a>, Error>;
106
107    /// Returns at most `quantity.abs()` values, optionally starting after `from`.
108    ///
109    /// If `from` is `Some(key)`, iteration begins at the entry immediately after `key`.
110    /// A positive `quantity` iterates forward; negative iterates in reverse.
111    /// Returns [`Error::EntryNotFound`] if `from` is provided but does not exist.
112    fn get_by_range(
113        &self,
114        from: Option<String>,
115        quantity: isize,
116    ) -> Result<Vec<Vec<u8>>, Error> {
117        fn convert<'a>(
118            iter: impl Iterator<Item = CollectionEntryResult> + 'a,
119        ) -> CollectionIter<'a> {
120            Box::new(iter)
121        }
122        let (mut iter, quantity) = match from {
123            Some(key) => {
124                // Find the key
125                let iter = if quantity >= 0 {
126                    self.iter(false)?
127                } else {
128                    self.iter(true)?
129                };
130                let mut iter = iter.peekable();
131                loop {
132                    let Some(next_item) = iter.peek() else {
133                        return Err(Error::EntryNotFound { key });
134                    };
135                    let (current_key, _) = match next_item {
136                        Ok((current_key, event)) => (current_key, event),
137                        Err(error) => return Err(error.clone()),
138                    };
139                    if current_key == &key {
140                        break;
141                    }
142                    iter.next();
143                }
144                iter.next(); // Exclusive From
145                (convert(iter), quantity.abs())
146            }
147            None => {
148                if quantity >= 0 {
149                    (self.iter(false)?, quantity)
150                } else {
151                    (self.iter(true)?, quantity.abs())
152                }
153            }
154        };
155        let mut result = Vec::new();
156        let mut counter = 0;
157        while counter < quantity {
158            let Some(item) = iter.next() else {
159                break;
160            };
161            let (_, event) = item?;
162            result.push(event);
163            counter += 1;
164        }
165        Ok(result)
166    }
167}
168
169#[macro_export]
170macro_rules! test_store_trait {
171    ($name:ident: $type:ty: $type2:ty) => {
172        #[cfg(test)]
173        mod $name {
174            use super::*;
175            use $crate::error::Error;
176
177            #[test]
178            fn test_create_collection() {
179                let mut manager = <$type>::default();
180                let store: $type2 =
181                    manager.create_collection("test", "test").unwrap();
182                assert_eq!(Collection::name(&store), "test");
183                assert!(manager.stop().is_ok())
184            }
185
186            #[test]
187            fn test_create_state() {
188                let mut manager = <$type>::default();
189                let store: $type2 =
190                    manager.create_state("test", "test").unwrap();
191                assert_eq!(State::name(&store), "test");
192                assert!(manager.stop().is_ok())
193            }
194
195            #[test]
196            fn test_put_get_collection() {
197                let mut manager = <$type>::default();
198                let mut store: $type2 =
199                    manager.create_collection("test", "test").unwrap();
200                Collection::put(&mut store, "key", b"value").unwrap();
201                assert_eq!(Collection::get(&store, "key").unwrap(), b"value");
202                assert!(manager.stop().is_ok())
203            }
204
205            #[test]
206            fn test_put_get_state() {
207                let mut manager = <$type>::default();
208                let mut store: $type2 =
209                    manager.create_state("test", "test").unwrap();
210                State::put(&mut store, b"value").unwrap();
211                assert_eq!(State::get(&store).unwrap(), b"value");
212                assert!(manager.stop().is_ok())
213            }
214
215            #[test]
216            fn test_del_collection() {
217                let mut manager = <$type>::default();
218                let mut store: $type2 =
219                    manager.create_collection("test", "test").unwrap();
220                Collection::put(&mut store, "key", b"value").unwrap();
221                Collection::del(&mut store, "key").unwrap();
222                assert_eq!(
223                    Collection::get(&store, "key"),
224                    Err(Error::EntryNotFound {
225                        key: "test.key".to_owned()
226                    })
227                );
228                assert!(manager.stop().is_ok())
229            }
230
231            #[test]
232            fn test_del_state() {
233                let mut manager = <$type>::default();
234                let mut store: $type2 =
235                    manager.create_state("test", "test").unwrap();
236                State::put(&mut store, b"value").unwrap();
237                State::del(&mut store).unwrap();
238                assert_eq!(
239                    State::get(&store),
240                    Err(Error::EntryNotFound {
241                        key: "test".to_owned()
242                    })
243                );
244                assert!(manager.stop().is_ok())
245            }
246
247            #[test]
248            fn test_iter() {
249                let mut manager = <$type>::default();
250                let mut store: $type2 =
251                    manager.create_collection("test", "test").unwrap();
252                Collection::put(&mut store, "key1", b"value1").unwrap();
253                Collection::put(&mut store, "key2", b"value2").unwrap();
254                Collection::put(&mut store, "key3", b"value3").unwrap();
255                let items: Vec<_> = store
256                    .iter(false)
257                    .unwrap()
258                    .collect::<Result<Vec<_>, _>>()
259                    .unwrap();
260                assert_eq!(
261                    items,
262                    vec![
263                        ("key1".to_string(), b"value1".to_vec()),
264                        ("key2".to_string(), b"value2".to_vec()),
265                        ("key3".to_string(), b"value3".to_vec()),
266                    ]
267                );
268                assert!(manager.stop().is_ok())
269            }
270
271            #[test]
272            fn test_iter_reverse() {
273                let mut manager = <$type>::default();
274                let mut store: $type2 =
275                    manager.create_collection("test", "test").unwrap();
276                Collection::put(&mut store, "key1", b"value1").unwrap();
277                Collection::put(&mut store, "key2", b"value2").unwrap();
278                Collection::put(&mut store, "key3", b"value3").unwrap();
279                let items: Vec<_> = store
280                    .iter(true)
281                    .unwrap()
282                    .collect::<Result<Vec<_>, _>>()
283                    .unwrap();
284                assert_eq!(
285                    items,
286                    vec![
287                        ("key3".to_string(), b"value3".to_vec()),
288                        ("key2".to_string(), b"value2".to_vec()),
289                        ("key1".to_string(), b"value1".to_vec()),
290                    ]
291                );
292                assert!(manager.stop().is_ok())
293            }
294
295            #[test]
296            fn test_last() {
297                let mut manager = <$type>::default();
298                let mut store: $type2 =
299                    manager.create_collection("test", "test").unwrap();
300                Collection::put(&mut store, "key1", b"value1").unwrap();
301                Collection::put(&mut store, "key2", b"value2").unwrap();
302                Collection::put(&mut store, "key3", b"value3").unwrap();
303                let last = store.last().unwrap();
304                assert_eq!(
305                    last,
306                    Some(("key3".to_string(), b"value3".to_vec()))
307                );
308                assert!(manager.stop().is_ok())
309            }
310
311            #[test]
312            fn test_get_by_range() {
313                let mut manager = <$type>::default();
314                let mut store: $type2 =
315                    manager.create_collection("test", "test").unwrap();
316                Collection::put(&mut store, "key1", b"value1").unwrap();
317                Collection::put(&mut store, "key2", b"value2").unwrap();
318                Collection::put(&mut store, "key3", b"value3").unwrap();
319                let result = store.get_by_range(None, 2).unwrap();
320                assert_eq!(
321                    result,
322                    vec![b"value1".to_vec(), b"value2".to_vec()]
323                );
324                let result =
325                    store.get_by_range(Some("key3".to_string()), -2).unwrap();
326                assert_eq!(
327                    result,
328                    vec![b"value2".to_vec(), b"value1".to_vec()]
329                );
330                assert!(manager.stop().is_ok())
331            }
332
333            #[test]
334            fn test_purge_collection() {
335                let mut manager = <$type>::default();
336                let mut store: $type2 =
337                    manager.create_collection("test", "test").unwrap();
338                Collection::put(&mut store, "key1", b"value1").unwrap();
339                Collection::put(&mut store, "key2", b"value2").unwrap();
340                Collection::put(&mut store, "key3", b"value3").unwrap();
341                assert_eq!(
342                    Collection::get(&store, "key1"),
343                    Ok(b"value1".to_vec())
344                );
345                assert_eq!(
346                    Collection::get(&store, "key2"),
347                    Ok(b"value2".to_vec())
348                );
349                assert_eq!(
350                    Collection::get(&store, "key3"),
351                    Ok(b"value3".to_vec())
352                );
353                Collection::purge(&mut store).unwrap();
354                assert_eq!(
355                    Collection::get(&store, "key1"),
356                    Err(Error::EntryNotFound {
357                        key: "test.key1".to_owned()
358                    })
359                );
360                assert_eq!(
361                    Collection::get(&store, "key2"),
362                    Err(Error::EntryNotFound {
363                        key: "test.key2".to_owned()
364                    })
365                );
366                assert_eq!(
367                    Collection::get(&store, "key3"),
368                    Err(Error::EntryNotFound {
369                        key: "test.key3".to_owned()
370                    })
371                );
372                assert!(manager.stop().is_ok())
373            }
374
375            #[test]
376            fn test_purge_state() {
377                let mut manager = <$type>::default();
378                let mut store: $type2 =
379                    manager.create_state("test", "test").unwrap();
380                State::put(&mut store, b"value1").unwrap();
381                assert_eq!(State::get(&store), Ok(b"value1".to_vec()));
382                State::purge(&mut store).unwrap();
383                assert_eq!(
384                    State::get(&store),
385                    Err(Error::EntryNotFound {
386                        key: "test".to_owned()
387                    })
388                );
389
390                State::put(&mut store, b"value2").unwrap();
391                assert_eq!(State::get(&store), Ok(b"value2".to_vec()));
392                State::purge(&mut store).unwrap();
393                assert_eq!(
394                    State::get(&store),
395                    Err(Error::EntryNotFound {
396                        key: "test".to_owned()
397                    })
398                );
399                assert!(manager.stop().is_ok())
400            }
401        }
402    };
403}