ankurah_core/
storage.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tracing::warn;
5
6use crate::error::{MutationError, RetrievalError};
7use ankurah_proto::{Attested, CollectionId, EntityId, EntityState, Event, EventId};
8
9pub fn state_name(name: &str) -> String { format!("{}_state", name) }
10
11pub fn event_name(name: &str) -> String { format!("{}_event", name) }
12
13#[async_trait]
14pub trait StorageEngine: Send + Sync {
15    type Value;
16    // Opens and/or creates a storage collection.
17    async fn collection(&self, id: &CollectionId) -> Result<Arc<dyn StorageCollection>, RetrievalError>;
18    // Delete all collections and their data from the storage engine
19    async fn delete_all_collections(&self) -> Result<bool, MutationError>;
20}
21
22#[async_trait]
23pub trait StorageCollection: Send + Sync {
24    async fn set_state(&self, state: Attested<EntityState>) -> Result<bool, MutationError>;
25    async fn get_state(&self, id: EntityId) -> Result<Attested<EntityState>, RetrievalError>;
26
27    // Fetch raw entity states matching a selection (predicate + order by + limit)
28    async fn fetch_states(&self, selection: &ankql::ast::Selection) -> Result<Vec<Attested<EntityState>>, RetrievalError>;
29
30    async fn set_states(&self, states: Vec<Attested<EntityState>>) -> Result<(), MutationError> {
31        for state in states {
32            self.set_state(state).await?;
33        }
34        Ok(())
35    }
36
37    async fn get_states(&self, ids: Vec<EntityId>) -> Result<Vec<Attested<EntityState>>, RetrievalError> {
38        let mut states = Vec::new();
39        for id in ids {
40            match self.get_state(id).await {
41                Ok(state) => states.push(state),
42                Err(RetrievalError::EntityNotFound(_)) => {
43                    warn!("Entity not found: {:?}", id);
44                }
45                Err(e) => return Err(e),
46            }
47        }
48        Ok(states)
49    }
50
51    async fn add_event(&self, entity_event: &Attested<Event>) -> Result<bool, MutationError>;
52
53    /// Retrieve a list of events
54    async fn get_events(&self, event_ids: Vec<EventId>) -> Result<Vec<Attested<Event>>, RetrievalError>;
55
56    /// Retrieve all events from the collection
57    async fn dump_entity_events(&self, id: EntityId) -> Result<Vec<Attested<Event>>, RetrievalError>;
58}
59
60/// Manages the storage and state of the collection without any knowledge of the model type
61#[derive(Clone)]
62pub struct StorageCollectionWrapper(pub(crate) Arc<dyn StorageCollection>);
63
64/// Storage interface for a collection
65impl StorageCollectionWrapper {
66    pub fn new(bucket: Arc<dyn StorageCollection>) -> Self { Self(bucket) }
67}
68
69impl std::ops::Deref for StorageCollectionWrapper {
70    type Target = Arc<dyn StorageCollection>;
71    fn deref(&self) -> &Self::Target { &self.0 }
72}