use super::{StateHandler, ValidatorContext, ValidatorState, idle::Idle};
use crate::{
BatchCommitmentValidationReply, CommitmentSubmitted, ConsensusEvent,
utils::MultisignedBatchCommitment,
};
use anyhow::{Context as _, Result, anyhow, ensure};
use derive_more::Display;
use ethexe_common::{
Address, SimpleBlockData, ToDigest, ValidatorsVec, consensus::BatchCommitmentValidationRequest,
gear::BatchCommitment, network::ValidatorMessage,
};
use futures::{FutureExt, future::BoxFuture};
use gsigner::secp256k1::Secp256k1SignerExt;
use std::{
collections::BTreeSet,
task::{Context, Poll},
};
use tokio::time::sleep;
#[derive(Display)]
#[display("COORDINATOR_BOOT")]
pub struct CoordinatorBoot {
ctx: ValidatorContext,
block: SimpleBlockData,
validators: ValidatorsVec,
pending: Option<BoxFuture<'static, Result<Option<BatchCommitment>>>>,
}
impl std::fmt::Debug for CoordinatorBoot {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CoordinatorBoot")
.field("block", &self.block.hash)
.finish_non_exhaustive()
}
}
impl CoordinatorBoot {
pub fn start(
ctx: ValidatorContext,
block: SimpleBlockData,
validators: ValidatorsVec,
) -> Result<ValidatorState> {
let delay = ctx.core.coordinator_aggregation_delay;
let batch_manager = ctx.core.batch_manager.clone();
let pending = async move {
sleep(delay).await;
batch_manager.create_batch_commitment(block).await
}
.boxed();
Ok(Self {
ctx,
block,
validators,
pending: Some(pending),
}
.into())
}
}
impl StateHandler for CoordinatorBoot {
fn context(&self) -> &ValidatorContext {
&self.ctx
}
fn context_mut(&mut self) -> &mut ValidatorContext {
&mut self.ctx
}
fn into_context(self) -> ValidatorContext {
self.ctx
}
fn poll_next_state(mut self, cx: &mut Context<'_>) -> Result<(Poll<()>, ValidatorState)> {
let Some(future) = self.pending.as_mut() else {
return Ok((Poll::Pending, self.into()));
};
match future.poll_unpin(cx) {
Poll::Pending => Ok((Poll::Pending, self.into())),
Poll::Ready(Err(err)) => Err(err),
Poll::Ready(Ok(None)) => {
tracing::debug!(
block = %self.block.hash,
"coordinator skipped batch: no commitments to submit"
);
let next = Idle::create(self.ctx)?;
Ok((Poll::Ready(()), next))
}
Poll::Ready(Ok(Some(batch))) => {
let next = Coordinator::create(self.ctx, self.validators, batch, self.block)?;
Ok((Poll::Ready(()), next))
}
}
}
}
#[derive(Debug, Display)]
#[display("COORDINATOR")]
pub struct Coordinator {
ctx: ValidatorContext,
validators: BTreeSet<Address>,
multisigned_batch: MultisignedBatchCommitment,
}
impl StateHandler for Coordinator {
fn context(&self) -> &ValidatorContext {
&self.ctx
}
fn context_mut(&mut self) -> &mut ValidatorContext {
&mut self.ctx
}
fn into_context(self) -> ValidatorContext {
self.ctx
}
fn process_validation_reply(
mut self,
reply: BatchCommitmentValidationReply,
) -> Result<ValidatorState> {
if let Err(err) = self
.multisigned_batch
.accept_batch_commitment_validation_reply(reply, |addr| {
self.validators
.contains(&addr)
.then_some(())
.ok_or_else(|| anyhow!("Received validation reply is not known validator"))
})
{
self.warning(format!("validation reply rejected: {err}"));
}
if self.multisigned_batch.signatures().len() as u64 >= self.ctx.core.signatures_threshold {
Self::submission(self.ctx, self.multisigned_batch)
} else {
Ok(self.into())
}
}
}
impl Coordinator {
pub fn create(
mut ctx: ValidatorContext,
validators: ValidatorsVec,
batch: BatchCommitment,
block: SimpleBlockData,
) -> Result<ValidatorState> {
debug_assert_eq!(batch.block_hash, block.hash, "Block hash mismatch");
ensure!(
validators.len() as u64 >= ctx.core.signatures_threshold,
"Number of validators is less than threshold"
);
ensure!(
ctx.core.signatures_threshold > 0,
"Threshold should be greater than 0"
);
let multisigned_batch = MultisignedBatchCommitment::new(
batch,
&ctx.core.signer,
ctx.core.router_address,
ctx.core.pub_key,
)?;
ctx.core
.metrics
.last_signed_commitment_block_number
.set(block.header.height);
if multisigned_batch.signatures().len() as u64 >= ctx.core.signatures_threshold {
return Self::submission(ctx, multisigned_batch);
}
let era_index = ctx
.core
.timelines
.era_from_ts(multisigned_batch.batch().timestamp)
.context("failed to calculate era from batch timestamp")?;
let payload = BatchCommitmentValidationRequest::new(multisigned_batch.batch());
let message = ValidatorMessage { era_index, payload };
let validation_request = ctx
.core
.signer
.signed_data(ctx.core.pub_key, message, None)?;
ctx.output(ConsensusEvent::PublishMessage(validation_request.into()));
Ok(Self {
ctx,
validators: validators.into_iter().collect(),
multisigned_batch,
}
.into())
}
pub fn submission(
ctx: ValidatorContext,
multisigned_batch: MultisignedBatchCommitment,
) -> Result<ValidatorState> {
let (batch, signatures) = multisigned_batch.into_parts();
let cloned_committer = ctx.core.committer.clone_boxed();
ctx.tasks.push(
async move {
let block_hash = batch.block_hash;
let batch_digest = batch.to_digest();
let event = match cloned_committer.commit(batch, signatures).await {
Ok(tx) => CommitmentSubmitted {
block_hash,
batch_digest,
tx,
}.into(),
Err(err) => ConsensusEvent::Warning(format!(
"Failed to submit commitment for block {block_hash}, digest {batch_digest}: {err}"
))
};
Ok(event)
}
.boxed(),
);
Idle::create(ctx)
}
}