1use std::{collections::HashMap, marker::PhantomData};
2use tokio::time::Duration;
3
4use crate::{
5 error::ConsensusError,
6 events::{BroadcastEventBus, ConsensusEventBus},
7 protos::consensus::v1::Proposal,
8 scope::{ConsensusScope, ScopeID},
9 scope_config::{NetworkType, ScopeConfig, ScopeConfigBuilder},
10 session::{ConsensusConfig, ConsensusSession, ConsensusState},
11 storage::{ConsensusStorage, InMemoryConsensusStorage},
12 types::{ConsensusEvent, SessionTransition},
13 utils::{calculate_consensus_result, current_timestamp, has_sufficient_votes},
14};
15pub struct ConsensusService<Scope, S, E>
20where
21 Scope: ConsensusScope,
22 S: ConsensusStorage<Scope>,
23 E: ConsensusEventBus<Scope>,
24{
25 storage: S,
26 max_sessions_per_scope: usize,
27 event_bus: E,
28 _scope: PhantomData<Scope>,
29}
30
31impl<Scope, S, E> Clone for ConsensusService<Scope, S, E>
32where
33 Scope: ConsensusScope,
34 S: ConsensusStorage<Scope>,
35 E: ConsensusEventBus<Scope>,
36{
37 fn clone(&self) -> Self {
38 Self {
39 storage: self.storage.clone(),
40 max_sessions_per_scope: self.max_sessions_per_scope,
41 event_bus: self.event_bus.clone(),
42 _scope: PhantomData,
43 }
44 }
45}
46
47pub type DefaultConsensusService =
53 ConsensusService<ScopeID, InMemoryConsensusStorage<ScopeID>, BroadcastEventBus<ScopeID>>;
54
55impl DefaultConsensusService {
56 fn new() -> Self {
58 Self::new_with_max_sessions(10)
59 }
60
61 pub fn new_with_max_sessions(max_sessions_per_scope: usize) -> Self {
65 Self::new_with_components(
66 InMemoryConsensusStorage::new(),
67 BroadcastEventBus::default(),
68 max_sessions_per_scope,
69 )
70 }
71}
72
73impl Default for DefaultConsensusService {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl<Scope, S, E> ConsensusService<Scope, S, E>
80where
81 Scope: ConsensusScope,
82 S: ConsensusStorage<Scope>,
83 E: ConsensusEventBus<Scope>,
84{
85 pub fn new_with_components(storage: S, event_bus: E, max_sessions_per_scope: usize) -> Self {
91 Self {
92 storage,
93 max_sessions_per_scope,
94 event_bus,
95 _scope: PhantomData,
96 }
97 }
98
99 pub fn subscribe_to_events(&self) -> E::Receiver {
105 self.event_bus.subscribe()
106 }
107
108 pub async fn get_consensus_result(
113 &self,
114 scope: &Scope,
115 proposal_id: u32,
116 ) -> Result<bool, ConsensusError> {
117 let session = self
118 .storage
119 .get_session(scope, proposal_id)
120 .await?
121 .ok_or(ConsensusError::SessionNotFound)?;
122
123 match session.state {
124 ConsensusState::ConsensusReached(result) => Ok(result),
125 ConsensusState::Failed => Err(ConsensusError::ConsensusFailed),
126 ConsensusState::Active => Err(ConsensusError::ConsensusNotReached),
127 }
128 }
129
130 pub async fn get_active_proposals(
132 &self,
133 scope: &Scope,
134 ) -> Result<Option<Vec<Proposal>>, ConsensusError> {
135 let sessions = self
136 .storage
137 .list_scope_sessions(scope)
138 .await?
139 .ok_or(ConsensusError::ScopeNotFound)?;
140 let result = sessions
141 .into_iter()
142 .filter_map(|session| session.is_active().then_some(session.proposal))
143 .collect::<Vec<Proposal>>();
144 if result.is_empty() {
145 return Ok(None);
146 }
147 Ok(Some(result))
148 }
149
150 pub async fn get_proposal_config(
152 &self,
153 scope: &Scope,
154 proposal_id: u32,
155 ) -> Result<ConsensusConfig, ConsensusError> {
156 let session = self.get_session(scope, proposal_id).await?;
157 Ok(session.config)
158 }
159
160 pub async fn get_reached_proposals(
166 &self,
167 scope: &Scope,
168 ) -> Result<Option<HashMap<u32, bool>>, ConsensusError> {
169 let sessions = self
170 .storage
171 .list_scope_sessions(scope)
172 .await?
173 .ok_or(ConsensusError::ScopeNotFound)?;
174
175 let result = sessions
176 .into_iter()
177 .filter_map(|session| {
178 session
179 .get_consensus_result()
180 .ok()
181 .map(|result| (session.proposal.proposal_id, result))
182 })
183 .collect::<HashMap<u32, bool>>();
184 if result.is_empty() {
185 return Ok(None);
186 }
187 Ok(Some(result))
188 }
189
190 pub async fn has_sufficient_votes_for_proposal(
192 &self,
193 scope: &Scope,
194 proposal_id: u32,
195 ) -> Result<bool, ConsensusError> {
196 let session = self.get_session(scope, proposal_id).await?;
197 let total_votes = session.votes.len() as u32;
198 let expected_voters = session.proposal.expected_voters_count;
199 Ok(has_sufficient_votes(
200 total_votes,
201 expected_voters,
202 session.config.consensus_threshold(),
203 ))
204 }
205
206 pub async fn scope(
240 &self,
241 scope: &Scope,
242 ) -> Result<ScopeConfigBuilderWrapper<Scope, S, E>, ConsensusError> {
243 let existing_config = self.storage.get_scope_config(scope).await?;
244 let builder = if let Some(config) = existing_config {
245 ScopeConfigBuilder::from_existing(config)
246 } else {
247 ScopeConfigBuilder::new()
248 };
249 Ok(ScopeConfigBuilderWrapper::new(
250 self.clone(),
251 scope.clone(),
252 builder,
253 ))
254 }
255
256 async fn initialize_scope(
257 &self,
258 scope: &Scope,
259 config: ScopeConfig,
260 ) -> Result<(), ConsensusError> {
261 config.validate()?;
262 self.storage.set_scope_config(scope, config).await
263 }
264
265 async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
266 where
267 F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
268 {
269 self.storage.update_scope_config(scope, updater).await
270 }
271
272 pub(crate) async fn resolve_config(
277 &self,
278 scope: &Scope,
279 proposal_override: Option<ConsensusConfig>,
280 proposal: Option<&Proposal>,
281 ) -> Result<ConsensusConfig, ConsensusError> {
282 let has_explicit_override = proposal_override.is_some();
286 let base_config = if let Some(override_config) = proposal_override {
287 override_config
288 } else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
289 ConsensusConfig::from(scope_config)
290 } else {
291 ConsensusConfig::gossipsub()
292 };
293
294 if let Some(prop) = proposal {
296 let timeout_seconds = if has_explicit_override {
299 base_config.consensus_timeout()
300 } else if prop.expiration_timestamp > prop.timestamp {
301 Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
302 } else {
303 base_config.consensus_timeout()
304 };
305
306 Ok(ConsensusConfig::new(
307 base_config.consensus_threshold(),
308 timeout_seconds,
309 base_config.max_rounds(),
310 base_config.use_gossipsub_rounds(),
311 prop.liveness_criteria_yes,
312 ))
313 } else {
314 Ok(base_config)
315 }
316 }
317
318 pub async fn handle_consensus_timeout(
325 &self,
326 scope: &Scope,
327 proposal_id: u32,
328 ) -> Result<bool, ConsensusError> {
329 let timeout_result: Result<Option<bool>, ConsensusError> = self
330 .update_session(scope, proposal_id, |session| {
331 if let ConsensusState::ConsensusReached(result) = session.state {
332 return Ok(Some(result));
333 }
334
335 let result = calculate_consensus_result(
338 &session.votes,
339 session.proposal.expected_voters_count,
340 session.config.consensus_threshold(),
341 session.proposal.liveness_criteria_yes,
342 true,
343 );
344
345 if let Some(result) = result {
346 session.state = ConsensusState::ConsensusReached(result);
347 Ok(Some(result))
348 } else {
349 session.state = ConsensusState::Failed;
350 Ok(None)
351 }
352 })
353 .await;
354
355 match timeout_result? {
356 Some(consensus_result) => {
357 self.emit_event(
358 scope,
359 ConsensusEvent::ConsensusReached {
360 proposal_id,
361 result: consensus_result,
362 timestamp: current_timestamp()?,
363 },
364 );
365 Ok(consensus_result)
366 }
367 None => {
368 self.emit_event(
369 scope,
370 ConsensusEvent::ConsensusFailed {
371 proposal_id,
372 timestamp: current_timestamp()?,
373 },
374 );
375 Err(ConsensusError::InsufficientVotesAtTimeout)
376 }
377 }
378 }
379
380 pub(crate) async fn get_session(
381 &self,
382 scope: &Scope,
383 proposal_id: u32,
384 ) -> Result<ConsensusSession, ConsensusError> {
385 self.storage
386 .get_session(scope, proposal_id)
387 .await?
388 .ok_or(ConsensusError::SessionNotFound)
389 }
390
391 pub(crate) async fn update_session<R, F>(
392 &self,
393 scope: &Scope,
394 proposal_id: u32,
395 mutator: F,
396 ) -> Result<R, ConsensusError>
397 where
398 R: Send,
399 F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
400 {
401 self.storage
402 .update_session(scope, proposal_id, mutator)
403 .await
404 }
405
406 pub(crate) async fn save_session(
407 &self,
408 scope: &Scope,
409 session: ConsensusSession,
410 ) -> Result<(), ConsensusError> {
411 self.storage.save_session(scope, session).await
412 }
413
414 pub(crate) async fn trim_scope_sessions(&self, scope: &Scope) -> Result<(), ConsensusError> {
415 self.storage
416 .update_scope_sessions(scope, |sessions| {
417 if sessions.len() <= self.max_sessions_per_scope {
418 return Ok(());
419 }
420
421 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
422 sessions.truncate(self.max_sessions_per_scope);
423 Ok(())
424 })
425 .await
426 }
427
428 pub(crate) async fn list_scope_sessions(
429 &self,
430 scope: &Scope,
431 ) -> Result<Vec<ConsensusSession>, ConsensusError> {
432 self.storage
433 .list_scope_sessions(scope)
434 .await?
435 .ok_or(ConsensusError::ScopeNotFound)
436 }
437
438 pub(crate) fn handle_transition(
439 &self,
440 scope: &Scope,
441 proposal_id: u32,
442 transition: SessionTransition,
443 ) {
444 if let SessionTransition::ConsensusReached(result) = transition {
445 self.emit_event(
446 scope,
447 ConsensusEvent::ConsensusReached {
448 proposal_id,
449 result,
450 timestamp: current_timestamp().unwrap_or(0),
451 },
452 );
453 }
454 }
455
456 fn emit_event(&self, scope: &Scope, event: ConsensusEvent) {
457 self.event_bus.publish(scope.clone(), event);
458 }
459}
460
461pub struct ScopeConfigBuilderWrapper<Scope, S, E>
463where
464 Scope: ConsensusScope,
465 S: ConsensusStorage<Scope>,
466 E: ConsensusEventBus<Scope>,
467{
468 service: ConsensusService<Scope, S, E>,
469 scope: Scope,
470 builder: ScopeConfigBuilder,
471}
472
473impl<Scope, S, E> ScopeConfigBuilderWrapper<Scope, S, E>
474where
475 Scope: ConsensusScope,
476 S: ConsensusStorage<Scope>,
477 E: ConsensusEventBus<Scope>,
478{
479 fn new(
480 service: ConsensusService<Scope, S, E>,
481 scope: Scope,
482 builder: ScopeConfigBuilder,
483 ) -> Self {
484 Self {
485 service,
486 scope,
487 builder,
488 }
489 }
490
491 pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
493 self.builder = self.builder.with_network_type(network_type);
494 self
495 }
496
497 pub fn with_threshold(mut self, threshold: f64) -> Self {
499 self.builder = self.builder.with_threshold(threshold);
500 self
501 }
502
503 pub fn with_timeout(mut self, timeout: Duration) -> Self {
505 self.builder = self.builder.with_timeout(timeout);
506 self
507 }
508
509 pub fn with_liveness_criteria(mut self, liveness_criteria_yes: bool) -> Self {
511 self.builder = self.builder.with_liveness_criteria(liveness_criteria_yes);
512 self
513 }
514
515 pub fn with_max_rounds(mut self, max_rounds: Option<u32>) -> Self {
517 self.builder = self.builder.with_max_rounds(max_rounds);
518 self
519 }
520
521 pub fn p2p_preset(mut self) -> Self {
523 self.builder = self.builder.p2p_preset();
524 self
525 }
526
527 pub fn gossipsub_preset(mut self) -> Self {
529 self.builder = self.builder.gossipsub_preset();
530 self
531 }
532
533 pub fn strict_consensus(mut self) -> Self {
535 self.builder = self.builder.strict_consensus();
536 self
537 }
538
539 pub fn fast_consensus(mut self) -> Self {
541 self.builder = self.builder.fast_consensus();
542 self
543 }
544
545 pub fn with_network_defaults(mut self, network_type: NetworkType) -> Self {
547 self.builder = self.builder.with_network_defaults(network_type);
548 self
549 }
550
551 pub async fn initialize(self) -> Result<(), ConsensusError> {
553 let config = self.builder.build()?;
554 self.service.initialize_scope(&self.scope, config).await
555 }
556
557 pub async fn update(self) -> Result<(), ConsensusError> {
559 let config = self.builder.build()?;
560 self.service
561 .update_scope_config(&self.scope, |existing| {
562 *existing = config;
563 Ok(())
564 })
565 .await
566 }
567
568 pub fn get_config(&self) -> ScopeConfig {
570 self.builder.get_config()
571 }
572}