use alloy::{
primitives::{keccak256, Bytes, U256},
sol_types::SolValue,
};
use async_trait::async_trait;
use eigensdk::types::operator::OperatorId;
use futures_util::{
stream::{FuturesUnordered, StreamExt},
FutureExt,
};
use newton_prover_core::state_commit_registry::IStateRootCommittable::StateCommit;
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use newton_prover_chainio::avs::writer::AvsWriter;
use newton_prover_core::pcr0_provider::Pcr0Provider;
use crate::state_commit::{
aggregator::{AggregateRequest, OperatorRecord, StateCommitAggregator},
error::{from_chainio, StateCommitError},
operator_client::{OperatorClientError, StateCommitOperatorClient},
operator_set_reader::{OperatorSetSnapshot, OperatorSetSnapshotReader},
proposal::build_state_commit,
registry_view::{RegistryReader, RegistryView},
};
/// Destination for a finished state commit and its aggregated BLS certificate.
///
/// The orchestrator holds this as `Arc<dyn StateCommitWriter>`, so the concrete writer can be
/// swapped (the test module in this file uses an in-memory `FakeWriter`).
#[async_trait]
pub trait StateCommitWriter: Send + Sync + 'static {
/// Submits `commit` together with `bls_certificate`.
///
/// # Errors
/// Returns a [`StateCommitError`] when the submission fails.
async fn commit_state_root(&self, commit: StateCommit, bls_certificate: Bytes) -> Result<(), StateCommitError>;
}
#[derive(Debug)]
pub struct AvsWriterCommitter(pub Arc<AvsWriter>);
#[async_trait]
impl StateCommitWriter for AvsWriterCommitter {
async fn commit_state_root(&self, commit: StateCommit, bls_certificate: Bytes) -> Result<(), StateCommitError> {
self.0
.commit_state_root(commit, bls_certificate)
.await
.map(|_receipt| ())
.map_err(from_chainio)
}
}
/// Key under which operator proposals are tallied during the prepare phase.
///
/// Two operator responses count towards the same tally bucket only when both fields are
/// equal; no other part of an operator's proposal takes part in the comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct MajorityProposal {
// State root proposed by the operator.
new_state_root: alloy::primitives::B256,
// DA certificate hash proposed alongside the state root.
da_cert_hash: alloy::primitives::B256,
}
/// One operator of the operator-set snapshot a tick works with.
///
/// `Debug` is implemented by hand and prints only `operator_id` and `stake`.
#[derive(Clone)]
pub struct OperatorEntry {
/// Identifier used to address the operator and to key its stake in the tally.
pub operator_id: OperatorId,
/// Stake weight this operator contributes to the stake-weighted majority.
pub stake: alloy::primitives::U256,
/// G1 public key, copied into the aggregator's operator record.
pub g1_pubkey: eigensdk::crypto_bls::BlsG1Point,
/// G2 public key, copied into the aggregator's operator record.
pub g2_pubkey: eigensdk::crypto_bls::BlsG2Point,
}
impl std::fmt::Debug for OperatorEntry {
    /// Prints the operator id as hex together with the stake; the public keys are left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let operator_id_hex = hex::encode(self.operator_id);
        let mut out = f.debug_struct("OperatorEntry");
        out.field("operator_id", &operator_id_hex);
        out.field("stake", &self.stake);
        out.finish()
    }
}
/// Periodic driver of the state-commit protocol for a single chain.
///
/// Every collaborator is held behind an `Arc` so it can be shared and, for the trait
/// objects, replaced by a test double.
pub struct StateCommitOrchestrator {
// Chain identifier attached to every log line and metric emitted by the orchestrator.
chain_id: u64,
// Period of the ticker that drives `run`.
interval: Duration,
// Source of the registry view read at the start of each tick.
registry_reader: Arc<dyn RegistryReader>,
// Supplies the PCR0 commitment that goes into the state commit.
pcr0_provider: Arc<dyn Pcr0Provider>,
// Used to request proposals and signatures from operators.
operator_client: Arc<dyn StateCommitOperatorClient>,
// Provides the quorum threshold and aggregates signatures into a certificate.
aggregator: Arc<StateCommitAggregator>,
// Receives the final commit plus certificate.
writer: Arc<dyn StateCommitWriter>,
// Yields the operator list and reference timestamp for each tick.
operator_set_reader: Arc<dyn OperatorSetSnapshotReader>,
// Fetches the operator-table snapshot handed to the aggregator.
operator_table_provider: Arc<dyn crate::state_commit::operator_table::OperatorTableProvider>,
// Operator set passed to `operator_table_provider` when fetching the snapshot.
operator_set: newton_prover_core::bn254_certificate_verifier::ViewBN254CertificateVerifier::OperatorSet,
}
impl std::fmt::Debug for StateCommitOrchestrator {
    /// Prints only the plain-data fields (`chain_id`, `interval`); the collaborators are
    /// omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("StateCommitOrchestrator");
        out.field("chain_id", &self.chain_id);
        out.field("interval", &self.interval);
        out.finish()
    }
}
impl StateCommitOrchestrator {
/// Builds an orchestrator from its collaborators.
///
/// Every argument is stored unchanged in the field of the same name; no validation or I/O
/// happens here.
// One parameter per collaborator, hence the lint exemption.
#[allow(clippy::too_many_arguments)]
pub fn new(
chain_id: u64,
interval: Duration,
registry_reader: Arc<dyn RegistryReader>,
pcr0_provider: Arc<dyn Pcr0Provider>,
operator_client: Arc<dyn StateCommitOperatorClient>,
aggregator: Arc<StateCommitAggregator>,
writer: Arc<dyn StateCommitWriter>,
operator_set_reader: Arc<dyn OperatorSetSnapshotReader>,
operator_table_provider: Arc<dyn crate::state_commit::operator_table::OperatorTableProvider>,
operator_set: newton_prover_core::bn254_certificate_verifier::ViewBN254CertificateVerifier::OperatorSet,
) -> Self {
Self {
chain_id,
interval,
registry_reader,
pcr0_provider,
operator_client,
aggregator,
writer,
operator_set_reader,
operator_table_provider,
operator_set,
}
}
pub async fn run(self: Arc<Self>, shutdown: CancellationToken) {
let mut ticker = tokio::time::interval(self.interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
biased;
_ = shutdown.cancelled() => {
info!(chain_id = self.chain_id, "state-commit orchestrator shutting down");
return;
}
_ = ticker.tick() => {
let this = Arc::clone(&self);
let result = std::panic::AssertUnwindSafe(this.tick())
.catch_unwind()
.await;
match result {
Ok(Ok(())) => {}
Ok(Err(ref e)) if e.is_poison() => {
warn!(
chain_id = self.chain_id,
error = %e,
"state-commit tick aborted on poison error; next tick will re-read view"
);
}
Ok(Err(e)) => {
error!(chain_id = self.chain_id, error = %e, "state-commit tick failed");
}
Err(_) => {
error!(chain_id = self.chain_id, "state-commit tick panicked; loop continues");
}
}
}
}
}
}
/// Runs a single state-commit round.
///
/// Steps, in order:
/// 1. read the registry view and the operator-set snapshot;
/// 2. prepare: gather operator proposals and pick the stake-weighted majority;
/// 3. build the `StateCommit` from the view, the majority proposal, the PCR0 commitment and
///    the current Unix time, then take keccak256 of its ABI encoding as the digest;
/// 4. commit: gather operator signatures over that digest;
/// 5. fetch the operator-table snapshot and aggregate the signatures into a certificate;
/// 6. hand commit and certificate to the writer.
///
/// # Errors
/// The first failing step aborts the round and its error is returned. A failed
/// operator-table fetch is reported as `StateCommitError::AggregationFailed`.
#[allow(clippy::result_large_err)]
async fn tick(&self) -> Result<(), StateCommitError> {
    let view: RegistryView = self.registry_reader.read_view().await?;
    let next_seq = view.sequence_no + 1;
    let snapshot = self.operator_set_reader.snapshot().await?;
    let OperatorSetSnapshot {
        operators,
        reference_timestamp,
    } = snapshot;
    debug!(
        chain_id = self.chain_id,
        sequence_no = view.sequence_no,
        next_seq,
        operator_count = operators.len(),
        reference_timestamp,
        "state-commit tick: view + operator-set read"
    );

    // Prepare phase.
    let agreed = self.gather_majority_proposal(&operators, next_seq).await?;
    let pcr0_commitment = match self.pcr0_provider.pcr0_commitment().await {
        Ok(value) => value,
        Err(e) => return Err(StateCommitError::Pcr0Lookup(e.to_string())),
    };
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| StateCommitError::ClockSkew(e.to_string()))?;
    let commit = build_state_commit(
        &view,
        agreed.new_state_root,
        agreed.da_cert_hash,
        pcr0_commitment,
        since_epoch.as_secs(),
    )?;
    let digest = keccak256(commit.abi_encode());

    // Commit phase.
    let signatures = self
        .commit_phase(&operators, digest, &commit, next_seq, reference_timestamp)
        .await?;

    let mut operator_records: Vec<OperatorRecord> = Vec::with_capacity(operators.len());
    for entry in operators.iter() {
        operator_records.push(OperatorRecord {
            operator_id: entry.operator_id,
            g1_pubkey: entry.g1_pubkey.clone(),
            g2_pubkey: entry.g2_pubkey.clone(),
            stake: entry.stake,
        });
    }

    let table_snapshot = self
        .operator_table_provider
        .fetch_snapshot(self.operator_set.clone())
        .await
        .map(Arc::new)
        .map_err(|e| {
            warn!(
                chain_id = self.chain_id,
                sequence_no = next_seq,
                error = %e,
                "state-commit: operator-table snapshot fetch failed; aborting tick"
            );
            StateCommitError::AggregationFailed(format!("operator-table snapshot fetch: {e}"))
        })?;

    let request = AggregateRequest {
        digest,
        signatures,
        operator_set: operator_records,
        reference_timestamp,
        operator_table_snapshot: table_snapshot,
    };
    let cert_bytes = self.aggregator.aggregate(request)?;

    self.writer.commit_state_root(commit.clone(), cert_bytes).await?;
    info!(
        chain_id = self.chain_id,
        sequence_no = commit.sequenceNo,
        new_state_root = %commit.newStateRoot,
        "state commit submitted"
    );
    Ok(())
}
#[allow(clippy::result_large_err)]
async fn gather_majority_proposal(
&self,
operators: &[OperatorEntry],
next_seq: u64,
) -> Result<MajorityProposal, StateCommitError> {
let mut op_stake: HashMap<OperatorId, U256> = HashMap::with_capacity(operators.len());
let mut total_stake = U256::ZERO;
for entry in operators {
op_stake.insert(entry.operator_id, entry.stake);
total_stake += entry.stake;
}
if operators.is_empty() || total_stake.is_zero() {
warn!(
chain_id = self.chain_id,
sequence_no = next_seq,
operator_count = operators.len(),
total_stake = %total_stake,
"state-commit prepare: no operator stake to majority over"
);
newton_prover_metrics::inc_state_commit_disagreement_total(
self.chain_id,
newton_prover_metrics::state_commit_disagreement_flavor::EMPTY_SET,
);
return Err(StateCommitError::OperatorDisagreement { sequence_no: next_seq });
}
let mut futs = FuturesUnordered::new();
for entry in operators {
let client = Arc::clone(&self.operator_client);
let op_id = entry.operator_id;
let seq = next_seq;
futs.push(async move { (op_id, client.get_state_commit_proposal(&op_id, seq).await) });
}
let mut tally: HashMap<MajorityProposal, U256> = HashMap::new();
let mut total = 0usize;
let mut success_count = 0usize;
while let Some((op_id, result)) = futs.next().await {
total += 1;
match result {
Ok(proposal) => {
success_count += 1;
let stake = op_stake.get(&op_id).copied().unwrap_or(U256::ZERO);
let key = MajorityProposal {
new_state_root: proposal.new_state_root,
da_cert_hash: proposal.da_cert_hash,
};
*tally.entry(key).or_insert(U256::ZERO) += stake;
}
Err(OperatorClientError::Transport { .. }) | Err(OperatorClientError::Timeout { .. }) => {
warn!(
chain_id = self.chain_id,
operator_id = %hex::encode(op_id),
"state-commit prepare: operator unreachable, skipping"
);
}
Err(e) => {
warn!(
chain_id = self.chain_id,
operator_id = %hex::encode(op_id),
error = %e,
"state-commit prepare: operator error, skipping"
);
}
}
}
let threshold_bps = self.aggregator.quorum_threshold_bps();
let matched_stake: U256 = tally.values().copied().fold(U256::ZERO, |acc, s| acc + s);
let error_count = total.saturating_sub(success_count);
let leader = tally.iter().max_by_key(|(_, stake)| *stake);
match leader {
Some((proposal, leader_stake))
if *leader_stake * U256::from(10_000u64) >= total_stake * U256::from(threshold_bps as u64) =>
{
debug!(
chain_id = self.chain_id,
sequence_no = next_seq,
leader_stake = %leader_stake,
matched_stake = %matched_stake,
total_stake = %total_stake,
threshold_bps,
total,
error_count,
new_state_root = %proposal.new_state_root,
"state-commit prepare: stake-weighted majority reached"
);
Ok(*proposal)
}
None => {
warn!(
chain_id = self.chain_id,
sequence_no = next_seq,
operator_count = operators.len(),
responded = total,
error_count,
total_stake = %total_stake,
threshold_bps,
"state-commit prepare: all operators unreachable or errored"
);
newton_prover_metrics::inc_state_commit_disagreement_total(
self.chain_id,
newton_prover_metrics::state_commit_disagreement_flavor::ALL_UNREACHABLE,
);
Err(StateCommitError::OperatorDisagreement { sequence_no: next_seq })
}
Some(_) => {
warn!(
chain_id = self.chain_id,
sequence_no = next_seq,
operator_count = operators.len(),
responded = total,
error_count,
matched_stake = %matched_stake,
total_stake = %total_stake,
threshold_bps,
distinct_proposals = tally.len(),
"state-commit prepare: no stake-weighted majority among responses"
);
newton_prover_metrics::inc_state_commit_disagreement_total(
self.chain_id,
newton_prover_metrics::state_commit_disagreement_flavor::NO_MAJORITY,
);
Err(StateCommitError::OperatorDisagreement { sequence_no: next_seq })
}
}
}
async fn commit_phase(
&self,
operators: &[OperatorEntry],
digest: alloy::primitives::B256,
commit: &newton_prover_core::state_commit_registry::IStateRootCommittable::StateCommit,
sequence_no: u64,
reference_timestamp: u32,
) -> Result<Vec<(OperatorId, eigensdk::crypto_bls::BlsG1Point)>, StateCommitError> {
let mut futs = FuturesUnordered::new();
for entry in operators {
let client = Arc::clone(&self.operator_client);
let op_id = entry.operator_id;
let commit = commit.clone();
futs.push(async move {
(
op_id,
client
.sign_state_commit(&op_id, digest, &commit, reference_timestamp)
.await,
)
});
}
let mut signatures = Vec::new();
while let Some((op_id, result)) = futs.next().await {
match result {
Ok(sig) => {
signatures.push((op_id, sig));
}
Err(OperatorClientError::DigestDisagreement { .. }) => {
warn!(
chain_id = self.chain_id,
sequence_no,
operator_id = %hex::encode(op_id),
"state-commit commit: operator refused to sign (digest disagreement)"
);
return Err(StateCommitError::OperatorDisagreement { sequence_no });
}
Err(OperatorClientError::Transport { .. }) | Err(OperatorClientError::Timeout { .. }) => {
warn!(
chain_id = self.chain_id,
sequence_no,
operator_id = %hex::encode(op_id),
"state-commit commit: operator unreachable, skipping"
);
}
Err(e) => {
warn!(
chain_id = self.chain_id,
sequence_no,
operator_id = %hex::encode(op_id),
error = %e,
"state-commit commit: operator error, skipping"
);
}
}
}
Ok(signatures)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::state_commit::{
aggregator::StateCommitAggregator,
operator_client::{tests::FakeStateCommitOperatorClient, OperatorClientError, OperatorProposal},
operator_set_reader::StubOperatorSetSnapshotReader,
registry_view::RegistryView,
};
use alloy::primitives::{B256, U256};
use ark_bn254::{G1Affine, G2Affine};
use async_trait::async_trait;
use eigensdk::crypto_bls::{BlsG1Point, BlsG2Point};
use newton_prover_core::pcr0_provider::StubPcr0Provider;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
/// Builds a 32-byte value with every byte set to `byte`.
fn b32(byte: u8) -> B256 {
    B256::new([byte; 32])
}
/// Registry reader double: returns a fixed view, or always fails when `error` is set.
struct FakeRegistryReader {
    view: RegistryView,
    error: bool,
}

impl FakeRegistryReader {
    /// Reader that always succeeds with `view`.
    fn ok(view: RegistryView) -> Arc<Self> {
        let reader = Self { view, error: false };
        Arc::new(reader)
    }

    /// Reader whose `read_view` always fails; the stored view is a zeroed placeholder.
    fn failing() -> Arc<Self> {
        let placeholder = RegistryView {
            sequence_no: 0,
            state_root: B256::ZERO,
            last_commit_timestamp: 0,
        };
        Arc::new(Self {
            view: placeholder,
            error: true,
        })
    }
}

#[async_trait]
impl RegistryReader for FakeRegistryReader {
    async fn read_view(&self) -> Result<RegistryView, StateCommitError> {
        if self.error {
            Err(StateCommitError::RegistryNotConfigured)
        } else {
            Ok(self.view)
        }
    }
}
/// Writer double that accepts every commit and does nothing with it.
struct FakeWriter;
#[async_trait]
impl StateCommitWriter for FakeWriter {
// Both arguments are ignored; the call always succeeds.
async fn commit_state_root(
&self,
_commit: StateCommit,
_bls_certificate: Bytes,
) -> Result<(), StateCommitError> {
Ok(())
}
}
/// Returns a `FakeWriter` as the trait object the orchestrator constructor expects.
fn fake_writer() -> Arc<dyn StateCommitWriter> {
Arc::new(FakeWriter)
}
/// Builds an operator entry whose id is all zeroes except for `id_byte` in the first byte,
/// with a stake of 1000 and identity points for both public keys.
fn make_operator_entry(id_byte: u8) -> OperatorEntry {
    let mut operator_id = OperatorId::default();
    operator_id[0] = id_byte;
    let stake = U256::from(1_000u64);
    let g1_pubkey = BlsG1Point::new(G1Affine::identity());
    let g2_pubkey = BlsG2Point::new(G2Affine::identity());
    OperatorEntry {
        operator_id,
        stake,
        g1_pubkey,
        g2_pubkey,
    }
}
/// Registry view shared by the tests: sequence number 5, state root `0xab..ab`, last commit
/// timestamp 1000.
fn baseline_view() -> RegistryView {
    let state_root = b32(0xab);
    RegistryView {
        sequence_no: 5,
        state_root,
        last_commit_timestamp: 1_000,
    }
}
/// Assembles an orchestrator for chain 1 with a 120 s interval around the given reader,
/// operator client and operator list; every other collaborator is a stub or fake.
fn build_orchestrator(
    reader: Arc<dyn RegistryReader>,
    op_client: Arc<dyn StateCommitOperatorClient>,
    operators: Vec<OperatorEntry>,
) -> Arc<StateCommitOrchestrator> {
    let pcr0_provider = Arc::new(StubPcr0Provider::new());
    let aggregator = Arc::new(StateCommitAggregator::new(1, 6_700));
    let writer = fake_writer();
    let operator_set_reader = Arc::new(StubOperatorSetSnapshotReader::new(operators, 42));
    let operator_table_provider =
        crate::state_commit::operator_table::fake::FakeOperatorTableProvider::from_operators(42, Vec::new());
    let operator_set = newton_prover_core::bn254_certificate_verifier::ViewBN254CertificateVerifier::OperatorSet {
        avs: alloy::primitives::Address::ZERO,
        id: 0,
    };
    let orchestrator = StateCommitOrchestrator::new(
        1,
        Duration::from_secs(120),
        reader,
        pcr0_provider,
        op_client,
        aggregator,
        writer,
        operator_set_reader,
        operator_table_provider,
        operator_set,
    );
    Arc::new(orchestrator)
}
/// Two operators return the same proposal, so the tick is expected to succeed end to end.
#[tokio::test]
async fn tick_happy_path_reaches_aggregation() {
    let client = Arc::new(FakeStateCommitOperatorClient::new());
    let proposal = OperatorProposal {
        new_state_root: b32(0xcd),
        da_cert_hash: b32(0xef),
        pcr0_commitment: *newton_prover_core::pcr0_sentinels::STATE_COMMIT_STUB_PCR0_HASH,
    };
    let entries = vec![make_operator_entry(0x01), make_operator_entry(0x02)];
    for entry in &entries {
        client.set_proposal_for(&entry.operator_id, proposal);
    }

    let orch = build_orchestrator(FakeRegistryReader::ok(baseline_view()), client, entries);

    let result = orch.tick().await;
    assert!(result.is_ok(), "expected Ok, got {result:?}");
}
/// A failing registry read must surface unchanged and must not be classified as poison.
#[tokio::test]
async fn tick_aborts_on_view_read_error() {
    let orch = build_orchestrator(
        FakeRegistryReader::failing(),
        Arc::new(FakeStateCommitOperatorClient::new()),
        Vec::new(),
    );

    let outcome = orch.tick().await;

    assert!(matches!(outcome, Err(StateCommitError::RegistryNotConfigured)));
    assert!(!StateCommitError::RegistryNotConfigured.is_poison());
}
#[tokio::test]
async fn tick_aborts_on_operator_disagreement_in_prepare() {
let local_root = b32(0xcd);
let local_da = b32(0xef);
let local_pcr0 = *newton_prover_core::pcr0_sentinels::STATE_COMMIT_STUB_PCR0_HASH;
let client = Arc::new(FakeStateCommitOperatorClient::new());
let mut op1 = OperatorId::default();
op1[0] = 0x01;
client.set_proposal_for(
&op1,
OperatorProposal {
new_state_root: local_root,
da_cert_hash: local_da,
pcr0_commitment: local_pcr0,
},
);
let mut op2 = OperatorId::default();
op2[0] = 0x02;
client.set_proposal_for(
&op2,
OperatorProposal {
new_state_root: b32(0xff),
da_cert_hash: local_da,
pcr0_commitment: local_pcr0,
},
);
let orch = build_orchestrator(
FakeRegistryReader::ok(baseline_view()),
client,
vec![make_operator_entry(0x01), make_operator_entry(0x02)],
);
let result = orch.tick().await;
assert!(matches!(result, Err(StateCommitError::OperatorDisagreement { .. })));
assert!(!result.unwrap_err().is_poison());
}
/// With no operators the tick must abort before any proposal request is sent.
#[tokio::test]
async fn tick_aborts_when_operator_set_is_empty() {
    let client = Arc::new(FakeStateCommitOperatorClient::new());
    let reader = FakeRegistryReader::ok(baseline_view());
    let orch = build_orchestrator(reader, Arc::clone(&client) as _, Vec::new());

    let outcome = orch.tick().await;

    assert!(matches!(outcome, Err(StateCommitError::OperatorDisagreement { .. })));
    assert!(!outcome.unwrap_err().is_poison());
    assert_eq!(client.proposal_call_count(), 0);
}
/// Both operators fail (one transport error, one timeout): each is still asked once, and the
/// tick aborts with a non-poison disagreement error.
#[tokio::test]
async fn tick_aborts_when_all_operators_unreachable() {
    let client = Arc::new(FakeStateCommitOperatorClient::new());
    let entries = vec![make_operator_entry(0x01), make_operator_entry(0x02)];
    let first_id = entries[0].operator_id;
    let second_id = entries[1].operator_id;

    let transport_failure = OperatorClientError::Transport {
        operator_id: hex::encode(first_id),
        source: "connection refused".into(),
    };
    client.inject_error(&first_id, transport_failure);

    let timeout_failure = OperatorClientError::Timeout {
        operator_id: hex::encode(second_id),
        timeout_ms: 5_000,
    };
    client.inject_error(&second_id, timeout_failure);

    let orch = build_orchestrator(
        FakeRegistryReader::ok(baseline_view()),
        Arc::clone(&client) as _,
        entries,
    );

    let outcome = orch.tick().await;

    assert!(matches!(outcome, Err(StateCommitError::OperatorDisagreement { .. })));
    assert!(!outcome.unwrap_err().is_poison());
    assert_eq!(client.proposal_call_count(), 2);
}
/// `run` must return promptly once the cancellation token fires.
#[tokio::test]
async fn run_loop_exits_on_shutdown() {
    let client = Arc::new(FakeStateCommitOperatorClient::new());
    let reader = FakeRegistryReader::ok(baseline_view());
    let pcr0_provider = Arc::new(StubPcr0Provider::new());
    let aggregator = Arc::new(StateCommitAggregator::new(1, 6_700));
    let writer = fake_writer();
    let operator_set_reader = Arc::new(StubOperatorSetSnapshotReader::new(vec![], 1));
    let operator_table_provider =
        crate::state_commit::operator_table::fake::FakeOperatorTableProvider::from_operators(1, Vec::new());
    let operator_set = newton_prover_core::bn254_certificate_verifier::ViewBN254CertificateVerifier::OperatorSet {
        avs: alloy::primitives::Address::ZERO,
        id: 0,
    };
    let orch = Arc::new(StateCommitOrchestrator::new(
        1,
        Duration::from_secs(3600),
        reader,
        pcr0_provider,
        client,
        aggregator,
        writer,
        operator_set_reader,
        operator_table_provider,
        operator_set,
    ));

    let token = CancellationToken::new();
    let handle = tokio::spawn({
        let orch = Arc::clone(&orch);
        let token = token.clone();
        async move {
            orch.run(token).await;
        }
    });

    token.cancel();

    tokio::time::timeout(Duration::from_secs(2), handle)
        .await
        .expect("run() exited within 2s")
        .expect("task did not panic");
}
}