1use std::marker::PhantomData;
2
3use alloy_signer::Signer;
4use tokio::time::Duration;
5
6use crate::{
7 error::ConsensusError,
8 events::{BroadcastEventBus, ConsensusEventBus},
9 protos::consensus::v1::{Proposal, Vote},
10 scope::{ConsensusScope, ScopeID},
11 scope_config::{NetworkType, ScopeConfig, ScopeConfigBuilder},
12 session::{ConsensusConfig, ConsensusSession, ConsensusState},
13 storage::{ConsensusStorage, InMemoryConsensusStorage},
14 types::{ConsensusEvent, CreateProposalRequest, SessionTransition},
15 utils::{
16 build_vote, calculate_consensus_result, current_timestamp, validate_proposal_timestamp,
17 validate_vote,
18 },
19};
20pub struct ConsensusService<Scope, S, E>
25where
26 Scope: ConsensusScope,
27 S: ConsensusStorage<Scope>,
28 E: ConsensusEventBus<Scope>,
29{
30 storage: S,
31 max_sessions_per_scope: usize,
32 event_bus: E,
33 _scope: PhantomData<Scope>,
34}
35
36impl<Scope, S, E> Clone for ConsensusService<Scope, S, E>
37where
38 Scope: ConsensusScope,
39 S: ConsensusStorage<Scope>,
40 E: ConsensusEventBus<Scope>,
41{
42 fn clone(&self) -> Self {
43 Self {
44 storage: self.storage.clone(),
45 max_sessions_per_scope: self.max_sessions_per_scope,
46 event_bus: self.event_bus.clone(),
47 _scope: PhantomData,
48 }
49 }
50}
51
52pub type DefaultConsensusService =
58 ConsensusService<ScopeID, InMemoryConsensusStorage<ScopeID>, BroadcastEventBus<ScopeID>>;
59
60impl DefaultConsensusService {
61 fn new() -> Self {
63 Self::new_with_max_sessions(10)
64 }
65
66 pub fn new_with_max_sessions(max_sessions_per_scope: usize) -> Self {
72 Self::new_with_components(
73 InMemoryConsensusStorage::new(),
74 BroadcastEventBus::default(),
75 max_sessions_per_scope,
76 )
77 }
78}
79
80impl Default for DefaultConsensusService {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl<Scope, S, E> ConsensusService<Scope, S, E>
87where
88 Scope: ConsensusScope,
89 S: ConsensusStorage<Scope>,
90 E: ConsensusEventBus<Scope>,
91{
92 pub fn new_with_components(storage: S, event_bus: E, max_sessions_per_scope: usize) -> Self {
98 Self {
99 storage,
100 max_sessions_per_scope,
101 event_bus,
102 _scope: PhantomData,
103 }
104 }
105
106 pub fn storage(&self) -> &S {
113 &self.storage
114 }
115
116 pub fn event_bus(&self) -> &E {
120 &self.event_bus
121 }
122
123 pub async fn create_proposal(
140 &self,
141 scope: &Scope,
142 request: CreateProposalRequest,
143 ) -> Result<Proposal, ConsensusError> {
144 self.create_proposal_with_config(scope, request, None).await
145 }
146
147 pub async fn create_proposal_with_config(
151 &self,
152 scope: &Scope,
153 request: CreateProposalRequest,
154 config: Option<ConsensusConfig>,
155 ) -> Result<Proposal, ConsensusError> {
156 let proposal = request.into_proposal()?;
157 let config = self.resolve_config(scope, config, Some(&proposal)).await?;
158 let (session, _) = ConsensusSession::from_proposal(proposal.clone(), config.clone())?;
159 self.save_session(scope, session).await?;
160 self.trim_scope_sessions(scope).await?;
161 Ok(proposal)
162 }
163
164 pub async fn cast_vote<SN: Signer + Sync + Send>(
170 &self,
171 scope: &Scope,
172 proposal_id: u32,
173 choice: bool,
174 signer: SN,
175 ) -> Result<Vote, ConsensusError> {
176 let session = self.get_session(scope, proposal_id).await?;
177 validate_proposal_timestamp(session.proposal.expiration_timestamp)?;
178
179 let voter_address = signer.address().as_slice().to_vec();
180 if session.votes.contains_key(&voter_address) {
181 return Err(ConsensusError::UserAlreadyVoted);
182 }
183
184 let vote = build_vote(&session.proposal, choice, signer).await?;
185 let vote_clone = vote.clone();
186 let transition = self
187 .update_session(scope, proposal_id, move |session| {
188 session.add_vote(vote_clone)
189 })
190 .await?;
191 self.handle_transition(scope, proposal_id, transition);
192 Ok(vote)
193 }
194
195 pub async fn cast_vote_and_get_proposal<SN: Signer + Sync + Send>(
200 &self,
201 scope: &Scope,
202 proposal_id: u32,
203 choice: bool,
204 signer: SN,
205 ) -> Result<Proposal, ConsensusError> {
206 self.cast_vote(scope, proposal_id, choice, signer).await?;
207 let session = self.get_session(scope, proposal_id).await?;
208 Ok(session.proposal)
209 }
210
211 pub async fn process_incoming_proposal(
220 &self,
221 scope: &Scope,
222 proposal: Proposal,
223 ) -> Result<(), ConsensusError> {
224 if self.get_session(scope, proposal.proposal_id).await.is_ok() {
225 return Err(ConsensusError::ProposalAlreadyExist);
226 }
227 let config = self.resolve_config(scope, None, Some(&proposal)).await?;
228 let (session, transition) = ConsensusSession::from_proposal(proposal, config)?;
229 self.handle_transition(scope, session.proposal.proposal_id, transition);
230 self.save_session(scope, session).await?;
231 self.trim_scope_sessions(scope).await?;
232 Ok(())
233 }
234
235 pub async fn process_incoming_vote(
241 &self,
242 scope: &Scope,
243 vote: Vote,
244 ) -> Result<(), ConsensusError> {
245 let session = self.get_session(scope, vote.proposal_id).await?;
246 validate_vote(
247 &vote,
248 session.proposal.expiration_timestamp,
249 session.proposal.timestamp,
250 )?;
251 let proposal_id = vote.proposal_id;
252 let transition = self
253 .update_session(scope, proposal_id, move |session| session.add_vote(vote))
254 .await?;
255 self.handle_transition(scope, proposal_id, transition);
256 Ok(())
257 }
258
259 pub async fn handle_consensus_timeout(
277 &self,
278 scope: &Scope,
279 proposal_id: u32,
280 ) -> Result<bool, ConsensusError> {
281 let timeout_result: Result<Option<bool>, ConsensusError> = self
282 .update_session(scope, proposal_id, |session| {
283 if let ConsensusState::ConsensusReached(result) = session.state {
284 return Ok(Some(result));
285 }
286 let result = calculate_consensus_result(
287 &session.votes,
288 session.proposal.expected_voters_count,
289 session.config.consensus_threshold(),
290 session.proposal.liveness_criteria_yes,
291 true,
292 );
293 if let Some(result) = result {
294 session.state = ConsensusState::ConsensusReached(result);
295 Ok(Some(result))
296 } else {
297 session.state = ConsensusState::Failed;
298 Ok(None)
299 }
300 })
301 .await;
302
303 match timeout_result? {
304 Some(consensus_result) => {
305 self.emit_event(
306 scope,
307 ConsensusEvent::ConsensusReached {
308 proposal_id,
309 result: consensus_result,
310 timestamp: current_timestamp()?,
311 },
312 );
313 Ok(consensus_result)
314 }
315 None => {
316 self.emit_event(
317 scope,
318 ConsensusEvent::ConsensusFailed {
319 proposal_id,
320 timestamp: current_timestamp()?,
321 },
322 );
323 Err(ConsensusError::InsufficientVotesAtTimeout)
324 }
325 }
326 }
327
328 pub async fn scope(
362 &self,
363 scope: &Scope,
364 ) -> Result<ScopeConfigBuilderWrapper<Scope, S, E>, ConsensusError> {
365 let existing_config = self.storage.get_scope_config(scope).await?;
366 let builder = if let Some(config) = existing_config {
367 ScopeConfigBuilder::from_existing(config)
368 } else {
369 ScopeConfigBuilder::new()
370 };
371 Ok(ScopeConfigBuilderWrapper::new(
372 self.clone(),
373 scope.clone(),
374 builder,
375 ))
376 }
377
378 async fn initialize_scope(
379 &self,
380 scope: &Scope,
381 config: ScopeConfig,
382 ) -> Result<(), ConsensusError> {
383 config.validate()?;
384 self.storage.set_scope_config(scope, config).await
385 }
386
387 async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
388 where
389 F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
390 {
391 self.storage.update_scope_config(scope, updater).await
392 }
393
394 async fn resolve_config(
399 &self,
400 scope: &Scope,
401 proposal_override: Option<ConsensusConfig>,
402 proposal: Option<&Proposal>,
403 ) -> Result<ConsensusConfig, ConsensusError> {
404 let has_explicit_override = proposal_override.is_some();
408 let base_config = if let Some(override_config) = proposal_override {
409 override_config
410 } else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
411 ConsensusConfig::from(scope_config)
412 } else {
413 ConsensusConfig::gossipsub()
414 };
415
416 if let Some(prop) = proposal {
418 let timeout_seconds = if has_explicit_override {
421 base_config.consensus_timeout()
422 } else if prop.expiration_timestamp > prop.timestamp {
423 Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
424 } else {
425 base_config.consensus_timeout()
426 };
427
428 Ok(ConsensusConfig::new(
429 base_config.consensus_threshold(),
430 timeout_seconds,
431 base_config.max_rounds(),
432 base_config.use_gossipsub_rounds(),
433 prop.liveness_criteria_yes,
434 ))
435 } else {
436 Ok(base_config)
437 }
438 }
439
440 async fn get_session(
441 &self,
442 scope: &Scope,
443 proposal_id: u32,
444 ) -> Result<ConsensusSession, ConsensusError> {
445 self.storage
446 .get_session(scope, proposal_id)
447 .await?
448 .ok_or(ConsensusError::SessionNotFound)
449 }
450
451 async fn update_session<R, F>(
452 &self,
453 scope: &Scope,
454 proposal_id: u32,
455 mutator: F,
456 ) -> Result<R, ConsensusError>
457 where
458 R: Send,
459 F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
460 {
461 self.storage
462 .update_session(scope, proposal_id, mutator)
463 .await
464 }
465
466 async fn save_session(
467 &self,
468 scope: &Scope,
469 session: ConsensusSession,
470 ) -> Result<(), ConsensusError> {
471 self.storage.save_session(scope, session).await
472 }
473
474 async fn trim_scope_sessions(&self, scope: &Scope) -> Result<(), ConsensusError> {
475 self.storage
476 .update_scope_sessions(scope, |sessions| {
477 if sessions.len() <= self.max_sessions_per_scope {
478 return Ok(());
479 }
480
481 sessions.sort_by_key(|s| std::cmp::Reverse(s.created_at));
482 sessions.truncate(self.max_sessions_per_scope);
483 Ok(())
484 })
485 .await
486 }
487
488 pub(crate) async fn list_scope_sessions(
489 &self,
490 scope: &Scope,
491 ) -> Result<Vec<ConsensusSession>, ConsensusError> {
492 self.storage
493 .list_scope_sessions(scope)
494 .await?
495 .ok_or(ConsensusError::ScopeNotFound)
496 }
497
498 fn handle_transition(&self, scope: &Scope, proposal_id: u32, transition: SessionTransition) {
499 if let SessionTransition::ConsensusReached(result) = transition {
500 self.emit_event(
501 scope,
502 ConsensusEvent::ConsensusReached {
503 proposal_id,
504 result,
505 timestamp: current_timestamp().unwrap_or(0),
506 },
507 );
508 }
509 }
510
511 fn emit_event(&self, scope: &Scope, event: ConsensusEvent) {
512 self.event_bus.publish(scope.clone(), event);
513 }
514}
515
516pub struct ScopeConfigBuilderWrapper<Scope, S, E>
518where
519 Scope: ConsensusScope,
520 S: ConsensusStorage<Scope>,
521 E: ConsensusEventBus<Scope>,
522{
523 service: ConsensusService<Scope, S, E>,
524 scope: Scope,
525 builder: ScopeConfigBuilder,
526}
527
528impl<Scope, S, E> ScopeConfigBuilderWrapper<Scope, S, E>
529where
530 Scope: ConsensusScope,
531 S: ConsensusStorage<Scope>,
532 E: ConsensusEventBus<Scope>,
533{
534 fn new(
535 service: ConsensusService<Scope, S, E>,
536 scope: Scope,
537 builder: ScopeConfigBuilder,
538 ) -> Self {
539 Self {
540 service,
541 scope,
542 builder,
543 }
544 }
545
546 pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
548 self.builder = self.builder.with_network_type(network_type);
549 self
550 }
551
552 pub fn with_threshold(mut self, threshold: f64) -> Self {
554 self.builder = self.builder.with_threshold(threshold);
555 self
556 }
557
558 pub fn with_timeout(mut self, timeout: Duration) -> Self {
560 self.builder = self.builder.with_timeout(timeout);
561 self
562 }
563
564 pub fn with_liveness_criteria(mut self, liveness_criteria_yes: bool) -> Self {
566 self.builder = self.builder.with_liveness_criteria(liveness_criteria_yes);
567 self
568 }
569
570 pub fn with_max_rounds(mut self, max_rounds: Option<u32>) -> Self {
572 self.builder = self.builder.with_max_rounds(max_rounds);
573 self
574 }
575
576 pub fn p2p_preset(mut self) -> Self {
578 self.builder = self.builder.p2p_preset();
579 self
580 }
581
582 pub fn gossipsub_preset(mut self) -> Self {
584 self.builder = self.builder.gossipsub_preset();
585 self
586 }
587
588 pub fn strict_consensus(mut self) -> Self {
590 self.builder = self.builder.strict_consensus();
591 self
592 }
593
594 pub fn fast_consensus(mut self) -> Self {
596 self.builder = self.builder.fast_consensus();
597 self
598 }
599
600 pub fn with_network_defaults(mut self, network_type: NetworkType) -> Self {
602 self.builder = self.builder.with_network_defaults(network_type);
603 self
604 }
605
606 pub async fn initialize(self) -> Result<(), ConsensusError> {
608 let config = self.builder.build()?;
609 self.service.initialize_scope(&self.scope, config).await
610 }
611
612 pub async fn update(self) -> Result<(), ConsensusError> {
614 let config = self.builder.build()?;
615 self.service
616 .update_scope_config(&self.scope, |existing| {
617 *existing = config;
618 Ok(())
619 })
620 .await
621 }
622
623 pub fn get_config(&self) -> ScopeConfig {
625 self.builder.get_config()
626 }
627}