hashgraph_like_consensus/
storage.rs1use async_stream::try_stream;
8use futures::Stream;
9use std::{collections::HashMap, sync::Arc};
10use tokio::sync::RwLock;
11
12use crate::{
13 error::ConsensusError,
14 protos::consensus::v1::Proposal,
15 scope::ConsensusScope,
16 scope_config::ScopeConfig,
17 session::{ConsensusConfig, ConsensusSession},
18};
19
20pub trait ConsensusStorage<Scope>: Clone + Send + Sync + 'static
26where
27 Scope: ConsensusScope,
28{
29 fn save_session(
31 &self,
32 scope: &Scope,
33 session: ConsensusSession,
34 ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
35
36 fn get_session(
38 &self,
39 scope: &Scope,
40 proposal_id: u32,
41 ) -> impl Future<Output = Result<Option<ConsensusSession>, ConsensusError>> + Send;
42
43 fn remove_session(
45 &self,
46 scope: &Scope,
47 proposal_id: u32,
48 ) -> impl Future<Output = Result<Option<ConsensusSession>, ConsensusError>> + Send;
49
50 fn list_scope_sessions(
52 &self,
53 scope: &Scope,
54 ) -> impl Future<Output = Result<Option<Vec<ConsensusSession>>, ConsensusError>> + Send;
55
56 fn stream_scope_sessions<'a>(
58 &'a self,
59 scope: &'a Scope,
60 ) -> impl Stream<Item = Result<ConsensusSession, ConsensusError>> + Send + 'a;
61
62 fn replace_scope_sessions(
64 &self,
65 scope: &Scope,
66 sessions: Vec<ConsensusSession>,
67 ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
68
69 fn list_scopes(
71 &self,
72 ) -> impl Future<Output = Result<Option<Vec<Scope>>, ConsensusError>> + Send;
73
74 fn update_session<R, F>(
76 &self,
77 scope: &Scope,
78 proposal_id: u32,
79 mutator: F,
80 ) -> impl Future<Output = Result<R, ConsensusError>> + Send
81 where
82 R: Send,
83 F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send;
84
85 fn update_scope_sessions<F>(
87 &self,
88 scope: &Scope,
89 mutator: F,
90 ) -> impl Future<Output = Result<(), ConsensusError>> + Send
91 where
92 F: FnOnce(&mut Vec<ConsensusSession>) -> Result<(), ConsensusError> + Send;
93
94 fn get_scope_config(
96 &self,
97 scope: &Scope,
98 ) -> impl Future<Output = Result<Option<ScopeConfig>, ConsensusError>> + Send;
99
100 fn set_scope_config(
102 &self,
103 scope: &Scope,
104 config: ScopeConfig,
105 ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
106
107 fn delete_scope(
113 &self,
114 scope: &Scope,
115 ) -> impl Future<Output = Result<(), ConsensusError>> + Send;
116
117 fn update_scope_config<F>(
119 &self,
120 scope: &Scope,
121 updater: F,
122 ) -> impl Future<Output = Result<(), ConsensusError>> + Send
123 where
124 F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send;
125
126 fn get_consensus_result(
140 &self,
141 scope: &Scope,
142 proposal_id: u32,
143 ) -> impl Future<Output = Result<bool, ConsensusError>> + Send {
144 async move {
145 use crate::session::ConsensusState;
146 let session = self
147 .get_session(scope, proposal_id)
148 .await?
149 .ok_or(ConsensusError::SessionNotFound)?;
150 match session.state {
151 ConsensusState::ConsensusReached(result) => Ok(result),
152 ConsensusState::Failed => Err(ConsensusError::ConsensusFailed),
153 ConsensusState::Active => Err(ConsensusError::ConsensusNotReached),
154 }
155 }
156 }
157
158 fn get_proposal(
163 &self,
164 scope: &Scope,
165 proposal_id: u32,
166 ) -> impl Future<Output = Result<Proposal, ConsensusError>> + Send {
167 async move {
168 let session = self
169 .get_session(scope, proposal_id)
170 .await?
171 .ok_or(ConsensusError::SessionNotFound)?;
172 Ok(session.proposal)
173 }
174 }
175
176 fn get_proposal_config(
181 &self,
182 scope: &Scope,
183 proposal_id: u32,
184 ) -> impl Future<Output = Result<ConsensusConfig, ConsensusError>> + Send {
185 async move {
186 let session = self
187 .get_session(scope, proposal_id)
188 .await?
189 .ok_or(ConsensusError::SessionNotFound)?;
190 Ok(session.config)
191 }
192 }
193
194 fn get_active_proposals(
198 &self,
199 scope: &Scope,
200 ) -> impl Future<Output = Result<Vec<Proposal>, ConsensusError>> + Send {
201 async move {
202 let sessions = self.list_scope_sessions(scope).await?.unwrap_or_default();
203 Ok(sessions
204 .into_iter()
205 .filter(|s| s.is_active())
206 .map(|s| s.proposal)
207 .collect())
208 }
209 }
210
211 fn get_reached_proposals(
216 &self,
217 scope: &Scope,
218 ) -> impl Future<Output = Result<HashMap<u32, bool>, ConsensusError>> + Send {
219 async move {
220 let sessions = self.list_scope_sessions(scope).await?.unwrap_or_default();
221 Ok(sessions
222 .into_iter()
223 .filter_map(|s| {
224 s.get_consensus_result()
225 .ok()
226 .map(|result| (s.proposal.proposal_id, result))
227 })
228 .collect())
229 }
230 }
231}
232
233#[derive(Clone)]
238pub struct InMemoryConsensusStorage<Scope>
239where
240 Scope: ConsensusScope,
241{
242 sessions: Arc<RwLock<HashMap<Scope, HashMap<u32, ConsensusSession>>>>,
243 scope_configs: Arc<RwLock<HashMap<Scope, ScopeConfig>>>,
244}
245
246impl<Scope> Default for InMemoryConsensusStorage<Scope>
247where
248 Scope: ConsensusScope,
249{
250 fn default() -> Self {
251 Self {
252 sessions: Arc::new(RwLock::new(HashMap::new())),
253 scope_configs: Arc::new(RwLock::new(HashMap::new())),
254 }
255 }
256}
257
258impl<Scope> InMemoryConsensusStorage<Scope>
259where
260 Scope: ConsensusScope,
261{
262 pub fn new() -> Self {
267 Self::default()
268 }
269}
270
271impl<Scope> ConsensusStorage<Scope> for InMemoryConsensusStorage<Scope>
272where
273 Scope: ConsensusScope + Clone,
274{
275 async fn save_session(
276 &self,
277 scope: &Scope,
278 session: ConsensusSession,
279 ) -> Result<(), ConsensusError> {
280 let mut sessions = self.sessions.write().await;
281 let entry = sessions.entry(scope.clone()).or_default();
282 entry.insert(session.proposal.proposal_id, session);
283 Ok(())
284 }
285
286 async fn get_session(
287 &self,
288 scope: &Scope,
289 proposal_id: u32,
290 ) -> Result<Option<ConsensusSession>, ConsensusError> {
291 let sessions = self.sessions.read().await;
292 Ok(sessions
293 .get(scope)
294 .and_then(|scope| scope.get(&proposal_id))
295 .cloned())
296 }
297
298 async fn remove_session(
299 &self,
300 scope: &Scope,
301 proposal_id: u32,
302 ) -> Result<Option<ConsensusSession>, ConsensusError> {
303 let mut sessions = self.sessions.write().await;
304 Ok(sessions
305 .get_mut(scope)
306 .and_then(|scope| scope.remove(&proposal_id)))
307 }
308
309 async fn list_scope_sessions(
310 &self,
311 scope: &Scope,
312 ) -> Result<Option<Vec<ConsensusSession>>, ConsensusError> {
313 let sessions = self.sessions.read().await;
314 let result = sessions
315 .get(scope)
316 .map(|scope| scope.values().cloned().collect::<Vec<ConsensusSession>>());
317 Ok(result)
318 }
319
320 fn stream_scope_sessions<'a>(
321 &'a self,
322 scope: &'a Scope,
323 ) -> impl Stream<Item = Result<ConsensusSession, ConsensusError>> + Send + 'a {
324 try_stream! {
325 let guard = self.sessions.read().await;
326
327 if let Some(inner_map) = guard.get(scope) {
328 for session in inner_map.values() {
329 yield session.clone();
330 }
331 }
332 }
333 }
334
335 async fn replace_scope_sessions(
336 &self,
337 scope: &Scope,
338 sessions_list: Vec<ConsensusSession>,
339 ) -> Result<(), ConsensusError> {
340 let mut sessions = self.sessions.write().await;
341 let new_map = sessions_list
342 .into_iter()
343 .map(|session| (session.proposal.proposal_id, session))
344 .collect();
345 sessions.insert(scope.clone(), new_map);
346 Ok(())
347 }
348
349 async fn list_scopes(&self) -> Result<Option<Vec<Scope>>, ConsensusError> {
350 let sessions = self.sessions.read().await;
351 let result = sessions.keys().cloned().collect::<Vec<Scope>>();
352 if result.is_empty() {
353 return Ok(None);
354 }
355 Ok(Some(result))
356 }
357
358 async fn update_session<R, F>(
359 &self,
360 scope: &Scope,
361 proposal_id: u32,
362 mutator: F,
363 ) -> Result<R, ConsensusError>
364 where
365 R: Send,
366 F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
367 {
368 let mut sessions = self.sessions.write().await;
369 let session = sessions
370 .get_mut(scope)
371 .and_then(|scope_sessions| scope_sessions.get_mut(&proposal_id))
372 .ok_or(ConsensusError::SessionNotFound)?;
373
374 let result = mutator(session)?;
375 Ok(result)
376 }
377
378 async fn update_scope_sessions<F>(
379 &self,
380 scope: &Scope,
381 mutator: F,
382 ) -> Result<(), ConsensusError>
383 where
384 F: FnOnce(&mut Vec<ConsensusSession>) -> Result<(), ConsensusError> + Send,
385 {
386 let mut sessions = self.sessions.write().await;
387 let scope_sessions = sessions.entry(scope.clone()).or_default();
388
389 let mut sessions_vec: Vec<ConsensusSession> = scope_sessions.values().cloned().collect();
390 mutator(&mut sessions_vec)?;
391
392 if sessions_vec.is_empty() {
393 sessions.remove(scope);
394 return Ok(());
395 }
396
397 let new_map: HashMap<u32, ConsensusSession> = sessions_vec
398 .into_iter()
399 .map(|session| (session.proposal.proposal_id, session))
400 .collect();
401
402 *scope_sessions = new_map;
403 Ok(())
404 }
405
406 async fn get_scope_config(&self, scope: &Scope) -> Result<Option<ScopeConfig>, ConsensusError> {
407 let configs = self.scope_configs.read().await;
408 Ok(configs.get(scope).cloned())
409 }
410
411 async fn set_scope_config(
412 &self,
413 scope: &Scope,
414 config: ScopeConfig,
415 ) -> Result<(), ConsensusError> {
416 config.validate()?;
417 let mut configs = self.scope_configs.write().await;
418 configs.insert(scope.clone(), config);
419 Ok(())
420 }
421
422 async fn delete_scope(&self, scope: &Scope) -> Result<(), ConsensusError> {
423 let mut sessions = self.sessions.write().await;
424 sessions.remove(scope);
425 drop(sessions);
426
427 let mut configs = self.scope_configs.write().await;
428 configs.remove(scope);
429 Ok(())
430 }
431
432 async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
433 where
434 F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
435 {
436 let mut configs = self.scope_configs.write().await;
437 let config = configs.entry(scope.clone()).or_default();
438 updater(config)?;
439 config.validate()?;
440 Ok(())
441 }
442}