1use std::{collections::HashMap, marker::PhantomData};
2use tokio::time::Duration;
3
4use crate::{
5 error::ConsensusError,
6 events::{BroadcastEventBus, ConsensusEventBus},
7 protos::consensus::v1::Proposal,
8 scope::{ConsensusScope, ScopeID},
9 scope_config::{NetworkType, ScopeConfig, ScopeConfigBuilder},
10 session::{ConsensusConfig, ConsensusSession, ConsensusState},
11 storage::{ConsensusStorage, InMemoryConsensusStorage},
12 types::{ConsensusEvent, SessionTransition},
13 utils::{calculate_consensus_result, has_sufficient_votes},
14};
15pub struct ConsensusService<Scope, S, E>
20where
21 Scope: ConsensusScope,
22 S: ConsensusStorage<Scope>,
23 E: ConsensusEventBus<Scope>,
24{
25 storage: S,
26 max_sessions_per_scope: usize,
27 event_bus: E,
28 _scope: PhantomData<Scope>,
29}
30
31impl<Scope, S, E> Clone for ConsensusService<Scope, S, E>
32where
33 Scope: ConsensusScope,
34 S: ConsensusStorage<Scope>,
35 E: ConsensusEventBus<Scope>,
36{
37 fn clone(&self) -> Self {
38 Self {
39 storage: self.storage.clone(),
40 max_sessions_per_scope: self.max_sessions_per_scope,
41 event_bus: self.event_bus.clone(),
42 _scope: PhantomData,
43 }
44 }
45}
46
47pub type DefaultConsensusService =
53 ConsensusService<ScopeID, InMemoryConsensusStorage<ScopeID>, BroadcastEventBus<ScopeID>>;
54
55impl DefaultConsensusService {
56 fn new() -> Self {
58 Self::new_with_max_sessions(10)
59 }
60
61 pub fn new_with_max_sessions(max_sessions_per_scope: usize) -> Self {
65 Self::new_with_components(
66 InMemoryConsensusStorage::new(),
67 BroadcastEventBus::default(),
68 max_sessions_per_scope,
69 )
70 }
71}
72
73impl Default for DefaultConsensusService {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl<Scope, S, E> ConsensusService<Scope, S, E>
80where
81 Scope: ConsensusScope,
82 S: ConsensusStorage<Scope>,
83 E: ConsensusEventBus<Scope>,
84{
85 pub fn new_with_components(storage: S, event_bus: E, max_sessions_per_scope: usize) -> Self {
91 Self {
92 storage,
93 max_sessions_per_scope,
94 event_bus,
95 _scope: PhantomData,
96 }
97 }
98
99 pub fn subscribe_to_events(&self) -> E::Receiver {
105 self.event_bus.subscribe()
106 }
107
108 pub async fn get_consensus_result(
113 &self,
114 scope: &Scope,
115 proposal_id: u32,
116 ) -> Result<bool, ConsensusError> {
117 let session = self
118 .storage
119 .get_session(scope, proposal_id)
120 .await?
121 .ok_or(ConsensusError::SessionNotFound)?;
122
123 match session.state {
124 ConsensusState::ConsensusReached(result) => Ok(result),
125 ConsensusState::Failed => Err(ConsensusError::ConsensusFailed),
126 ConsensusState::Active => Err(ConsensusError::ConsensusNotReached),
127 }
128 }
129
130 pub async fn get_active_proposals(
132 &self,
133 scope: &Scope,
134 ) -> Result<Option<Vec<Proposal>>, ConsensusError> {
135 let sessions = self
136 .storage
137 .list_scope_sessions(scope)
138 .await?
139 .ok_or(ConsensusError::ScopeNotFound)?;
140 let result = sessions
141 .into_iter()
142 .filter_map(|session| session.is_active().then_some(session.proposal))
143 .collect::<Vec<Proposal>>();
144 if result.is_empty() {
145 return Ok(None);
146 }
147 Ok(Some(result))
148 }
149
150 pub async fn get_reached_proposals(
156 &self,
157 scope: &Scope,
158 ) -> Result<Option<HashMap<u32, bool>>, ConsensusError> {
159 let sessions = self
160 .storage
161 .list_scope_sessions(scope)
162 .await?
163 .ok_or(ConsensusError::ScopeNotFound)?;
164
165 let result = sessions
166 .into_iter()
167 .filter_map(|session| {
168 session
169 .get_consensus_result()
170 .ok()
171 .map(|result| (session.proposal.proposal_id, result))
172 })
173 .collect::<HashMap<u32, bool>>();
174 if result.is_empty() {
175 return Ok(None);
176 }
177 Ok(Some(result))
178 }
179
180 pub async fn has_sufficient_votes_for_proposal(
182 &self,
183 scope: &Scope,
184 proposal_id: u32,
185 ) -> Result<bool, ConsensusError> {
186 let session = self.get_session(scope, proposal_id).await?;
187 let total_votes = session.votes.len() as u32;
188 let expected_voters = session.proposal.expected_voters_count;
189 Ok(has_sufficient_votes(
190 total_votes,
191 expected_voters,
192 session.config.consensus_threshold(),
193 ))
194 }
195
196 pub async fn scope(
230 &self,
231 scope: &Scope,
232 ) -> Result<ScopeConfigBuilderWrapper<Scope, S, E>, ConsensusError> {
233 let existing_config = self.storage.get_scope_config(scope).await?;
234 let builder = if let Some(config) = existing_config {
235 ScopeConfigBuilder::from_existing(config)
236 } else {
237 ScopeConfigBuilder::new()
238 };
239 Ok(ScopeConfigBuilderWrapper::new(
240 self.clone(),
241 scope.clone(),
242 builder,
243 ))
244 }
245
246 async fn initialize_scope(
247 &self,
248 scope: &Scope,
249 config: ScopeConfig,
250 ) -> Result<(), ConsensusError> {
251 config.validate()?;
252 self.storage.set_scope_config(scope, config).await
253 }
254
255 async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
256 where
257 F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
258 {
259 self.storage.update_scope_config(scope, updater).await
260 }
261
262 pub(crate) async fn resolve_config(
267 &self,
268 scope: &Scope,
269 proposal_override: Option<ConsensusConfig>,
270 proposal: Option<&Proposal>,
271 ) -> Result<ConsensusConfig, ConsensusError> {
272 let base_config = if let Some(override_config) = proposal_override {
274 override_config
275 } else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
276 ConsensusConfig::from(scope_config)
277 } else {
278 ConsensusConfig::gossipsub()
279 };
280
281 if let Some(prop) = proposal {
283 let timeout_seconds = if prop.expiration_timestamp > prop.timestamp {
285 Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
286 } else {
287 base_config.consensus_timeout()
288 };
289
290 Ok(ConsensusConfig::new(
291 base_config.consensus_threshold(),
292 timeout_seconds,
293 base_config.max_rounds(),
294 base_config.use_gossipsub_rounds(),
295 prop.liveness_criteria_yes,
296 ))
297 } else {
298 Ok(base_config)
299 }
300 }
301
302 pub async fn handle_consensus_timeout(
309 &self,
310 scope: &Scope,
311 proposal_id: u32,
312 ) -> Result<bool, ConsensusError> {
313 let timeout_result: Result<Option<bool>, ConsensusError> = self
314 .update_session(scope, proposal_id, |session| {
315 if let ConsensusState::ConsensusReached(result) = session.state {
316 return Ok(Some(result));
317 }
318
319 let result = calculate_consensus_result(
322 &session.votes,
323 session.proposal.expected_voters_count,
324 session.config.consensus_threshold(),
325 session.proposal.liveness_criteria_yes,
326 );
327
328 if let Some(result) = result {
329 session.state = ConsensusState::ConsensusReached(result);
330 Ok(Some(result))
331 } else {
332 session.state = ConsensusState::Failed;
333 Ok(None)
334 }
335 })
336 .await;
337
338 match timeout_result? {
339 Some(consensus_result) => {
340 self.emit_event(
341 scope,
342 ConsensusEvent::ConsensusReached {
343 proposal_id,
344 result: consensus_result,
345 },
346 );
347 Ok(consensus_result)
348 }
349 None => {
350 self.emit_event(scope, ConsensusEvent::ConsensusFailed { proposal_id });
351 Err(ConsensusError::InsufficientVotesAtTimeout)
352 }
353 }
354 }
355
356 pub(crate) async fn get_session(
357 &self,
358 scope: &Scope,
359 proposal_id: u32,
360 ) -> Result<ConsensusSession, ConsensusError> {
361 self.storage
362 .get_session(scope, proposal_id)
363 .await?
364 .ok_or(ConsensusError::SessionNotFound)
365 }
366
367 pub(crate) async fn update_session<R, F>(
368 &self,
369 scope: &Scope,
370 proposal_id: u32,
371 mutator: F,
372 ) -> Result<R, ConsensusError>
373 where
374 R: Send,
375 F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
376 {
377 self.storage
378 .update_session(scope, proposal_id, mutator)
379 .await
380 }
381
382 pub(crate) async fn save_session(
383 &self,
384 scope: &Scope,
385 session: ConsensusSession,
386 ) -> Result<(), ConsensusError> {
387 self.storage.save_session(scope, session).await
388 }
389
390 pub(crate) async fn trim_scope_sessions(&self, scope: &Scope) -> Result<(), ConsensusError> {
391 self.storage
392 .update_scope_sessions(scope, |sessions| {
393 if sessions.len() <= self.max_sessions_per_scope {
394 return Ok(());
395 }
396
397 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
398 sessions.truncate(self.max_sessions_per_scope);
399 Ok(())
400 })
401 .await
402 }
403
404 pub(crate) async fn list_scope_sessions(
405 &self,
406 scope: &Scope,
407 ) -> Result<Vec<ConsensusSession>, ConsensusError> {
408 self.storage
409 .list_scope_sessions(scope)
410 .await?
411 .ok_or(ConsensusError::ScopeNotFound)
412 }
413
414 pub(crate) fn handle_transition(
415 &self,
416 scope: &Scope,
417 proposal_id: u32,
418 transition: SessionTransition,
419 ) {
420 if let SessionTransition::ConsensusReached(result) = transition {
421 self.emit_event(
422 scope,
423 ConsensusEvent::ConsensusReached {
424 proposal_id,
425 result,
426 },
427 );
428 }
429 }
430
431 fn emit_event(&self, scope: &Scope, event: ConsensusEvent) {
432 self.event_bus.publish(scope.clone(), event);
433 }
434}
435
436pub struct ScopeConfigBuilderWrapper<Scope, S, E>
438where
439 Scope: ConsensusScope,
440 S: ConsensusStorage<Scope>,
441 E: ConsensusEventBus<Scope>,
442{
443 service: ConsensusService<Scope, S, E>,
444 scope: Scope,
445 builder: ScopeConfigBuilder,
446}
447
448impl<Scope, S, E> ScopeConfigBuilderWrapper<Scope, S, E>
449where
450 Scope: ConsensusScope,
451 S: ConsensusStorage<Scope>,
452 E: ConsensusEventBus<Scope>,
453{
454 fn new(
455 service: ConsensusService<Scope, S, E>,
456 scope: Scope,
457 builder: ScopeConfigBuilder,
458 ) -> Self {
459 Self {
460 service,
461 scope,
462 builder,
463 }
464 }
465
466 pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
468 self.builder = self.builder.with_network_type(network_type);
469 self
470 }
471
472 pub fn with_threshold(mut self, threshold: f64) -> Self {
474 self.builder = self.builder.with_threshold(threshold);
475 self
476 }
477
478 pub fn with_timeout(mut self, timeout: Duration) -> Self {
480 self.builder = self.builder.with_timeout(timeout);
481 self
482 }
483
484 pub fn with_liveness_criteria(mut self, liveness_criteria_yes: bool) -> Self {
486 self.builder = self.builder.with_liveness_criteria(liveness_criteria_yes);
487 self
488 }
489
490 pub fn with_max_rounds(mut self, max_rounds: Option<u32>) -> Self {
492 self.builder = self.builder.with_max_rounds(max_rounds);
493 self
494 }
495
496 pub fn p2p_preset(mut self) -> Self {
498 self.builder = self.builder.p2p_preset();
499 self
500 }
501
502 pub fn gossipsub_preset(mut self) -> Self {
504 self.builder = self.builder.gossipsub_preset();
505 self
506 }
507
508 pub fn strict_consensus(mut self) -> Self {
510 self.builder = self.builder.strict_consensus();
511 self
512 }
513
514 pub fn fast_consensus(mut self) -> Self {
516 self.builder = self.builder.fast_consensus();
517 self
518 }
519
520 pub fn with_network_defaults(mut self, network_type: NetworkType) -> Self {
522 self.builder = self.builder.with_network_defaults(network_type);
523 self
524 }
525
526 pub async fn initialize(self) -> Result<(), ConsensusError> {
528 let config = self.builder.build()?;
529 self.service.initialize_scope(&self.scope, config).await
530 }
531
532 pub async fn update(self) -> Result<(), ConsensusError> {
534 let config = self.builder.build()?;
535 self.service
536 .update_scope_config(&self.scope, |existing| {
537 *existing = config;
538 Ok(())
539 })
540 .await
541 }
542
543 pub fn get_config(&self) -> ScopeConfig {
545 self.builder.get_config()
546 }
547}