use std::marker::PhantomData;
use tokio::time::Duration;
use crate::{
error::ConsensusError,
events::{BroadcastEventBus, ConsensusEventBus},
protos::consensus::v1::{Proposal, Vote},
scope::{ConsensusScope, ScopeID},
scope_config::{NetworkType, ScopeConfig, ScopeConfigBuilder},
session::{ConsensusConfig, ConsensusSession, ConsensusState},
signing::{ConsensusSignatureScheme, EthereumConsensusSigner},
storage::{ConsensusStorage, InMemoryConsensusStorage},
types::{ConsensusEvent, CreateProposalRequest, SessionTransition},
utils::{
build_vote, calculate_consensus_result, current_timestamp, validate_proposal_timestamp,
validate_vote,
},
};
pub struct ConsensusService<Scope, Storage, Event, Signer>
where
Scope: ConsensusScope,
Storage: ConsensusStorage<Scope>,
Event: ConsensusEventBus<Scope>,
Signer: ConsensusSignatureScheme,
{
storage: Storage,
max_sessions_per_scope: usize,
event_bus: Event,
signer: Signer,
_scope: PhantomData<Scope>,
}
impl<Scope, Storage, Event, Signer> Clone for ConsensusService<Scope, Storage, Event, Signer>
where
Scope: ConsensusScope,
Storage: ConsensusStorage<Scope>,
Event: ConsensusEventBus<Scope>,
Signer: ConsensusSignatureScheme,
{
fn clone(&self) -> Self {
Self {
storage: self.storage.clone(),
max_sessions_per_scope: self.max_sessions_per_scope,
event_bus: self.event_bus.clone(),
signer: self.signer.clone(),
_scope: PhantomData,
}
}
}
pub type DefaultConsensusService = ConsensusService<
ScopeID,
InMemoryConsensusStorage<ScopeID>,
BroadcastEventBus<ScopeID>,
EthereumConsensusSigner,
>;
impl DefaultConsensusService {
pub fn new(signer: EthereumConsensusSigner) -> Self {
Self::new_with_max_sessions(signer, 10)
}
pub fn new_with_max_sessions(
signer: EthereumConsensusSigner,
max_sessions_per_scope: usize,
) -> Self {
Self::new_with_components(
InMemoryConsensusStorage::new(),
BroadcastEventBus::default(),
signer,
max_sessions_per_scope,
)
}
}
impl<Scope, Storage, Event, Signer> ConsensusService<Scope, Storage, Event, Signer>
where
Scope: ConsensusScope,
Storage: ConsensusStorage<Scope>,
Event: ConsensusEventBus<Scope>,
Signer: ConsensusSignatureScheme,
{
pub fn new_with_components(
storage: Storage,
event_bus: Event,
signer: Signer,
max_sessions_per_scope: usize,
) -> Self {
Self {
storage,
max_sessions_per_scope,
event_bus,
signer,
_scope: PhantomData,
}
}
pub fn storage(&self) -> &Storage {
&self.storage
}
pub fn event_bus(&self) -> &Event {
&self.event_bus
}
pub fn signer(&self) -> &Signer {
&self.signer
}
pub async fn create_proposal(
&self,
scope: &Scope,
request: CreateProposalRequest,
) -> Result<Proposal, ConsensusError> {
self.create_proposal_with_config(scope, request, None).await
}
pub async fn create_proposal_with_config(
&self,
scope: &Scope,
request: CreateProposalRequest,
config: Option<ConsensusConfig>,
) -> Result<Proposal, ConsensusError> {
let proposal = request.into_proposal()?;
let config = self.resolve_config(scope, config, Some(&proposal)).await?;
let (session, _) =
ConsensusSession::from_proposal::<Signer>(proposal.clone(), config.clone())?;
self.save_session(scope, session).await?;
self.trim_scope_sessions(scope).await?;
Ok(proposal)
}
pub async fn cast_vote(
&self,
scope: &Scope,
proposal_id: u32,
choice: bool,
) -> Result<Vote, ConsensusError> {
let session = self.get_session(scope, proposal_id).await?;
validate_proposal_timestamp(session.proposal.expiration_timestamp)?;
if session.votes.contains_key(self.signer.identity()) {
return Err(ConsensusError::UserAlreadyVoted);
}
let vote = build_vote(&session.proposal, choice, &self.signer).await?;
let vote_clone = vote.clone();
let transition = self
.update_session(scope, proposal_id, move |session| {
session.add_vote(vote_clone)
})
.await?;
self.handle_transition(scope, proposal_id, transition);
Ok(vote)
}
pub async fn cast_vote_and_get_proposal(
&self,
scope: &Scope,
proposal_id: u32,
choice: bool,
) -> Result<Proposal, ConsensusError> {
self.cast_vote(scope, proposal_id, choice).await?;
let session = self.get_session(scope, proposal_id).await?;
Ok(session.proposal)
}
pub async fn process_incoming_proposal(
&self,
scope: &Scope,
proposal: Proposal,
) -> Result<(), ConsensusError> {
if self.get_session(scope, proposal.proposal_id).await.is_ok() {
return Err(ConsensusError::ProposalAlreadyExist);
}
let config = self.resolve_config(scope, None, Some(&proposal)).await?;
let (session, transition) = ConsensusSession::from_proposal::<Signer>(proposal, config)?;
self.handle_transition(scope, session.proposal.proposal_id, transition);
self.save_session(scope, session).await?;
self.trim_scope_sessions(scope).await?;
Ok(())
}
pub async fn process_incoming_vote(
&self,
scope: &Scope,
vote: Vote,
) -> Result<(), ConsensusError> {
let session = self.get_session(scope, vote.proposal_id).await?;
validate_vote::<Signer>(
&vote,
session.proposal.expiration_timestamp,
session.proposal.timestamp,
)?;
let proposal_id = vote.proposal_id;
let transition = self
.update_session(scope, proposal_id, move |session| session.add_vote(vote))
.await?;
self.handle_transition(scope, proposal_id, transition);
Ok(())
}
pub async fn handle_consensus_timeout(
&self,
scope: &Scope,
proposal_id: u32,
) -> Result<bool, ConsensusError> {
let timeout_result: Result<Option<bool>, ConsensusError> = self
.update_session(scope, proposal_id, |session| {
if let ConsensusState::ConsensusReached(result) = session.state {
return Ok(Some(result));
}
let result = calculate_consensus_result(
&session.votes,
session.proposal.expected_voters_count,
session.config.consensus_threshold(),
session.proposal.liveness_criteria_yes,
true,
);
if let Some(result) = result {
session.state = ConsensusState::ConsensusReached(result);
Ok(Some(result))
} else {
session.state = ConsensusState::Failed;
Ok(None)
}
})
.await;
match timeout_result? {
Some(consensus_result) => {
self.emit_event(
scope,
ConsensusEvent::ConsensusReached {
proposal_id,
result: consensus_result,
timestamp: current_timestamp()?,
},
);
Ok(consensus_result)
}
None => {
self.emit_event(
scope,
ConsensusEvent::ConsensusFailed {
proposal_id,
timestamp: current_timestamp()?,
},
);
Err(ConsensusError::InsufficientVotesAtTimeout)
}
}
}
pub async fn scope(
&self,
scope: &Scope,
) -> Result<ScopeConfigBuilderWrapper<Scope, Storage, Event, Signer>, ConsensusError> {
let existing_config = self.storage.get_scope_config(scope).await?;
let builder = if let Some(config) = existing_config {
ScopeConfigBuilder::from_existing(config)
} else {
ScopeConfigBuilder::new()
};
Ok(ScopeConfigBuilderWrapper::new(
self.clone(),
scope.clone(),
builder,
))
}
async fn initialize_scope(
&self,
scope: &Scope,
config: ScopeConfig,
) -> Result<(), ConsensusError> {
config.validate()?;
self.storage.set_scope_config(scope, config).await
}
async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
where
F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
{
self.storage.update_scope_config(scope, updater).await
}
async fn resolve_config(
&self,
scope: &Scope,
proposal_override: Option<ConsensusConfig>,
proposal: Option<&Proposal>,
) -> Result<ConsensusConfig, ConsensusError> {
let has_explicit_override = proposal_override.is_some();
let base_config = if let Some(override_config) = proposal_override {
override_config
} else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
ConsensusConfig::from(scope_config)
} else {
ConsensusConfig::gossipsub()
};
if let Some(prop) = proposal {
let timeout_seconds = if has_explicit_override {
base_config.consensus_timeout()
} else if prop.expiration_timestamp > prop.timestamp {
Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
} else {
base_config.consensus_timeout()
};
Ok(ConsensusConfig::new(
base_config.consensus_threshold(),
timeout_seconds,
base_config.max_rounds(),
base_config.use_gossipsub_rounds(),
prop.liveness_criteria_yes,
))
} else {
Ok(base_config)
}
}
async fn get_session(
&self,
scope: &Scope,
proposal_id: u32,
) -> Result<ConsensusSession, ConsensusError> {
self.storage
.get_session(scope, proposal_id)
.await?
.ok_or(ConsensusError::SessionNotFound)
}
async fn update_session<R, F>(
&self,
scope: &Scope,
proposal_id: u32,
mutator: F,
) -> Result<R, ConsensusError>
where
R: Send,
F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
{
self.storage
.update_session(scope, proposal_id, mutator)
.await
}
async fn save_session(
&self,
scope: &Scope,
session: ConsensusSession,
) -> Result<(), ConsensusError> {
self.storage.save_session(scope, session).await
}
async fn trim_scope_sessions(&self, scope: &Scope) -> Result<(), ConsensusError> {
self.storage
.update_scope_sessions(scope, |sessions| {
if sessions.len() <= self.max_sessions_per_scope {
return Ok(());
}
sessions.sort_by_key(|s| std::cmp::Reverse(s.created_at));
sessions.truncate(self.max_sessions_per_scope);
Ok(())
})
.await
}
pub(crate) async fn list_scope_sessions(
&self,
scope: &Scope,
) -> Result<Vec<ConsensusSession>, ConsensusError> {
self.storage
.list_scope_sessions(scope)
.await?
.ok_or(ConsensusError::ScopeNotFound)
}
fn handle_transition(&self, scope: &Scope, proposal_id: u32, transition: SessionTransition) {
if let SessionTransition::ConsensusReached(result) = transition {
self.emit_event(
scope,
ConsensusEvent::ConsensusReached {
proposal_id,
result,
timestamp: current_timestamp().unwrap_or(0),
},
);
}
}
fn emit_event(&self, scope: &Scope, event: ConsensusEvent) {
self.event_bus.publish(scope.clone(), event);
}
}
pub struct ScopeConfigBuilderWrapper<Scope, Storage, Event, Signer>
where
Scope: ConsensusScope,
Storage: ConsensusStorage<Scope>,
Event: ConsensusEventBus<Scope>,
Signer: ConsensusSignatureScheme,
{
service: ConsensusService<Scope, Storage, Event, Signer>,
scope: Scope,
builder: ScopeConfigBuilder,
}
impl<Scope, Storage, Event, Signer> ScopeConfigBuilderWrapper<Scope, Storage, Event, Signer>
where
Scope: ConsensusScope,
Storage: ConsensusStorage<Scope>,
Event: ConsensusEventBus<Scope>,
Signer: ConsensusSignatureScheme,
{
fn new(
service: ConsensusService<Scope, Storage, Event, Signer>,
scope: Scope,
builder: ScopeConfigBuilder,
) -> Self {
Self {
service,
scope,
builder,
}
}
pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
self.builder = self.builder.with_network_type(network_type);
self
}
pub fn with_threshold(mut self, threshold: f64) -> Self {
self.builder = self.builder.with_threshold(threshold);
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.builder = self.builder.with_timeout(timeout);
self
}
pub fn with_liveness_criteria(mut self, liveness_criteria_yes: bool) -> Self {
self.builder = self.builder.with_liveness_criteria(liveness_criteria_yes);
self
}
pub fn with_max_rounds(mut self, max_rounds: Option<u32>) -> Self {
self.builder = self.builder.with_max_rounds(max_rounds);
self
}
pub fn p2p_preset(mut self) -> Self {
self.builder = self.builder.p2p_preset();
self
}
pub fn gossipsub_preset(mut self) -> Self {
self.builder = self.builder.gossipsub_preset();
self
}
pub fn strict_consensus(mut self) -> Self {
self.builder = self.builder.strict_consensus();
self
}
pub fn fast_consensus(mut self) -> Self {
self.builder = self.builder.fast_consensus();
self
}
pub fn with_network_defaults(mut self, network_type: NetworkType) -> Self {
self.builder = self.builder.with_network_defaults(network_type);
self
}
pub async fn initialize(self) -> Result<(), ConsensusError> {
let config = self.builder.build()?;
self.service.initialize_scope(&self.scope, config).await
}
pub async fn update(self) -> Result<(), ConsensusError> {
let config = self.builder.build()?;
self.service
.update_scope_config(&self.scope, |existing| {
*existing = config;
Ok(())
})
.await
}
pub fn get_config(&self) -> ScopeConfig {
self.builder.get_config()
}
}