Skip to main content

hashgraph_like_consensus/
storage.rs

1//! Storage trait and default in-memory implementation.
2//!
3//! Implement [`ConsensusStorage`] to persist consensus sessions to a database or
4//! other durable backend. The provided [`InMemoryConsensusStorage`] keeps everything
5//! in RAM and is suitable for testing or single-node deployments.
6
7use async_stream::try_stream;
8use futures::Stream;
9use std::{collections::HashMap, sync::Arc};
10use tokio::sync::RwLock;
11
12use crate::{
13    error::ConsensusError,
14    protos::consensus::v1::Proposal,
15    scope::ConsensusScope,
16    scope_config::ScopeConfig,
17    session::{ConsensusConfig, ConsensusSession},
18};
19
20/// Trait for storing and retrieving consensus sessions.
21///
22/// Implement this to use your own storage backend (database, file system, etc.).
23/// The default `InMemoryConsensusStorage` stores everything in RAM, which is fine
24/// for testing or single-node setups but won't persist across restarts.
25pub trait ConsensusStorage<Scope>: Clone + Send + Sync + 'static
26where
27    Scope: ConsensusScope,
28{
29    /// Persist a session (insert or overwrite by `proposal_id`).
30    fn save_session(
31        &self,
32        scope: &Scope,
33        session: ConsensusSession,
34    ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
35
36    /// Retrieve a session by proposal ID, or `None` if it doesn't exist.
37    fn get_session(
38        &self,
39        scope: &Scope,
40        proposal_id: u32,
41    ) -> impl Future<Output = Result<Option<ConsensusSession>, ConsensusError>> + Send;
42
43    /// Remove and return a session, or `None` if not found.
44    fn remove_session(
45        &self,
46        scope: &Scope,
47        proposal_id: u32,
48    ) -> impl Future<Output = Result<Option<ConsensusSession>, ConsensusError>> + Send;
49
50    /// List all sessions in a scope, or `None` if the scope doesn't exist.
51    fn list_scope_sessions(
52        &self,
53        scope: &Scope,
54    ) -> impl Future<Output = Result<Option<Vec<ConsensusSession>>, ConsensusError>> + Send;
55
56    /// Stream sessions in a scope one at a time (useful for large scopes).
57    fn stream_scope_sessions<'a>(
58        &'a self,
59        scope: &'a Scope,
60    ) -> impl Stream<Item = Result<ConsensusSession, ConsensusError>> + Send + 'a;
61
62    /// Replace all sessions in a scope atomically.
63    fn replace_scope_sessions(
64        &self,
65        scope: &Scope,
66        sessions: Vec<ConsensusSession>,
67    ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
68
69    /// List all known scopes, or `None` if no scopes exist.
70    fn list_scopes(
71        &self,
72    ) -> impl Future<Output = Result<Option<Vec<Scope>>, ConsensusError>> + Send;
73
74    /// Apply a mutation to a single session in place.
75    fn update_session<R, F>(
76        &self,
77        scope: &Scope,
78        proposal_id: u32,
79        mutator: F,
80    ) -> impl Future<Output = Result<R, ConsensusError>> + Send
81    where
82        R: Send,
83        F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send;
84
85    /// Apply a mutation to all sessions in a scope (e.g. trimming old entries).
86    fn update_scope_sessions<F>(
87        &self,
88        scope: &Scope,
89        mutator: F,
90    ) -> impl Future<Output = Result<(), ConsensusError>> + Send
91    where
92        F: FnOnce(&mut Vec<ConsensusSession>) -> Result<(), ConsensusError> + Send;
93
94    /// Get the scope-level configuration, or `None` if not yet initialized.
95    fn get_scope_config(
96        &self,
97        scope: &Scope,
98    ) -> impl Future<Output = Result<Option<ScopeConfig>, ConsensusError>> + Send;
99
100    /// Set (insert or overwrite) the scope-level configuration.
101    fn set_scope_config(
102        &self,
103        scope: &Scope,
104        config: ScopeConfig,
105    ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
106
107    /// Remove all data for a scope (sessions, config, everything).
108    ///
109    /// Called when a group is left or deleted. After this call, the scope
110    /// behaves as if it was never initialized — creating new proposals on it
111    /// starts fresh.
112    fn delete_scope(
113        &self,
114        scope: &Scope,
115    ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
116
117    /// Apply a mutation to an existing scope configuration.
118    fn update_scope_config<F>(
119        &self,
120        scope: &Scope,
121        updater: F,
122    ) -> impl Future<Output = Result<(), ConsensusError>> + Send
123    where
124        F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send;
125
126    // ── Query helpers (default implementations) ────────────────────────
127    //
128    // These are derived from the primitives above. Storage implementors
129    // get them for free — override only if your backend can do it faster.
130
131    /// Get the consensus result for a proposal.
132    ///
133    /// Returns `Ok(true)` for YES, `Ok(false)` for NO.
134    /// Returns [`SessionNotFound`](ConsensusError::SessionNotFound) if the
135    /// proposal doesn't exist,
136    /// [`ConsensusFailed`](ConsensusError::ConsensusFailed) if the session
137    /// failed, or [`ConsensusNotReached`](ConsensusError::ConsensusNotReached)
138    /// if voting is still active.
139    fn get_consensus_result(
140        &self,
141        scope: &Scope,
142        proposal_id: u32,
143    ) -> impl Future<Output = Result<bool, ConsensusError>> + Send {
144        async move {
145            use crate::session::ConsensusState;
146            let session = self
147                .get_session(scope, proposal_id)
148                .await?
149                .ok_or(ConsensusError::SessionNotFound)?;
150            match session.state {
151                ConsensusState::ConsensusReached(result) => Ok(result),
152                ConsensusState::Failed => Err(ConsensusError::ConsensusFailed),
153                ConsensusState::Active => Err(ConsensusError::ConsensusNotReached),
154            }
155        }
156    }
157
158    /// Get a proposal by ID.
159    ///
160    /// Returns [`SessionNotFound`](ConsensusError::SessionNotFound) if the
161    /// proposal doesn't exist.
162    fn get_proposal(
163        &self,
164        scope: &Scope,
165        proposal_id: u32,
166    ) -> impl Future<Output = Result<Proposal, ConsensusError>> + Send {
167        async move {
168            let session = self
169                .get_session(scope, proposal_id)
170                .await?
171                .ok_or(ConsensusError::SessionNotFound)?;
172            Ok(session.proposal)
173        }
174    }
175
176    /// Get the resolved configuration for a proposal.
177    ///
178    /// Returns [`SessionNotFound`](ConsensusError::SessionNotFound) if the
179    /// proposal doesn't exist.
180    fn get_proposal_config(
181        &self,
182        scope: &Scope,
183        proposal_id: u32,
184    ) -> impl Future<Output = Result<ConsensusConfig, ConsensusError>> + Send {
185        async move {
186            let session = self
187                .get_session(scope, proposal_id)
188                .await?
189                .ok_or(ConsensusError::SessionNotFound)?;
190            Ok(session.config)
191        }
192    }
193
194    /// Get all proposals that are still accepting votes.
195    ///
196    /// Returns an empty `Vec` if no active proposals exist or the scope is unknown.
197    fn get_active_proposals(
198        &self,
199        scope: &Scope,
200    ) -> impl Future<Output = Result<Vec<Proposal>, ConsensusError>> + Send {
201        async move {
202            let sessions = self.list_scope_sessions(scope).await?.unwrap_or_default();
203            Ok(sessions
204                .into_iter()
205                .filter(|s| s.is_active())
206                .map(|s| s.proposal)
207                .collect())
208        }
209    }
210
211    /// Get all proposals that reached consensus, with their results.
212    ///
213    /// Returns a map from `proposal_id` to result (`true` = YES, `false` = NO).
214    /// Returns an empty map if no proposals reached consensus or the scope is unknown.
215    fn get_reached_proposals(
216        &self,
217        scope: &Scope,
218    ) -> impl Future<Output = Result<HashMap<u32, bool>, ConsensusError>> + Send {
219        async move {
220            let sessions = self.list_scope_sessions(scope).await?.unwrap_or_default();
221            Ok(sessions
222                .into_iter()
223                .filter_map(|s| {
224                    s.get_consensus_result()
225                        .ok()
226                        .map(|result| (s.proposal.proposal_id, result))
227                })
228                .collect())
229        }
230    }
231}
232
233/// In-memory storage for consensus sessions.
234///
235/// Stores all sessions in RAM using a hash map. This is the default storage implementation
236/// and works well for testing or single-node setups. Data is lost when the process exits.
237#[derive(Clone)]
238pub struct InMemoryConsensusStorage<Scope>
239where
240    Scope: ConsensusScope,
241{
242    sessions: Arc<RwLock<HashMap<Scope, HashMap<u32, ConsensusSession>>>>,
243    scope_configs: Arc<RwLock<HashMap<Scope, ScopeConfig>>>,
244}
245
246impl<Scope> Default for InMemoryConsensusStorage<Scope>
247where
248    Scope: ConsensusScope,
249{
250    fn default() -> Self {
251        Self {
252            sessions: Arc::new(RwLock::new(HashMap::new())),
253            scope_configs: Arc::new(RwLock::new(HashMap::new())),
254        }
255    }
256}
257
258impl<Scope> InMemoryConsensusStorage<Scope>
259where
260    Scope: ConsensusScope,
261{
262    /// Create a new in-memory storage instance.
263    ///
264    /// This stores all consensus sessions in RAM. Perfect for testing or single-node setups,
265    /// but data won't persist across restarts.
266    pub fn new() -> Self {
267        Self::default()
268    }
269}
270
271impl<Scope> ConsensusStorage<Scope> for InMemoryConsensusStorage<Scope>
272where
273    Scope: ConsensusScope + Clone,
274{
275    async fn save_session(
276        &self,
277        scope: &Scope,
278        session: ConsensusSession,
279    ) -> Result<(), ConsensusError> {
280        let mut sessions = self.sessions.write().await;
281        let entry = sessions.entry(scope.clone()).or_default();
282        entry.insert(session.proposal.proposal_id, session);
283        Ok(())
284    }
285
286    async fn get_session(
287        &self,
288        scope: &Scope,
289        proposal_id: u32,
290    ) -> Result<Option<ConsensusSession>, ConsensusError> {
291        let sessions = self.sessions.read().await;
292        Ok(sessions
293            .get(scope)
294            .and_then(|scope| scope.get(&proposal_id))
295            .cloned())
296    }
297
298    async fn remove_session(
299        &self,
300        scope: &Scope,
301        proposal_id: u32,
302    ) -> Result<Option<ConsensusSession>, ConsensusError> {
303        let mut sessions = self.sessions.write().await;
304        Ok(sessions
305            .get_mut(scope)
306            .and_then(|scope| scope.remove(&proposal_id)))
307    }
308
309    async fn list_scope_sessions(
310        &self,
311        scope: &Scope,
312    ) -> Result<Option<Vec<ConsensusSession>>, ConsensusError> {
313        let sessions = self.sessions.read().await;
314        let result = sessions
315            .get(scope)
316            .map(|scope| scope.values().cloned().collect::<Vec<ConsensusSession>>());
317        Ok(result)
318    }
319
320    fn stream_scope_sessions<'a>(
321        &'a self,
322        scope: &'a Scope,
323    ) -> impl Stream<Item = Result<ConsensusSession, ConsensusError>> + Send + 'a {
324        try_stream! {
325            let guard = self.sessions.read().await;
326
327            if let Some(inner_map) = guard.get(scope) {
328                for session in inner_map.values() {
329                    yield session.clone();
330                }
331            }
332        }
333    }
334
335    async fn replace_scope_sessions(
336        &self,
337        scope: &Scope,
338        sessions_list: Vec<ConsensusSession>,
339    ) -> Result<(), ConsensusError> {
340        let mut sessions = self.sessions.write().await;
341        let new_map = sessions_list
342            .into_iter()
343            .map(|session| (session.proposal.proposal_id, session))
344            .collect();
345        sessions.insert(scope.clone(), new_map);
346        Ok(())
347    }
348
349    async fn list_scopes(&self) -> Result<Option<Vec<Scope>>, ConsensusError> {
350        let sessions = self.sessions.read().await;
351        let result = sessions.keys().cloned().collect::<Vec<Scope>>();
352        if result.is_empty() {
353            return Ok(None);
354        }
355        Ok(Some(result))
356    }
357
358    async fn update_session<R, F>(
359        &self,
360        scope: &Scope,
361        proposal_id: u32,
362        mutator: F,
363    ) -> Result<R, ConsensusError>
364    where
365        R: Send,
366        F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
367    {
368        let mut sessions = self.sessions.write().await;
369        let session = sessions
370            .get_mut(scope)
371            .and_then(|scope_sessions| scope_sessions.get_mut(&proposal_id))
372            .ok_or(ConsensusError::SessionNotFound)?;
373
374        let result = mutator(session)?;
375        Ok(result)
376    }
377
378    async fn update_scope_sessions<F>(
379        &self,
380        scope: &Scope,
381        mutator: F,
382    ) -> Result<(), ConsensusError>
383    where
384        F: FnOnce(&mut Vec<ConsensusSession>) -> Result<(), ConsensusError> + Send,
385    {
386        let mut sessions = self.sessions.write().await;
387        let scope_sessions = sessions.entry(scope.clone()).or_default();
388
389        let mut sessions_vec: Vec<ConsensusSession> = scope_sessions.values().cloned().collect();
390        mutator(&mut sessions_vec)?;
391
392        if sessions_vec.is_empty() {
393            sessions.remove(scope);
394            return Ok(());
395        }
396
397        let new_map: HashMap<u32, ConsensusSession> = sessions_vec
398            .into_iter()
399            .map(|session| (session.proposal.proposal_id, session))
400            .collect();
401
402        *scope_sessions = new_map;
403        Ok(())
404    }
405
406    async fn get_scope_config(&self, scope: &Scope) -> Result<Option<ScopeConfig>, ConsensusError> {
407        let configs = self.scope_configs.read().await;
408        Ok(configs.get(scope).cloned())
409    }
410
411    async fn set_scope_config(
412        &self,
413        scope: &Scope,
414        config: ScopeConfig,
415    ) -> Result<(), ConsensusError> {
416        config.validate()?;
417        let mut configs = self.scope_configs.write().await;
418        configs.insert(scope.clone(), config);
419        Ok(())
420    }
421
422    async fn delete_scope(&self, scope: &Scope) -> Result<(), ConsensusError> {
423        let mut sessions = self.sessions.write().await;
424        sessions.remove(scope);
425        drop(sessions);
426
427        let mut configs = self.scope_configs.write().await;
428        configs.remove(scope);
429        Ok(())
430    }
431
432    async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
433    where
434        F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
435    {
436        let mut configs = self.scope_configs.write().await;
437        let config = configs.entry(scope.clone()).or_default();
438        updater(config)?;
439        config.validate()?;
440        Ok(())
441    }
442}