hashgraph_like_consensus/
storage.rs1use async_stream::try_stream;
8use futures::Stream;
9use std::{collections::HashMap, sync::Arc};
10use tokio::sync::RwLock;
11
12use crate::{
13 error::ConsensusError, scope::ConsensusScope, scope_config::ScopeConfig,
14 session::ConsensusSession,
15};
16
17pub trait ConsensusStorage<Scope>: Clone + Send + Sync + 'static
23where
24 Scope: ConsensusScope,
25{
26 fn save_session(
28 &self,
29 scope: &Scope,
30 session: ConsensusSession,
31 ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
32
33 fn get_session(
35 &self,
36 scope: &Scope,
37 proposal_id: u32,
38 ) -> impl Future<Output = Result<Option<ConsensusSession>, ConsensusError>> + Send;
39
40 fn remove_session(
42 &self,
43 scope: &Scope,
44 proposal_id: u32,
45 ) -> impl Future<Output = Result<Option<ConsensusSession>, ConsensusError>> + Send;
46
47 fn list_scope_sessions(
49 &self,
50 scope: &Scope,
51 ) -> impl Future<Output = Result<Option<Vec<ConsensusSession>>, ConsensusError>> + Send;
52
53 fn stream_scope_sessions<'a>(
55 &'a self,
56 scope: &'a Scope,
57 ) -> impl Stream<Item = Result<ConsensusSession, ConsensusError>> + Send + 'a;
58
59 fn replace_scope_sessions(
61 &self,
62 scope: &Scope,
63 sessions: Vec<ConsensusSession>,
64 ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
65
66 fn list_scopes(
68 &self,
69 ) -> impl Future<Output = Result<Option<Vec<Scope>>, ConsensusError>> + Send;
70
71 fn update_session<R, F>(
73 &self,
74 scope: &Scope,
75 proposal_id: u32,
76 mutator: F,
77 ) -> impl Future<Output = Result<R, ConsensusError>> + Send
78 where
79 R: Send,
80 F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send;
81
82 fn update_scope_sessions<F>(
84 &self,
85 scope: &Scope,
86 mutator: F,
87 ) -> impl Future<Output = Result<(), ConsensusError>> + Send
88 where
89 F: FnOnce(&mut Vec<ConsensusSession>) -> Result<(), ConsensusError> + Send;
90
91 fn get_scope_config(
93 &self,
94 scope: &Scope,
95 ) -> impl Future<Output = Result<Option<ScopeConfig>, ConsensusError>> + Send;
96
97 fn set_scope_config(
99 &self,
100 scope: &Scope,
101 config: ScopeConfig,
102 ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
103
104 fn update_scope_config<F>(
106 &self,
107 scope: &Scope,
108 updater: F,
109 ) -> impl Future<Output = Result<(), ConsensusError>> + Send
110 where
111 F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send;
112}
113
114#[derive(Clone)]
119pub struct InMemoryConsensusStorage<Scope>
120where
121 Scope: ConsensusScope,
122{
123 sessions: Arc<RwLock<HashMap<Scope, HashMap<u32, ConsensusSession>>>>,
124 scope_configs: Arc<RwLock<HashMap<Scope, ScopeConfig>>>,
125}
126
127impl<Scope> Default for InMemoryConsensusStorage<Scope>
128where
129 Scope: ConsensusScope,
130{
131 fn default() -> Self {
132 Self {
133 sessions: Arc::new(RwLock::new(HashMap::new())),
134 scope_configs: Arc::new(RwLock::new(HashMap::new())),
135 }
136 }
137}
138
139impl<Scope> InMemoryConsensusStorage<Scope>
140where
141 Scope: ConsensusScope,
142{
143 pub fn new() -> Self {
148 Self::default()
149 }
150}
151
152impl<Scope> ConsensusStorage<Scope> for InMemoryConsensusStorage<Scope>
153where
154 Scope: ConsensusScope + Clone,
155{
156 async fn save_session(
157 &self,
158 scope: &Scope,
159 session: ConsensusSession,
160 ) -> Result<(), ConsensusError> {
161 let mut sessions = self.sessions.write().await;
162 let entry = sessions.entry(scope.clone()).or_default();
163 entry.insert(session.proposal.proposal_id, session);
164 Ok(())
165 }
166
167 async fn get_session(
168 &self,
169 scope: &Scope,
170 proposal_id: u32,
171 ) -> Result<Option<ConsensusSession>, ConsensusError> {
172 let sessions = self.sessions.read().await;
173 Ok(sessions
174 .get(scope)
175 .and_then(|scope| scope.get(&proposal_id))
176 .cloned())
177 }
178
179 async fn remove_session(
180 &self,
181 scope: &Scope,
182 proposal_id: u32,
183 ) -> Result<Option<ConsensusSession>, ConsensusError> {
184 let mut sessions = self.sessions.write().await;
185 Ok(sessions
186 .get_mut(scope)
187 .and_then(|scope| scope.remove(&proposal_id)))
188 }
189
190 async fn list_scope_sessions(
191 &self,
192 scope: &Scope,
193 ) -> Result<Option<Vec<ConsensusSession>>, ConsensusError> {
194 let sessions = self.sessions.read().await;
195 let result = sessions
196 .get(scope)
197 .map(|scope| scope.values().cloned().collect::<Vec<ConsensusSession>>());
198 Ok(result)
199 }
200
201 fn stream_scope_sessions<'a>(
202 &'a self,
203 scope: &'a Scope,
204 ) -> impl Stream<Item = Result<ConsensusSession, ConsensusError>> + Send + 'a {
205 try_stream! {
206 let guard = self.sessions.read().await;
207
208 if let Some(inner_map) = guard.get(scope) {
209 for session in inner_map.values() {
210 yield session.clone();
211 }
212 }
213 }
214 }
215
216 async fn replace_scope_sessions(
217 &self,
218 scope: &Scope,
219 sessions_list: Vec<ConsensusSession>,
220 ) -> Result<(), ConsensusError> {
221 let mut sessions = self.sessions.write().await;
222 let new_map = sessions_list
223 .into_iter()
224 .map(|session| (session.proposal.proposal_id, session))
225 .collect();
226 sessions.insert(scope.clone(), new_map);
227 Ok(())
228 }
229
230 async fn list_scopes(&self) -> Result<Option<Vec<Scope>>, ConsensusError> {
231 let sessions = self.sessions.read().await;
232 let result = sessions.keys().cloned().collect::<Vec<Scope>>();
233 if result.is_empty() {
234 return Ok(None);
235 }
236 Ok(Some(result))
237 }
238
239 async fn update_session<R, F>(
240 &self,
241 scope: &Scope,
242 proposal_id: u32,
243 mutator: F,
244 ) -> Result<R, ConsensusError>
245 where
246 R: Send,
247 F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
248 {
249 let mut sessions = self.sessions.write().await;
250 let session = sessions
251 .get_mut(scope)
252 .and_then(|scope_sessions| scope_sessions.get_mut(&proposal_id))
253 .ok_or(ConsensusError::SessionNotFound)?;
254
255 let result = mutator(session)?;
256 Ok(result)
257 }
258
259 async fn update_scope_sessions<F>(
260 &self,
261 scope: &Scope,
262 mutator: F,
263 ) -> Result<(), ConsensusError>
264 where
265 F: FnOnce(&mut Vec<ConsensusSession>) -> Result<(), ConsensusError> + Send,
266 {
267 let mut sessions = self.sessions.write().await;
268 let scope_sessions = sessions.entry(scope.clone()).or_default();
269
270 let mut sessions_vec: Vec<ConsensusSession> = scope_sessions.values().cloned().collect();
271 mutator(&mut sessions_vec)?;
272
273 if sessions_vec.is_empty() {
274 sessions.remove(scope);
275 return Ok(());
276 }
277
278 let new_map: HashMap<u32, ConsensusSession> = sessions_vec
279 .into_iter()
280 .map(|session| (session.proposal.proposal_id, session))
281 .collect();
282
283 *scope_sessions = new_map;
284 Ok(())
285 }
286
287 async fn get_scope_config(&self, scope: &Scope) -> Result<Option<ScopeConfig>, ConsensusError> {
288 let configs = self.scope_configs.read().await;
289 Ok(configs.get(scope).cloned())
290 }
291
292 async fn set_scope_config(
293 &self,
294 scope: &Scope,
295 config: ScopeConfig,
296 ) -> Result<(), ConsensusError> {
297 config.validate()?;
298 let mut configs = self.scope_configs.write().await;
299 configs.insert(scope.clone(), config);
300 Ok(())
301 }
302
303 async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
304 where
305 F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
306 {
307 let mut configs = self.scope_configs.write().await;
308 let config = configs.entry(scope.clone()).or_default();
309 updater(config)?;
310 config.validate()?;
311 Ok(())
312 }
313}