Skip to main content

hashgraph_like_consensus/
storage.rs

1//! Storage trait and default in-memory implementation.
2//!
3//! Implement [`ConsensusStorage`] to persist consensus sessions to a database or
4//! other durable backend. The provided [`InMemoryConsensusStorage`] keeps everything
5//! in RAM and is suitable for testing or single-node deployments.
6
7use async_stream::try_stream;
8use futures::Stream;
9use std::{collections::HashMap, sync::Arc};
10use tokio::sync::RwLock;
11
12use crate::{
13    error::ConsensusError, scope::ConsensusScope, scope_config::ScopeConfig,
14    session::ConsensusSession,
15};
16
17/// Trait for storing and retrieving consensus sessions.
18///
19/// Implement this to use your own storage backend (database, file system, etc.).
20/// The default `InMemoryConsensusStorage` stores everything in RAM, which is fine
21/// for testing or single-node setups but won't persist across restarts.
22pub trait ConsensusStorage<Scope>: Clone + Send + Sync + 'static
23where
24    Scope: ConsensusScope,
25{
26    /// Persist a session (insert or overwrite by `proposal_id`).
27    fn save_session(
28        &self,
29        scope: &Scope,
30        session: ConsensusSession,
31    ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
32
33    /// Retrieve a session by proposal ID, or `None` if it doesn't exist.
34    fn get_session(
35        &self,
36        scope: &Scope,
37        proposal_id: u32,
38    ) -> impl Future<Output = Result<Option<ConsensusSession>, ConsensusError>> + Send;
39
40    /// Remove and return a session, or `None` if not found.
41    fn remove_session(
42        &self,
43        scope: &Scope,
44        proposal_id: u32,
45    ) -> impl Future<Output = Result<Option<ConsensusSession>, ConsensusError>> + Send;
46
47    /// List all sessions in a scope, or `None` if the scope doesn't exist.
48    fn list_scope_sessions(
49        &self,
50        scope: &Scope,
51    ) -> impl Future<Output = Result<Option<Vec<ConsensusSession>>, ConsensusError>> + Send;
52
53    /// Stream sessions in a scope one at a time (useful for large scopes).
54    fn stream_scope_sessions<'a>(
55        &'a self,
56        scope: &'a Scope,
57    ) -> impl Stream<Item = Result<ConsensusSession, ConsensusError>> + Send + 'a;
58
59    /// Replace all sessions in a scope atomically.
60    fn replace_scope_sessions(
61        &self,
62        scope: &Scope,
63        sessions: Vec<ConsensusSession>,
64    ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
65
66    /// List all known scopes, or `None` if no scopes exist.
67    fn list_scopes(
68        &self,
69    ) -> impl Future<Output = Result<Option<Vec<Scope>>, ConsensusError>> + Send;
70
71    /// Apply a mutation to a single session in place.
72    fn update_session<R, F>(
73        &self,
74        scope: &Scope,
75        proposal_id: u32,
76        mutator: F,
77    ) -> impl Future<Output = Result<R, ConsensusError>> + Send
78    where
79        R: Send,
80        F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send;
81
82    /// Apply a mutation to all sessions in a scope (e.g. trimming old entries).
83    fn update_scope_sessions<F>(
84        &self,
85        scope: &Scope,
86        mutator: F,
87    ) -> impl Future<Output = Result<(), ConsensusError>> + Send
88    where
89        F: FnOnce(&mut Vec<ConsensusSession>) -> Result<(), ConsensusError> + Send;
90
91    /// Get the scope-level configuration, or `None` if not yet initialized.
92    fn get_scope_config(
93        &self,
94        scope: &Scope,
95    ) -> impl Future<Output = Result<Option<ScopeConfig>, ConsensusError>> + Send;
96
97    /// Set (insert or overwrite) the scope-level configuration.
98    fn set_scope_config(
99        &self,
100        scope: &Scope,
101        config: ScopeConfig,
102    ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
103
104    /// Apply a mutation to an existing scope configuration.
105    fn update_scope_config<F>(
106        &self,
107        scope: &Scope,
108        updater: F,
109    ) -> impl Future<Output = Result<(), ConsensusError>> + Send
110    where
111        F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send;
112}
113
114/// In-memory storage for consensus sessions.
115///
116/// Stores all sessions in RAM using a hash map. This is the default storage implementation
117/// and works well for testing or single-node setups. Data is lost when the process exits.
118#[derive(Clone)]
119pub struct InMemoryConsensusStorage<Scope>
120where
121    Scope: ConsensusScope,
122{
123    sessions: Arc<RwLock<HashMap<Scope, HashMap<u32, ConsensusSession>>>>,
124    scope_configs: Arc<RwLock<HashMap<Scope, ScopeConfig>>>,
125}
126
127impl<Scope> Default for InMemoryConsensusStorage<Scope>
128where
129    Scope: ConsensusScope,
130{
131    fn default() -> Self {
132        Self {
133            sessions: Arc::new(RwLock::new(HashMap::new())),
134            scope_configs: Arc::new(RwLock::new(HashMap::new())),
135        }
136    }
137}
138
139impl<Scope> InMemoryConsensusStorage<Scope>
140where
141    Scope: ConsensusScope,
142{
143    /// Create a new in-memory storage instance.
144    ///
145    /// This stores all consensus sessions in RAM. Perfect for testing or single-node setups,
146    /// but data won't persist across restarts.
147    pub fn new() -> Self {
148        Self::default()
149    }
150}
151
152impl<Scope> ConsensusStorage<Scope> for InMemoryConsensusStorage<Scope>
153where
154    Scope: ConsensusScope + Clone,
155{
156    async fn save_session(
157        &self,
158        scope: &Scope,
159        session: ConsensusSession,
160    ) -> Result<(), ConsensusError> {
161        let mut sessions = self.sessions.write().await;
162        let entry = sessions.entry(scope.clone()).or_default();
163        entry.insert(session.proposal.proposal_id, session);
164        Ok(())
165    }
166
167    async fn get_session(
168        &self,
169        scope: &Scope,
170        proposal_id: u32,
171    ) -> Result<Option<ConsensusSession>, ConsensusError> {
172        let sessions = self.sessions.read().await;
173        Ok(sessions
174            .get(scope)
175            .and_then(|scope| scope.get(&proposal_id))
176            .cloned())
177    }
178
179    async fn remove_session(
180        &self,
181        scope: &Scope,
182        proposal_id: u32,
183    ) -> Result<Option<ConsensusSession>, ConsensusError> {
184        let mut sessions = self.sessions.write().await;
185        Ok(sessions
186            .get_mut(scope)
187            .and_then(|scope| scope.remove(&proposal_id)))
188    }
189
190    async fn list_scope_sessions(
191        &self,
192        scope: &Scope,
193    ) -> Result<Option<Vec<ConsensusSession>>, ConsensusError> {
194        let sessions = self.sessions.read().await;
195        let result = sessions
196            .get(scope)
197            .map(|scope| scope.values().cloned().collect::<Vec<ConsensusSession>>());
198        Ok(result)
199    }
200
201    fn stream_scope_sessions<'a>(
202        &'a self,
203        scope: &'a Scope,
204    ) -> impl Stream<Item = Result<ConsensusSession, ConsensusError>> + Send + 'a {
205        try_stream! {
206            let guard = self.sessions.read().await;
207
208            if let Some(inner_map) = guard.get(scope) {
209                for session in inner_map.values() {
210                    yield session.clone();
211                }
212            }
213        }
214    }
215
216    async fn replace_scope_sessions(
217        &self,
218        scope: &Scope,
219        sessions_list: Vec<ConsensusSession>,
220    ) -> Result<(), ConsensusError> {
221        let mut sessions = self.sessions.write().await;
222        let new_map = sessions_list
223            .into_iter()
224            .map(|session| (session.proposal.proposal_id, session))
225            .collect();
226        sessions.insert(scope.clone(), new_map);
227        Ok(())
228    }
229
230    async fn list_scopes(&self) -> Result<Option<Vec<Scope>>, ConsensusError> {
231        let sessions = self.sessions.read().await;
232        let result = sessions.keys().cloned().collect::<Vec<Scope>>();
233        if result.is_empty() {
234            return Ok(None);
235        }
236        Ok(Some(result))
237    }
238
239    async fn update_session<R, F>(
240        &self,
241        scope: &Scope,
242        proposal_id: u32,
243        mutator: F,
244    ) -> Result<R, ConsensusError>
245    where
246        R: Send,
247        F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
248    {
249        let mut sessions = self.sessions.write().await;
250        let session = sessions
251            .get_mut(scope)
252            .and_then(|scope_sessions| scope_sessions.get_mut(&proposal_id))
253            .ok_or(ConsensusError::SessionNotFound)?;
254
255        let result = mutator(session)?;
256        Ok(result)
257    }
258
259    async fn update_scope_sessions<F>(
260        &self,
261        scope: &Scope,
262        mutator: F,
263    ) -> Result<(), ConsensusError>
264    where
265        F: FnOnce(&mut Vec<ConsensusSession>) -> Result<(), ConsensusError> + Send,
266    {
267        let mut sessions = self.sessions.write().await;
268        let scope_sessions = sessions.entry(scope.clone()).or_default();
269
270        let mut sessions_vec: Vec<ConsensusSession> = scope_sessions.values().cloned().collect();
271        mutator(&mut sessions_vec)?;
272
273        if sessions_vec.is_empty() {
274            sessions.remove(scope);
275            return Ok(());
276        }
277
278        let new_map: HashMap<u32, ConsensusSession> = sessions_vec
279            .into_iter()
280            .map(|session| (session.proposal.proposal_id, session))
281            .collect();
282
283        *scope_sessions = new_map;
284        Ok(())
285    }
286
287    async fn get_scope_config(&self, scope: &Scope) -> Result<Option<ScopeConfig>, ConsensusError> {
288        let configs = self.scope_configs.read().await;
289        Ok(configs.get(scope).cloned())
290    }
291
292    async fn set_scope_config(
293        &self,
294        scope: &Scope,
295        config: ScopeConfig,
296    ) -> Result<(), ConsensusError> {
297        config.validate()?;
298        let mut configs = self.scope_configs.write().await;
299        configs.insert(scope.clone(), config);
300        Ok(())
301    }
302
303    async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
304    where
305        F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
306    {
307        let mut configs = self.scope_configs.write().await;
308        let config = configs.entry(scope.clone()).or_default();
309        updater(config)?;
310        config.validate()?;
311        Ok(())
312    }
313}