1use std::marker::PhantomData;
2
3use tokio::time::Duration;
4
5use crate::{
6 error::ConsensusError,
7 events::{BroadcastEventBus, ConsensusEventBus},
8 protos::consensus::v1::{Proposal, Vote},
9 scope::{ConsensusScope, ScopeID},
10 scope_config::{NetworkType, ScopeConfig, ScopeConfigBuilder},
11 session::{ConsensusConfig, ConsensusSession, ConsensusState},
12 signing::{ConsensusSignatureScheme, EthereumConsensusSigner},
13 storage::{ConsensusStorage, InMemoryConsensusStorage},
14 types::{ConsensusEvent, CreateProposalRequest, SessionTransition},
15 utils::{
16 build_vote, calculate_consensus_result, current_timestamp, validate_proposal_timestamp,
17 validate_vote,
18 },
19};
20pub struct ConsensusService<Scope, Storage, Event, Signer>
39where
40 Scope: ConsensusScope,
41 Storage: ConsensusStorage<Scope>,
42 Event: ConsensusEventBus<Scope>,
43 Signer: ConsensusSignatureScheme,
44{
45 storage: Storage,
46 max_sessions_per_scope: usize,
47 event_bus: Event,
48 signer: Signer,
49 _scope: PhantomData<Scope>,
50}
51
52impl<Scope, Storage, Event, Signer> Clone for ConsensusService<Scope, Storage, Event, Signer>
53where
54 Scope: ConsensusScope,
55 Storage: ConsensusStorage<Scope>,
56 Event: ConsensusEventBus<Scope>,
57 Signer: ConsensusSignatureScheme,
58{
59 fn clone(&self) -> Self {
60 Self {
61 storage: self.storage.clone(),
62 max_sessions_per_scope: self.max_sessions_per_scope,
63 event_bus: self.event_bus.clone(),
64 signer: self.signer.clone(),
65 _scope: PhantomData,
66 }
67 }
68}
69
70pub type DefaultConsensusService = ConsensusService<
75 ScopeID,
76 InMemoryConsensusStorage<ScopeID>,
77 BroadcastEventBus<ScopeID>,
78 EthereumConsensusSigner,
79>;
80
81impl DefaultConsensusService {
82 pub fn new(signer: EthereumConsensusSigner) -> Self {
85 Self::new_with_max_sessions(signer, 10)
86 }
87
88 pub fn new_with_max_sessions(
94 signer: EthereumConsensusSigner,
95 max_sessions_per_scope: usize,
96 ) -> Self {
97 Self::new_with_components(
98 InMemoryConsensusStorage::new(),
99 BroadcastEventBus::default(),
100 signer,
101 max_sessions_per_scope,
102 )
103 }
104}
105
106impl<Scope, Storage, Event, Signer> ConsensusService<Scope, Storage, Event, Signer>
107where
108 Scope: ConsensusScope,
109 Storage: ConsensusStorage<Scope>,
110 Event: ConsensusEventBus<Scope>,
111 Signer: ConsensusSignatureScheme,
112{
113 pub fn new_with_components(
122 storage: Storage,
123 event_bus: Event,
124 signer: Signer,
125 max_sessions_per_scope: usize,
126 ) -> Self {
127 Self {
128 storage,
129 max_sessions_per_scope,
130 event_bus,
131 signer,
132 _scope: PhantomData,
133 }
134 }
135
136 pub fn storage(&self) -> &Storage {
143 &self.storage
144 }
145
146 pub fn event_bus(&self) -> &Event {
150 &self.event_bus
151 }
152
153 pub fn signer(&self) -> &Signer {
158 &self.signer
159 }
160
161 pub async fn create_proposal(
178 &self,
179 scope: &Scope,
180 request: CreateProposalRequest,
181 ) -> Result<Proposal, ConsensusError> {
182 self.create_proposal_with_config(scope, request, None).await
183 }
184
185 pub async fn create_proposal_with_config(
189 &self,
190 scope: &Scope,
191 request: CreateProposalRequest,
192 config: Option<ConsensusConfig>,
193 ) -> Result<Proposal, ConsensusError> {
194 let proposal = request.into_proposal()?;
195 let config = self.resolve_config(scope, config, Some(&proposal)).await?;
196 let (session, _) =
197 ConsensusSession::from_proposal::<Signer>(proposal.clone(), config.clone())?;
198 self.save_session(scope, session).await?;
199 self.trim_scope_sessions(scope).await?;
200 Ok(proposal)
201 }
202
203 pub async fn cast_vote(
209 &self,
210 scope: &Scope,
211 proposal_id: u32,
212 choice: bool,
213 ) -> Result<Vote, ConsensusError> {
214 let session = self.get_session(scope, proposal_id).await?;
215 validate_proposal_timestamp(session.proposal.expiration_timestamp)?;
216
217 if session.votes.contains_key(self.signer.identity()) {
218 return Err(ConsensusError::UserAlreadyVoted);
219 }
220
221 let vote = build_vote(&session.proposal, choice, &self.signer).await?;
222 let vote_clone = vote.clone();
223 let transition = self
224 .update_session(scope, proposal_id, move |session| {
225 session.add_vote(vote_clone)
226 })
227 .await?;
228 self.handle_transition(scope, proposal_id, transition);
229 Ok(vote)
230 }
231
232 pub async fn cast_vote_and_get_proposal(
237 &self,
238 scope: &Scope,
239 proposal_id: u32,
240 choice: bool,
241 ) -> Result<Proposal, ConsensusError> {
242 self.cast_vote(scope, proposal_id, choice).await?;
243 let session = self.get_session(scope, proposal_id).await?;
244 Ok(session.proposal)
245 }
246
247 pub async fn process_incoming_proposal(
256 &self,
257 scope: &Scope,
258 proposal: Proposal,
259 ) -> Result<(), ConsensusError> {
260 if self.get_session(scope, proposal.proposal_id).await.is_ok() {
261 return Err(ConsensusError::ProposalAlreadyExist);
262 }
263 let config = self.resolve_config(scope, None, Some(&proposal)).await?;
264 let (session, transition) = ConsensusSession::from_proposal::<Signer>(proposal, config)?;
265 self.handle_transition(scope, session.proposal.proposal_id, transition);
266 self.save_session(scope, session).await?;
267 self.trim_scope_sessions(scope).await?;
268 Ok(())
269 }
270
271 pub async fn process_incoming_vote(
277 &self,
278 scope: &Scope,
279 vote: Vote,
280 ) -> Result<(), ConsensusError> {
281 let session = self.get_session(scope, vote.proposal_id).await?;
282 validate_vote::<Signer>(
283 &vote,
284 session.proposal.expiration_timestamp,
285 session.proposal.timestamp,
286 )?;
287 let proposal_id = vote.proposal_id;
288 let transition = self
289 .update_session(scope, proposal_id, move |session| session.add_vote(vote))
290 .await?;
291 self.handle_transition(scope, proposal_id, transition);
292 Ok(())
293 }
294
295 pub async fn handle_consensus_timeout(
313 &self,
314 scope: &Scope,
315 proposal_id: u32,
316 ) -> Result<bool, ConsensusError> {
317 let timeout_result: Result<Option<bool>, ConsensusError> = self
318 .update_session(scope, proposal_id, |session| {
319 if let ConsensusState::ConsensusReached(result) = session.state {
320 return Ok(Some(result));
321 }
322 let result = calculate_consensus_result(
323 &session.votes,
324 session.proposal.expected_voters_count,
325 session.config.consensus_threshold(),
326 session.proposal.liveness_criteria_yes,
327 true,
328 );
329 if let Some(result) = result {
330 session.state = ConsensusState::ConsensusReached(result);
331 Ok(Some(result))
332 } else {
333 session.state = ConsensusState::Failed;
334 Ok(None)
335 }
336 })
337 .await;
338
339 match timeout_result? {
340 Some(consensus_result) => {
341 self.emit_event(
342 scope,
343 ConsensusEvent::ConsensusReached {
344 proposal_id,
345 result: consensus_result,
346 timestamp: current_timestamp()?,
347 },
348 );
349 Ok(consensus_result)
350 }
351 None => {
352 self.emit_event(
353 scope,
354 ConsensusEvent::ConsensusFailed {
355 proposal_id,
356 timestamp: current_timestamp()?,
357 },
358 );
359 Err(ConsensusError::InsufficientVotesAtTimeout)
360 }
361 }
362 }
363
364 pub async fn scope(
405 &self,
406 scope: &Scope,
407 ) -> Result<ScopeConfigBuilderWrapper<Scope, Storage, Event, Signer>, ConsensusError> {
408 let existing_config = self.storage.get_scope_config(scope).await?;
409 let builder = if let Some(config) = existing_config {
410 ScopeConfigBuilder::from_existing(config)
411 } else {
412 ScopeConfigBuilder::new()
413 };
414 Ok(ScopeConfigBuilderWrapper::new(
415 self.clone(),
416 scope.clone(),
417 builder,
418 ))
419 }
420
421 async fn initialize_scope(
422 &self,
423 scope: &Scope,
424 config: ScopeConfig,
425 ) -> Result<(), ConsensusError> {
426 config.validate()?;
427 self.storage.set_scope_config(scope, config).await
428 }
429
430 async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
431 where
432 F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
433 {
434 self.storage.update_scope_config(scope, updater).await
435 }
436
437 async fn resolve_config(
442 &self,
443 scope: &Scope,
444 proposal_override: Option<ConsensusConfig>,
445 proposal: Option<&Proposal>,
446 ) -> Result<ConsensusConfig, ConsensusError> {
447 let has_explicit_override = proposal_override.is_some();
451 let base_config = if let Some(override_config) = proposal_override {
452 override_config
453 } else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
454 ConsensusConfig::from(scope_config)
455 } else {
456 ConsensusConfig::gossipsub()
457 };
458
459 if let Some(prop) = proposal {
461 let timeout_seconds = if has_explicit_override {
464 base_config.consensus_timeout()
465 } else if prop.expiration_timestamp > prop.timestamp {
466 Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
467 } else {
468 base_config.consensus_timeout()
469 };
470
471 Ok(ConsensusConfig::new(
472 base_config.consensus_threshold(),
473 timeout_seconds,
474 base_config.max_rounds(),
475 base_config.use_gossipsub_rounds(),
476 prop.liveness_criteria_yes,
477 ))
478 } else {
479 Ok(base_config)
480 }
481 }
482
483 async fn get_session(
484 &self,
485 scope: &Scope,
486 proposal_id: u32,
487 ) -> Result<ConsensusSession, ConsensusError> {
488 self.storage
489 .get_session(scope, proposal_id)
490 .await?
491 .ok_or(ConsensusError::SessionNotFound)
492 }
493
494 async fn update_session<R, F>(
495 &self,
496 scope: &Scope,
497 proposal_id: u32,
498 mutator: F,
499 ) -> Result<R, ConsensusError>
500 where
501 R: Send,
502 F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
503 {
504 self.storage
505 .update_session(scope, proposal_id, mutator)
506 .await
507 }
508
509 async fn save_session(
510 &self,
511 scope: &Scope,
512 session: ConsensusSession,
513 ) -> Result<(), ConsensusError> {
514 self.storage.save_session(scope, session).await
515 }
516
517 async fn trim_scope_sessions(&self, scope: &Scope) -> Result<(), ConsensusError> {
518 self.storage
519 .update_scope_sessions(scope, |sessions| {
520 if sessions.len() <= self.max_sessions_per_scope {
521 return Ok(());
522 }
523
524 sessions.sort_by_key(|s| std::cmp::Reverse(s.created_at));
525 sessions.truncate(self.max_sessions_per_scope);
526 Ok(())
527 })
528 .await
529 }
530
531 pub(crate) async fn list_scope_sessions(
532 &self,
533 scope: &Scope,
534 ) -> Result<Vec<ConsensusSession>, ConsensusError> {
535 self.storage
536 .list_scope_sessions(scope)
537 .await?
538 .ok_or(ConsensusError::ScopeNotFound)
539 }
540
541 fn handle_transition(&self, scope: &Scope, proposal_id: u32, transition: SessionTransition) {
542 if let SessionTransition::ConsensusReached(result) = transition {
543 self.emit_event(
544 scope,
545 ConsensusEvent::ConsensusReached {
546 proposal_id,
547 result,
548 timestamp: current_timestamp().unwrap_or(0),
549 },
550 );
551 }
552 }
553
554 fn emit_event(&self, scope: &Scope, event: ConsensusEvent) {
555 self.event_bus.publish(scope.clone(), event);
556 }
557}
558
559pub struct ScopeConfigBuilderWrapper<Scope, Storage, Event, Signer>
561where
562 Scope: ConsensusScope,
563 Storage: ConsensusStorage<Scope>,
564 Event: ConsensusEventBus<Scope>,
565 Signer: ConsensusSignatureScheme,
566{
567 service: ConsensusService<Scope, Storage, Event, Signer>,
568 scope: Scope,
569 builder: ScopeConfigBuilder,
570}
571
572impl<Scope, Storage, Event, Signer> ScopeConfigBuilderWrapper<Scope, Storage, Event, Signer>
573where
574 Scope: ConsensusScope,
575 Storage: ConsensusStorage<Scope>,
576 Event: ConsensusEventBus<Scope>,
577 Signer: ConsensusSignatureScheme,
578{
579 fn new(
580 service: ConsensusService<Scope, Storage, Event, Signer>,
581 scope: Scope,
582 builder: ScopeConfigBuilder,
583 ) -> Self {
584 Self {
585 service,
586 scope,
587 builder,
588 }
589 }
590
591 pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
593 self.builder = self.builder.with_network_type(network_type);
594 self
595 }
596
597 pub fn with_threshold(mut self, threshold: f64) -> Self {
599 self.builder = self.builder.with_threshold(threshold);
600 self
601 }
602
603 pub fn with_timeout(mut self, timeout: Duration) -> Self {
605 self.builder = self.builder.with_timeout(timeout);
606 self
607 }
608
609 pub fn with_liveness_criteria(mut self, liveness_criteria_yes: bool) -> Self {
611 self.builder = self.builder.with_liveness_criteria(liveness_criteria_yes);
612 self
613 }
614
615 pub fn with_max_rounds(mut self, max_rounds: Option<u32>) -> Self {
617 self.builder = self.builder.with_max_rounds(max_rounds);
618 self
619 }
620
621 pub fn p2p_preset(mut self) -> Self {
623 self.builder = self.builder.p2p_preset();
624 self
625 }
626
627 pub fn gossipsub_preset(mut self) -> Self {
629 self.builder = self.builder.gossipsub_preset();
630 self
631 }
632
633 pub fn strict_consensus(mut self) -> Self {
635 self.builder = self.builder.strict_consensus();
636 self
637 }
638
639 pub fn fast_consensus(mut self) -> Self {
641 self.builder = self.builder.fast_consensus();
642 self
643 }
644
645 pub fn with_network_defaults(mut self, network_type: NetworkType) -> Self {
647 self.builder = self.builder.with_network_defaults(network_type);
648 self
649 }
650
651 pub async fn initialize(self) -> Result<(), ConsensusError> {
653 let config = self.builder.build()?;
654 self.service.initialize_scope(&self.scope, config).await
655 }
656
657 pub async fn update(self) -> Result<(), ConsensusError> {
659 let config = self.builder.build()?;
660 self.service
661 .update_scope_config(&self.scope, |existing| {
662 *existing = config;
663 Ok(())
664 })
665 .await
666 }
667
668 pub fn get_config(&self) -> ScopeConfig {
670 self.builder.get_config()
671 }
672}