1use std::{collections::HashMap, marker::PhantomData};
2use tokio::time::Duration;
3
4use crate::{
5 error::ConsensusError,
6 events::{BroadcastEventBus, ConsensusEventBus},
7 protos::consensus::v1::Proposal,
8 scope::{ConsensusScope, ScopeID},
9 scope_config::{NetworkType, ScopeConfig, ScopeConfigBuilder},
10 session::{ConsensusConfig, ConsensusSession, ConsensusState},
11 storage::{ConsensusStorage, InMemoryConsensusStorage},
12 types::{ConsensusEvent, SessionTransition},
13 utils::{calculate_consensus_result, current_timestamp, has_sufficient_votes},
14};
15pub struct ConsensusService<Scope, S, E>
20where
21 Scope: ConsensusScope,
22 S: ConsensusStorage<Scope>,
23 E: ConsensusEventBus<Scope>,
24{
25 storage: S,
26 max_sessions_per_scope: usize,
27 event_bus: E,
28 _scope: PhantomData<Scope>,
29}
30
31impl<Scope, S, E> Clone for ConsensusService<Scope, S, E>
32where
33 Scope: ConsensusScope,
34 S: ConsensusStorage<Scope>,
35 E: ConsensusEventBus<Scope>,
36{
37 fn clone(&self) -> Self {
38 Self {
39 storage: self.storage.clone(),
40 max_sessions_per_scope: self.max_sessions_per_scope,
41 event_bus: self.event_bus.clone(),
42 _scope: PhantomData,
43 }
44 }
45}
46
47pub type DefaultConsensusService =
53 ConsensusService<ScopeID, InMemoryConsensusStorage<ScopeID>, BroadcastEventBus<ScopeID>>;
54
55impl DefaultConsensusService {
56 fn new() -> Self {
58 Self::new_with_max_sessions(10)
59 }
60
61 pub fn new_with_max_sessions(max_sessions_per_scope: usize) -> Self {
65 Self::new_with_components(
66 InMemoryConsensusStorage::new(),
67 BroadcastEventBus::default(),
68 max_sessions_per_scope,
69 )
70 }
71}
72
73impl Default for DefaultConsensusService {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl<Scope, S, E> ConsensusService<Scope, S, E>
80where
81 Scope: ConsensusScope,
82 S: ConsensusStorage<Scope>,
83 E: ConsensusEventBus<Scope>,
84{
85 pub fn new_with_components(storage: S, event_bus: E, max_sessions_per_scope: usize) -> Self {
91 Self {
92 storage,
93 max_sessions_per_scope,
94 event_bus,
95 _scope: PhantomData,
96 }
97 }
98
99 pub fn subscribe_to_events(&self) -> E::Receiver {
105 self.event_bus.subscribe()
106 }
107
108 pub async fn get_consensus_result(
113 &self,
114 scope: &Scope,
115 proposal_id: u32,
116 ) -> Result<bool, ConsensusError> {
117 let session = self
118 .storage
119 .get_session(scope, proposal_id)
120 .await?
121 .ok_or(ConsensusError::SessionNotFound)?;
122
123 match session.state {
124 ConsensusState::ConsensusReached(result) => Ok(result),
125 ConsensusState::Failed => Err(ConsensusError::ConsensusFailed),
126 ConsensusState::Active => Err(ConsensusError::ConsensusNotReached),
127 }
128 }
129
130 pub async fn get_active_proposals(
132 &self,
133 scope: &Scope,
134 ) -> Result<Option<Vec<Proposal>>, ConsensusError> {
135 let sessions = self
136 .storage
137 .list_scope_sessions(scope)
138 .await?
139 .ok_or(ConsensusError::ScopeNotFound)?;
140 let result = sessions
141 .into_iter()
142 .filter_map(|session| session.is_active().then_some(session.proposal))
143 .collect::<Vec<Proposal>>();
144 if result.is_empty() {
145 return Ok(None);
146 }
147 Ok(Some(result))
148 }
149
150 pub async fn get_proposal_config(
152 &self,
153 scope: &Scope,
154 proposal_id: u32,
155 ) -> Result<ConsensusConfig, ConsensusError> {
156 let session = self.get_session(scope, proposal_id).await?;
157 Ok(session.config)
158 }
159
160 pub async fn get_reached_proposals(
166 &self,
167 scope: &Scope,
168 ) -> Result<Option<HashMap<u32, bool>>, ConsensusError> {
169 let sessions = self
170 .storage
171 .list_scope_sessions(scope)
172 .await?
173 .ok_or(ConsensusError::ScopeNotFound)?;
174
175 let result = sessions
176 .into_iter()
177 .filter_map(|session| {
178 session
179 .get_consensus_result()
180 .ok()
181 .map(|result| (session.proposal.proposal_id, result))
182 })
183 .collect::<HashMap<u32, bool>>();
184 if result.is_empty() {
185 return Ok(None);
186 }
187 Ok(Some(result))
188 }
189
190 pub async fn has_sufficient_votes_for_proposal(
192 &self,
193 scope: &Scope,
194 proposal_id: u32,
195 ) -> Result<bool, ConsensusError> {
196 let session = self.get_session(scope, proposal_id).await?;
197 let total_votes = session.votes.len() as u32;
198 let expected_voters = session.proposal.expected_voters_count;
199 Ok(has_sufficient_votes(
200 total_votes,
201 expected_voters,
202 session.config.consensus_threshold(),
203 ))
204 }
205
206 pub async fn scope(
240 &self,
241 scope: &Scope,
242 ) -> Result<ScopeConfigBuilderWrapper<Scope, S, E>, ConsensusError> {
243 let existing_config = self.storage.get_scope_config(scope).await?;
244 let builder = if let Some(config) = existing_config {
245 ScopeConfigBuilder::from_existing(config)
246 } else {
247 ScopeConfigBuilder::new()
248 };
249 Ok(ScopeConfigBuilderWrapper::new(
250 self.clone(),
251 scope.clone(),
252 builder,
253 ))
254 }
255
256 async fn initialize_scope(
257 &self,
258 scope: &Scope,
259 config: ScopeConfig,
260 ) -> Result<(), ConsensusError> {
261 config.validate()?;
262 self.storage.set_scope_config(scope, config).await
263 }
264
265 async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
266 where
267 F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
268 {
269 self.storage.update_scope_config(scope, updater).await
270 }
271
272 pub(crate) async fn resolve_config(
277 &self,
278 scope: &Scope,
279 proposal_override: Option<ConsensusConfig>,
280 proposal: Option<&Proposal>,
281 ) -> Result<ConsensusConfig, ConsensusError> {
282 let has_explicit_override = proposal_override.is_some();
286 let base_config = if let Some(override_config) = proposal_override {
287 override_config
288 } else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
289 ConsensusConfig::from(scope_config)
290 } else {
291 ConsensusConfig::gossipsub()
292 };
293
294 if let Some(prop) = proposal {
296 let timeout_seconds = if has_explicit_override {
299 base_config.consensus_timeout()
300 } else if prop.expiration_timestamp > prop.timestamp {
301 Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
302 } else {
303 base_config.consensus_timeout()
304 };
305
306 Ok(ConsensusConfig::new(
307 base_config.consensus_threshold(),
308 timeout_seconds,
309 base_config.max_rounds(),
310 base_config.use_gossipsub_rounds(),
311 prop.liveness_criteria_yes,
312 ))
313 } else {
314 Ok(base_config)
315 }
316 }
317
318 pub async fn handle_consensus_timeout(
325 &self,
326 scope: &Scope,
327 proposal_id: u32,
328 ) -> Result<bool, ConsensusError> {
329 let timeout_result: Result<Option<bool>, ConsensusError> = self
330 .update_session(scope, proposal_id, |session| {
331 if let ConsensusState::ConsensusReached(result) = session.state {
332 return Ok(Some(result));
333 }
334
335 let result = calculate_consensus_result(
338 &session.votes,
339 session.proposal.expected_voters_count,
340 session.config.consensus_threshold(),
341 session.proposal.liveness_criteria_yes,
342 );
343
344 if let Some(result) = result {
345 session.state = ConsensusState::ConsensusReached(result);
346 Ok(Some(result))
347 } else {
348 session.state = ConsensusState::Failed;
349 Ok(None)
350 }
351 })
352 .await;
353
354 match timeout_result? {
355 Some(consensus_result) => {
356 self.emit_event(
357 scope,
358 ConsensusEvent::ConsensusReached {
359 proposal_id,
360 result: consensus_result,
361 timestamp: current_timestamp()?,
362 },
363 );
364 Ok(consensus_result)
365 }
366 None => {
367 self.emit_event(
368 scope,
369 ConsensusEvent::ConsensusFailed {
370 proposal_id,
371 timestamp: current_timestamp()?,
372 },
373 );
374 Err(ConsensusError::InsufficientVotesAtTimeout)
375 }
376 }
377 }
378
379 pub(crate) async fn get_session(
380 &self,
381 scope: &Scope,
382 proposal_id: u32,
383 ) -> Result<ConsensusSession, ConsensusError> {
384 self.storage
385 .get_session(scope, proposal_id)
386 .await?
387 .ok_or(ConsensusError::SessionNotFound)
388 }
389
390 pub(crate) async fn update_session<R, F>(
391 &self,
392 scope: &Scope,
393 proposal_id: u32,
394 mutator: F,
395 ) -> Result<R, ConsensusError>
396 where
397 R: Send,
398 F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
399 {
400 self.storage
401 .update_session(scope, proposal_id, mutator)
402 .await
403 }
404
405 pub(crate) async fn save_session(
406 &self,
407 scope: &Scope,
408 session: ConsensusSession,
409 ) -> Result<(), ConsensusError> {
410 self.storage.save_session(scope, session).await
411 }
412
413 pub(crate) async fn trim_scope_sessions(&self, scope: &Scope) -> Result<(), ConsensusError> {
414 self.storage
415 .update_scope_sessions(scope, |sessions| {
416 if sessions.len() <= self.max_sessions_per_scope {
417 return Ok(());
418 }
419
420 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
421 sessions.truncate(self.max_sessions_per_scope);
422 Ok(())
423 })
424 .await
425 }
426
427 pub(crate) async fn list_scope_sessions(
428 &self,
429 scope: &Scope,
430 ) -> Result<Vec<ConsensusSession>, ConsensusError> {
431 self.storage
432 .list_scope_sessions(scope)
433 .await?
434 .ok_or(ConsensusError::ScopeNotFound)
435 }
436
437 pub(crate) fn handle_transition(
438 &self,
439 scope: &Scope,
440 proposal_id: u32,
441 transition: SessionTransition,
442 ) {
443 if let SessionTransition::ConsensusReached(result) = transition {
444 self.emit_event(
445 scope,
446 ConsensusEvent::ConsensusReached {
447 proposal_id,
448 result,
449 timestamp: current_timestamp().unwrap_or(0),
450 },
451 );
452 }
453 }
454
455 fn emit_event(&self, scope: &Scope, event: ConsensusEvent) {
456 self.event_bus.publish(scope.clone(), event);
457 }
458}
459
460pub struct ScopeConfigBuilderWrapper<Scope, S, E>
462where
463 Scope: ConsensusScope,
464 S: ConsensusStorage<Scope>,
465 E: ConsensusEventBus<Scope>,
466{
467 service: ConsensusService<Scope, S, E>,
468 scope: Scope,
469 builder: ScopeConfigBuilder,
470}
471
472impl<Scope, S, E> ScopeConfigBuilderWrapper<Scope, S, E>
473where
474 Scope: ConsensusScope,
475 S: ConsensusStorage<Scope>,
476 E: ConsensusEventBus<Scope>,
477{
478 fn new(
479 service: ConsensusService<Scope, S, E>,
480 scope: Scope,
481 builder: ScopeConfigBuilder,
482 ) -> Self {
483 Self {
484 service,
485 scope,
486 builder,
487 }
488 }
489
490 pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
492 self.builder = self.builder.with_network_type(network_type);
493 self
494 }
495
496 pub fn with_threshold(mut self, threshold: f64) -> Self {
498 self.builder = self.builder.with_threshold(threshold);
499 self
500 }
501
502 pub fn with_timeout(mut self, timeout: Duration) -> Self {
504 self.builder = self.builder.with_timeout(timeout);
505 self
506 }
507
508 pub fn with_liveness_criteria(mut self, liveness_criteria_yes: bool) -> Self {
510 self.builder = self.builder.with_liveness_criteria(liveness_criteria_yes);
511 self
512 }
513
514 pub fn with_max_rounds(mut self, max_rounds: Option<u32>) -> Self {
516 self.builder = self.builder.with_max_rounds(max_rounds);
517 self
518 }
519
520 pub fn p2p_preset(mut self) -> Self {
522 self.builder = self.builder.p2p_preset();
523 self
524 }
525
526 pub fn gossipsub_preset(mut self) -> Self {
528 self.builder = self.builder.gossipsub_preset();
529 self
530 }
531
532 pub fn strict_consensus(mut self) -> Self {
534 self.builder = self.builder.strict_consensus();
535 self
536 }
537
538 pub fn fast_consensus(mut self) -> Self {
540 self.builder = self.builder.fast_consensus();
541 self
542 }
543
544 pub fn with_network_defaults(mut self, network_type: NetworkType) -> Self {
546 self.builder = self.builder.with_network_defaults(network_type);
547 self
548 }
549
550 pub async fn initialize(self) -> Result<(), ConsensusError> {
552 let config = self.builder.build()?;
553 self.service.initialize_scope(&self.scope, config).await
554 }
555
556 pub async fn update(self) -> Result<(), ConsensusError> {
558 let config = self.builder.build()?;
559 self.service
560 .update_scope_config(&self.scope, |existing| {
561 *existing = config;
562 Ok(())
563 })
564 .await
565 }
566
567 pub fn get_config(&self) -> ScopeConfig {
569 self.builder.get_config()
570 }
571}