use alloy::{
dyn_abi::SolType,
primitives::{keccak256, Address, FixedBytes, B256, U256},
sol,
sol_types::{SolStruct, SolValue},
};
use dashmap::DashMap;
use eigensdk::{
client_avsregistry::reader::AvsRegistryChainReader,
common::{get_provider, get_ws_provider},
crypto_bls::{convert_to_g1_point, convert_to_g2_point},
services_avsregistry::AvsRegistryService,
types::{avs::TaskResponseDigest, avs_state::OperatorAvsState, operator::OperatorId},
};
use eyre::WrapErr;
use newton_prover_chainio::{
avs::{
diagnostics, writer::AvsWriter, AvsRegistryServiceArcCaller, AvsRegistryServiceCaller,
AvsRegistryServiceChainCaller,
},
bls::{
update_aggregated_operators, BlsAggregationServiceError, BlsAggregationServiceResponse, BlsAggregatorService,
BlsServiceHandle, ServiceHandle, TaskMetadata, TaskSignature,
},
registry::OperatorRegistryService,
};
use newton_prover_core::{
bn254_certificate_verifier::{
IBN254CertificateVerifierTypes::{BN254Certificate, BN254OperatorInfoWitness},
IOperatorTableCalculatorTypes::BN254OperatorInfo,
ViewBN254CertificateVerifier,
BN254::{G1Point, G2Point},
},
common::{
compute_consensus_digest,
destination_verifier::{resolve_destination_verifier, ResolvedDestinationVerifier},
},
config::{contracts::SourceChainMultichainContracts, rpc::ChainRpcProviderConfig},
merkle::{self, OperatorInfo as MerkleOperatorInfo},
newton_prover_task_manager::{
INewtonProverTaskManager::TaskResponse as BindingTaskResponse, NewtonProverTaskManager,
},
operator_registry::OperatorRegistry,
TaskId,
};
use std::{
collections::{HashMap, HashSet},
mem,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use crate::{
consensus::{build_consensus, ConsensusResult, DEFAULT_TOLERANCE_PCT},
error::AggregatorCoreError,
rpc_server::SignedTaskResponse,
};
// Minimal, read-only Solidity binding for an operator table calculator contract,
// declared inline through alloy's `sol!` macro.
// `#[sol(rpc)]` requests the contract-call bindings in addition to the plain ABI types.
sol! {
#[sol(rpc)]
interface IOperatorTableCalculatorView {
// Identifies an operator set: an AVS address plus a numeric set id.
struct OperatorSet {
address avs;
uint32 id;
}
// View call returning two parallel outputs for the given operator set:
// the operator addresses and a weight list per operator.
function getOperatorSetWeights(
OperatorSet calldata operatorSet
) external view returns (address[] memory operators, uint256[][] memory weights);
}
}
/// Per-task bookkeeping the aggregator keeps while a task is being aggregated.
#[derive(Clone)]
pub struct TaskState {
/// Quorum numbers the task was initialized with.
quorum_nums: Vec<u8>,
/// Error responses reported by operators for this task, in arrival order.
operator_errors: Vec<crate::rpc_server::OperatorErrorResponse>,
/// Number of operators expected to answer; the majority-error threshold is derived from it.
expected_operators: usize,
/// Task responses keyed by the digest the operators signed.
pub task_responses: HashMap<TaskResponseDigest, BindingTaskResponse>,
/// Set once an aggregated response has been received; later signatures are ignored.
quorum_locked: bool,
/// Instant at which this state was created.
pub created_at: Instant,
/// Attestation bytes for the task; starts out empty.
/// NOTE(review): the code that fills this in is not visible in this part of the file.
attestation_data: alloy::primitives::Bytes,
}
/// Hand-written `Debug` that prints the number of stored task responses and the
/// attestation byte length instead of dumping their contents.
impl std::fmt::Debug for TaskState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to `TaskState` forces this impl to be revisited.
        let Self {
            quorum_nums,
            operator_errors,
            expected_operators,
            task_responses,
            quorum_locked,
            created_at,
            attestation_data,
        } = self;
        let response_count = task_responses.len();
        let attestation_len = attestation_data.len();

        let mut builder = f.debug_struct("TaskState");
        builder.field("quorum_nums", quorum_nums);
        builder.field("operator_errors", operator_errors);
        builder.field("expected_operators", expected_operators);
        builder.field("task_responses_count", &response_count);
        builder.field("quorum_locked", quorum_locked);
        builder.field("created_at", created_at);
        builder.field("attestation_data_len", &attestation_len);
        builder.finish()
    }
}
impl TaskState {
    /// Creates an unlocked state with no recorded responses or operator errors,
    /// empty attestation data, and `created_at` set to the current instant.
    fn new(quorum_nums: Vec<u8>, expected_operators: usize) -> Self {
        let created_at = Instant::now();
        Self {
            quorum_nums,
            expected_operators,
            created_at,
            quorum_locked: false,
            operator_errors: Vec::new(),
            task_responses: HashMap::new(),
            attestation_data: alloy::primitives::Bytes::new(),
        }
    }

    /// Returns `true` once the task has been marked as quorum-locked.
    pub fn is_quorum_locked(&self) -> bool {
        let Self { quorum_locked, .. } = self;
        *quorum_locked
    }

    /// Marks the task as quorum-locked. There is no way to unlock it again.
    pub fn set_quorum_locked(&mut self) {
        self.quorum_locked = true;
    }
}
/// Computes the EIP-712 struct hash of a `BN254Certificate` built from the given
/// reference timestamp and message hash.
fn calculate_certificate_digest(
    reference_timestamp: u32,
    message_hash: alloy::primitives::B256,
) -> alloy::primitives::B256 {
    newton_prover_core::BN254Certificate {
        referenceTimestamp: reference_timestamp,
        messageHash: message_hash,
    }
    .eip712_hash_struct()
}
/// Core of the aggregator: tracks per-task state, feeds operator signatures to the BLS
/// aggregation service, and submits aggregated responses on-chain.
#[allow(missing_debug_implementations)]
pub struct AggregatorCore {
/// Handle to the BLS aggregation service (task initialization and signature processing).
pub service_handle: Box<dyn BlsServiceHandle>,
/// Per-task channel on which the aggregation service delivers its result.
/// An entry is taken out of the map when a caller starts waiting for that task.
#[allow(clippy::type_complexity)]
task_response_receivers:
DashMap<TaskId, UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>>,
/// Per-task bookkeeping (responses, operator errors, quorum lock).
pub task_states: Arc<DashMap<TaskId, TaskState>>,
/// Token cancelled when the core is dropped.
cancellation_token: CancellationToken,
/// When true, signatures are over a certificate digest and certificate signature data is submitted.
is_destination_chain: bool,
/// RPC endpoints, looked up by chain id.
rpc: ChainRpcProviderConfig,
/// Chain id used to look up the source-chain RPC endpoints.
source_chain_id: u64,
/// Chain id passed along when the aggregated response is sent.
dest_chain_id: u64,
/// Task manager contract address.
/// NOTE(review): no reader of this field is visible in this part of the file.
task_manager_addr: Address,
/// Operator registry contract address (also used by the G2 key diagnostic).
operator_registry_address: Address,
/// Shared AVS registry service used for operator state and address lookups.
avs_registry_service: AvsRegistryServiceArcCaller,
/// Source-chain multichain deployment, if one could be loaded; required in destination-chain mode.
source_multichain_contracts: Option<SourceChainMultichainContracts>,
/// Optional operator table updater address used to map a reference timestamp to a block number.
dest_operator_table_updater: Option<Address>,
}
impl Drop for AggregatorCore {
    /// Cancels the core's cancellation token when the core goes away.
    fn drop(&mut self) {
        let Self { cancellation_token, .. } = self;
        cancellation_token.cancel();
    }
}
impl AggregatorCore {
/// Builds an aggregator core and starts its background services.
///
/// Steps, in order:
/// 1. Resolve the source-chain RPC endpoints and read the BLS APK and socket registry
///    addresses from the operator registry contract.
/// 2. Create the operator registry service and try to pre-load operators (a failure here
///    is only logged; the cache then warms on first request).
/// 3. Build the AVS registry service and warm its operator-id cache when mappings exist.
/// 4. Check that a WS provider can be obtained, then spawn the operator registry service.
/// 5. Start the BLS aggregation service.
/// 6. Load the source-chain multichain deployment; it is mandatory in destination-chain mode.
///
/// The spawned operator registry service runs under a child of the core's cancellation
/// token, so dropping the core (which cancels that token) also signals the service to stop.
///
/// # Errors
/// Returns an error when the source RPC configuration is missing, a registry address
/// cannot be read, the operator registry service or WS provider cannot be created, or
/// destination-chain mode is requested without a source multichain deployment.
///
/// NOTE(review): `operator_state_retriever_address` is accepted but not used by this body.
#[allow(clippy::too_many_arguments)]
pub async fn new(
    avs_registry_reader: AvsRegistryChainReader,
    operator_registry_address: Address,
    operator_state_retriever_address: Address,
    is_destination_chain: bool,
    rpc: ChainRpcProviderConfig,
    source_chain_id: u64,
    dest_chain_id: u64,
    task_manager_addr: Address,
    dest_operator_table_updater: Option<Address>,
) -> Result<Self, eyre::Error> {
    let source_rpc = rpc.get_or_err(source_chain_id)?;
    let operator_registry = OperatorRegistry::new(operator_registry_address, get_provider(&source_rpc.http));
    let bls_apk_registry_address = operator_registry
        .blsApkRegistry()
        .call()
        .await
        .map_err(|e| eyre::eyre!("Failed to get BLS APK registry address: {}", e))?;
    let socket_registry_address = operator_registry
        .socketRegistry()
        .call()
        .await
        .map_err(|e| eyre::eyre!("Failed to get socket registry address: {}", e))?;
    let (operators_info_service, _error_rx) = OperatorRegistryService::new(
        operator_registry_address,
        socket_registry_address,
        source_rpc.http.clone(),
        source_rpc.ws.clone(),
    )
    .await
    .map_err(|e| eyre::eyre!("Failed to initialize operator info service: {}", e))?;

    // Cache warming is best-effort: a failure must not prevent startup.
    match operators_info_service.load_operators_sync().await {
        Ok(count) => {
            info!("[AggregatorCore] pre-loaded {} operators for cache warming", count);
        }
        Err(e) => {
            warn!(
                "[AggregatorCore] failed to pre-load operator data: {} - cache will warm on first request",
                e
            );
        }
    }

    let avs_registry_service =
        AvsRegistryServiceChainCaller::new(avs_registry_reader, operators_info_service.clone());
    let id_mappings = operators_info_service.get_operator_id_to_addr_mappings().await;
    if !id_mappings.is_empty() {
        info!(
            "[AggregatorCore] warming operator ID cache with {} mappings",
            id_mappings.len()
        );
        avs_registry_service.warm_operator_id_cache(id_mappings);
    }

    // The background service gets a child of the token stored in the core. Previously it
    // received an unrelated token that nothing ever cancelled, so it outlived the core.
    let cancellation_token = CancellationToken::new();
    let service_token = cancellation_token.child_token();

    // Fail early if the WS endpoint is unusable; the provider itself is not kept.
    let _provider = get_ws_provider(&source_rpc.ws)
        .await
        .map_err(|e| eyre::eyre!("Failed to get WS provider: {}", e))?;
    tokio::spawn(async move {
        let _ = operators_info_service.start_service(&service_token).await;
    });

    let (handle, _legacy_receiver) = {
        let bls_service = BlsAggregatorService::new(avs_registry_service.clone());
        bls_service.start()
    };

    let deployment_env = newton_prover_core::config::contracts::get_deployment_env();
    let source_multichain_contracts = SourceChainMultichainContracts::try_load(source_chain_id, &deployment_env);
    if is_destination_chain && source_multichain_contracts.is_none() {
        // No core will exist to cancel the token on drop, so stop the spawned service here.
        cancellation_token.cancel();
        return Err(eyre::eyre!(
            "destination chain mode requires source multichain contracts; \
             no deployment found for newton-cross-chain/{}-{}. \
             Set DEPLOYMENT_DIR for custom deployments.",
            source_chain_id,
            deployment_env,
        ));
    }

    let core = Self {
        service_handle: Box::new(handle),
        task_response_receivers: DashMap::new(),
        task_states: Arc::new(DashMap::new()),
        cancellation_token,
        is_destination_chain,
        rpc,
        source_chain_id,
        dest_chain_id,
        task_manager_addr,
        operator_registry_address,
        avs_registry_service: AvsRegistryServiceArcCaller {
            inner: Arc::new(avs_registry_service),
        },
        source_multichain_contracts,
        dest_operator_table_updater,
    };
    Ok(core)
}
/// Returns whether this core was configured for destination-chain mode.
pub fn is_destination_chain(&self) -> bool {
self.is_destination_chain
}
/// Registers a task with the BLS aggregation service and creates its local state.
///
/// A task that is already quorum-locked is left untouched and `Ok(())` is returned.
/// Otherwise a fresh [`TaskState`] replaces any existing one, the task is initialized in
/// the aggregation service, and the receiver for its result is stored.
///
/// # Errors
/// Fails when `task_id` is zero, `quorum_nums` is empty, the threshold is outside
/// `1..=100`, `time_to_expiry` is shorter than one whole second, or the aggregation
/// service rejects the task.
#[instrument(skip(self))]
pub async fn initialize_task(
    &self,
    task_id: TaskId,
    task_created_block: u64,
    quorum_nums: Vec<u8>,
    quorum_threshold_percentage: u8,
    time_to_expiry: Duration,
    broadcast_count: usize,
) -> Result<(), eyre::Error> {
    if task_id == TaskId::ZERO {
        return Err(eyre::eyre!("Invalid task_id: zero task ID"));
    }
    if quorum_nums.is_empty() {
        return Err(eyre::eyre!("Invalid quorum_nums: empty quorum numbers"));
    }
    if !(1..=100).contains(&quorum_threshold_percentage) {
        return Err(eyre::eyre!(
            "Invalid quorum_threshold_percentage: must be between 1 and 100, got {}",
            quorum_threshold_percentage
        ));
    }
    // Whole seconds are what is checked, so sub-second expiries are rejected as well.
    if time_to_expiry < Duration::from_secs(1) {
        return Err(eyre::eyre!("Invalid time_to_expiry: zero duration"));
    }

    // Never reset a task whose aggregation has already completed.
    let already_locked = self
        .task_states
        .get(&task_id)
        .map_or(false, |existing| existing.is_quorum_locked());
    if already_locked {
        debug!(
            task_id = %task_id,
            "[DEBUG] SKIP_INIT: Task already quorum-locked, skipping re-initialization"
        );
        return Ok(());
    }

    info!(
        "Initializing task {} for BLS aggregation at block {}",
        task_id, task_created_block
    );

    // The same threshold is supplied as a single-element list.
    let thresholds = vec![quorum_threshold_percentage];
    let task_metadata = TaskMetadata::new(
        task_id,
        task_created_block,
        quorum_nums.clone(),
        thresholds,
        time_to_expiry,
    );

    // State is registered before the service call.
    let fresh_state = TaskState::new(quorum_nums.clone(), broadcast_count);
    self.task_states.insert(task_id, fresh_state);

    let response_receiver = match self.service_handle.initialize_task(task_metadata).await {
        Ok(receiver) => receiver,
        Err(e) => return Err(eyre::eyre!("Failed to initialize task: {:?}", e)),
    };
    self.task_response_receivers.insert(task_id, response_receiver);

    let quorum_nums_hex = newton_prover_core::hex!(quorum_nums);
    info!("Task {} initialized with quorums: {}", task_id, quorum_nums_hex);
    Ok(())
}
/// Validates an operator's signed task response and forwards its signature to the BLS
/// aggregation service.
///
/// Returns `Ok(())` without doing any work when the task is already quorum-locked.
/// After the service accepts the signature, the response body is stored in the task
/// state, but only if no response is stored yet or its digest is already present.
///
/// # Errors
/// Fails for a zero task id or all-zero operator id, when the reference timestamp cannot
/// be obtained in destination-chain mode, and for aggregation-service errors other than
/// the "task already finished" cases that are treated as success below.
#[instrument(skip(self, signed_response), fields(task_id = %signed_response.task_id, operator_id = %signed_response.operator_id()))]
pub async fn process_signed_response(&self, signed_response: SignedTaskResponse) -> Result<(), eyre::Error> {
let start_time = std::time::Instant::now();
let task_id = signed_response.task_id;
let operator_id = signed_response.operator_id();
if task_id == TaskId::ZERO {
return Err(eyre::eyre!("Invalid task_id: zero task ID"));
}
// An operator id consisting only of zero bytes is treated as invalid.
if operator_id.as_slice().iter().all(|&b| b == 0) {
return Err(eyre::eyre!("Invalid operator_id: zero operator ID"));
}
// Fast path: ignore signatures that arrive after the aggregated response was received.
if let Some(state) = self.task_states.get(&task_id) {
if state.is_quorum_locked() {
debug!(
task_id = %task_id,
operator_id = %operator_id,
"[DEBUG] LATE_SIG_REJECTED: Task already quorum-locked, rejecting late signature"
);
return Ok(()); }
}
info!(
task_id = %task_id,
operator_id = %operator_id,
"Processing signed task response from operator"
);
let binding_task_response: BindingTaskResponse = signed_response.task_response.clone().into();
let message_hash = compute_consensus_digest(&binding_task_response);
// Only the first policy data entry is rendered for logging.
let policy_data_hex = binding_task_response
.policyTaskData
.policyData
.first()
.map(|pd| newton_prover_core::hex!(&pd.data))
.unwrap_or_else(|| "no_policy_data".to_string());
debug!(
task_id = %task_id,
operator_id = %operator_id,
consensus_digest = %message_hash,
policy_client = %binding_task_response.policyClient,
policy_data = %policy_data_hex,
policy_data_count = binding_task_response.policyTaskData.policyData.len(),
evaluation_result = %newton_prover_core::hex!(&binding_task_response.evaluationResult),
"[DEBUG] BN254_OPERATOR_SIG: Operator signing TaskResponse with digest"
);
// In destination-chain mode the digest is the certificate digest built from the
// reference timestamp and the consensus digest; otherwise it is the consensus digest itself.
let task_response_digest = if self.is_destination_chain {
let reference_timestamp = self
.get_reference_timestamp(&signed_response)
.await
.map_err(|e| eyre::eyre!("Failed to get reference timestamp: {}", e))?;
calculate_certificate_digest(reference_timestamp, message_hash)
} else {
message_hash
};
let task_signature =
TaskSignature::new(task_id, task_response_digest, signed_response.signature(), operator_id);
let result = self.service_handle.process_signature(task_signature).await;
match result {
Ok(_) => {
let processing_duration = start_time.elapsed();
if let Some(mut state) = self.task_states.get_mut(&task_id) {
// Re-check the lock: it may have been set while the signature was being processed.
if state.is_quorum_locked() {
info!(
task_id = %task_id,
operator_id = %operator_id,
duration_ms = processing_duration.as_millis(),
"Task already quorum-locked, skipping response storage"
);
return Ok(());
}
// Only the first digest seen for a task gets a stored response; divergent digests are skipped.
if state.task_responses.is_empty() || state.task_responses.contains_key(&task_response_digest) {
state.task_responses.insert(task_response_digest, binding_task_response);
} else {
debug!(
task_id = %task_id,
operator_id = %operator_id,
"Skipping response with divergent digest — first digest wins"
);
}
}
info!(
task_id = %task_id,
operator_id = %operator_id,
duration_ms = processing_duration.as_millis(),
"Successfully processed signed response"
);
Ok(())
}
Err(BlsAggregationServiceError::TaskExpired {
task_id: err_task_id,
reason,
}) => {
let processing_duration = start_time.elapsed();
match reason {
// Expiry because quorum was already met is not an error for a late signer.
newton_prover_chainio::bls::TaskExpiryReason::QuorumMet => {
info!(
task_id = %err_task_id,
operator_id = %operator_id,
reason = %reason,
duration_ms = processing_duration.as_millis(),
"Task already finished (quorum met) - gracefully ignoring late response"
);
Ok(())
}
newton_prover_chainio::bls::TaskExpiryReason::QuorumNotMet
| newton_prover_chainio::bls::TaskExpiryReason::WindowOpenNoResponse
| newton_prover_chainio::bls::TaskExpiryReason::WindowFinishedNoResponse => {
error!(
task_id = %err_task_id,
operator_id = %operator_id,
reason = %reason,
duration_ms = processing_duration.as_millis(),
"Task expired without reaching quorum threshold"
);
Err(eyre::eyre!(
"Task {} expired without reaching quorum threshold (processed in {} ms): {}",
err_task_id,
processing_duration.as_millis(),
reason
))
}
}
}
Err(BlsAggregationServiceError::ReceiverError {
operation: _,
task_id: err_task_id,
operator_context: _,
reason,
}) => {
let processing_duration = start_time.elapsed();
match reason {
newton_prover_chainio::bls::ReceiverErrorReason::OneshotReceiverClosed => {
// If no receiver is registered for the task any more, the task is considered finished.
let task_finished = !self.has_task_receiver(err_task_id).await;
if task_finished {
info!(
task_id = %err_task_id,
operator_id = %operator_id,
reason = %reason,
duration_ms = processing_duration.as_millis(),
"Task already finished (quorum met) - gracefully ignoring late response (receiver closed)"
);
Ok(())
} else {
error!(
task_id = %err_task_id,
operator_id = %operator_id,
reason = %reason,
duration_ms = processing_duration.as_millis(),
"Receiver error while processing signature (task may have expired unexpectedly)"
);
Err(eyre::eyre!(
"Receiver error while processing signature from operator {} for task {} (processed in {} ms): {}",
operator_id,
err_task_id,
processing_duration.as_millis(),
reason
))
}
}
newton_prover_chainio::bls::ReceiverErrorReason::AggregateChannelClosed => {
error!(
task_id = %err_task_id,
operator_id = %operator_id,
reason = %reason,
duration_ms = processing_duration.as_millis(),
"Aggregate response channel closed"
);
Err(eyre::eyre!(
"Aggregate response channel closed for task {} (processed in {} ms): {}",
err_task_id,
processing_duration.as_millis(),
reason
))
}
}
}
Err(BlsAggregationServiceError::TaskNotFound {
task_id: err_task_id,
reason,
}) => {
let processing_duration = start_time.elapsed();
error!(
task_id = %err_task_id,
reason = %reason,
duration_ms = processing_duration.as_millis(),
"TaskNotFound: task not initialized yet, rejecting signature"
);
Err(eyre::eyre!(
"Task {} not initialized yet, cannot process signature from operator {} (processed in {} ms): {}",
err_task_id,
operator_id,
processing_duration.as_millis(),
reason
))
}
// Any other aggregation-service error is reported as a failure.
Err(e) => {
let processing_duration = start_time.elapsed();
error!(
task_id = %task_id,
operator_id = %operator_id,
?e,
duration_ms = processing_duration.as_millis(),
"BLS aggregation service error while processing signature"
);
Err(eyre::eyre!(
"Failed to process signature for task {} from operator {} after {} ms: {:?}",
task_id,
operator_id,
processing_duration.as_millis(),
e
))
}
}
}
/// Records an operator's error response against its task.
///
/// When no state exists for the task the error is dropped and a warning is logged.
/// Always returns `Ok(())`.
#[instrument(skip(self, error_response), fields(task_id = %error_response.task_id, operator_id = %error_response.operator_id))]
pub async fn process_operator_error(
    &self,
    error_response: crate::rpc_server::OperatorErrorResponse,
) -> Result<(), eyre::Error> {
    info!(
        task_id = %error_response.task_id,
        operator_id = %error_response.operator_id,
        error_code = error_response.error_code,
        "Processing operator error response"
    );

    match self.task_states.get_mut(&error_response.task_id) {
        Some(mut entry) => entry.operator_errors.push(error_response),
        None => {
            warn!(
                task_id = %error_response.task_id,
                "Task state not found when recording operator error - task may not be initialized"
            );
        }
    }
    Ok(())
}
/// Returns the recorded operator errors when at least `expected_operators / 2 + 1` of
/// them have been recorded for the task.
///
/// Returns `None` when the task is unknown or the threshold has not been reached.
pub async fn majority_operators_errored(
    &self,
    task_id: TaskId,
) -> Option<Vec<crate::rpc_server::OperatorErrorResponse>> {
    let entry = self.task_states.get(&task_id)?;
    let needed = entry.expected_operators / 2 + 1;
    let recorded = entry.operator_errors.len();
    if recorded < needed {
        return None;
    }
    Some(entry.operator_errors.clone())
}
/// Returns a copy of the operator errors recorded for the task, or `None` when the task
/// has no state.
pub async fn get_operator_errors(&self, task_id: TaskId) -> Option<Vec<crate::rpc_server::OperatorErrorResponse>> {
    let entry = self.task_states.get(&task_id)?;
    Some(entry.operator_errors.clone())
}
/// Looks up the stored task response for the given task and digest.
///
/// Returns `None` when the task has no state or no response is stored under that digest.
pub fn get_task_response(
    &self,
    task_id: TaskId,
    task_response_digest: TaskResponseDigest,
) -> Option<BindingTaskResponse> {
    let entry = self.task_states.get(&task_id)?;
    entry.task_responses.get(&task_response_digest).cloned()
}
/// Returns whether a result receiver is still registered for the task.
/// The receiver is taken out of the map once a caller starts waiting for the task's aggregation.
pub async fn has_task_receiver(&self, task_id: TaskId) -> bool {
self.task_response_receivers.contains_key(&task_id)
}
/// Waits for the task's aggregated response without an external cancellation token.
///
/// Equivalent to calling [`Self::wait_for_aggregation_with_cancellation`] with `None`.
pub async fn wait_for_aggregation(
    &self,
    task_id: TaskId,
    timeout_duration: Duration,
) -> Result<BlsAggregationServiceResponse, AggregatorCoreError> {
    let no_cancellation = None;
    self.wait_for_aggregation_with_cancellation(task_id, timeout_duration, no_cancellation)
        .await
}
/// Waits for the aggregation service to deliver the task's aggregated response.
///
/// The task's receiver is taken out of `task_response_receivers`, so a task can be
/// waited on only once. On success the task state is marked quorum-locked and kept.
/// On every failure path (cancellation, service error, closed channel, timeout) the
/// task state is removed.
///
/// # Errors
/// * `InvalidTimeoutDuration` for a zero timeout.
/// * `TaskNotInitialized` for a zero task id or when no receiver is registered.
/// * `Cancelled` when `cancellation_token` fires first.
/// * `AggregationServiceError` when the service reports an error or its channel closes.
/// * `Timeout` when nothing arrives in time. It carries the operator errors recorded for
///   the task, or `None` when the task had no state.
pub async fn wait_for_aggregation_with_cancellation(
    &self,
    task_id: TaskId,
    timeout_duration: Duration,
    cancellation_token: Option<CancellationToken>,
) -> Result<BlsAggregationServiceResponse, AggregatorCoreError> {
    if timeout_duration.is_zero() {
        return Err(AggregatorCoreError::InvalidTimeoutDuration("zero duration".to_string()));
    }
    if task_id == TaskId::ZERO {
        return Err(AggregatorCoreError::TaskNotInitialized { task_id });
    }
    info!(
        task_id = %task_id,
        "wait_for_aggregation: timeout_duration {} ms",
        timeout_duration.as_millis()
    );

    // Taking the receiver out of the map makes this the single waiter for the task.
    let receiver = self.task_response_receivers.remove(&task_id).map(|(_, v)| v);
    let mut receiver = receiver.ok_or(AggregatorCoreError::TaskNotInitialized { task_id })?;
    let start_time = std::time::Instant::now();
    let recv_future = receiver.recv();

    let result = if let Some(token) = cancellation_token {
        tokio::select! {
            result = tokio::time::timeout(timeout_duration, recv_future) => result,
            _ = token.cancelled() => {
                if self.task_states.remove(&task_id).is_some() {
                    debug!(task_id = %task_id, "cleaned up task state after cancellation");
                }
                return Err(AggregatorCoreError::Cancelled);
            }
        }
    } else {
        tokio::time::timeout(timeout_duration, recv_future).await
    };

    match result {
        Ok(Some(Ok(response))) => {
            let duration = start_time.elapsed();
            info!(
                task_id = %response.task_id,
                duration_ms = duration.as_millis(),
                timeout_ms = timeout_duration.as_millis(),
                "Successfully received aggregated response"
            );
            // Lock the task so that late signatures and re-initialization are ignored.
            if let Some(mut state) = self.task_states.get_mut(&task_id) {
                state.set_quorum_locked();
                debug!(
                    task_id = %task_id,
                    "[DEBUG] QUORUM_LOCKED: Task marked as quorum-locked, late signatures will be rejected"
                );
            }
            Ok(response)
        }
        Ok(Some(Err(e))) => {
            let duration = start_time.elapsed();
            error!(
                task_id = %task_id,
                ?e,
                duration_ms = duration.as_millis(),
                timeout_ms = timeout_duration.as_millis(),
                "Failed to receive aggregation response"
            );
            if self.task_states.remove(&task_id).is_some() {
                debug!(task_id = %task_id, "cleaned up task state after aggregation error");
            }
            Err(AggregatorCoreError::AggregationServiceError(e))
        }
        Ok(None) => {
            let duration = start_time.elapsed();
            warn!(
                task_id = %task_id,
                duration_ms = duration.as_millis(),
                "Response channel closed for task"
            );
            if self.task_states.remove(&task_id).is_some() {
                debug!(task_id = %task_id, "cleaned up task state after channel closed");
            }
            Err(AggregatorCoreError::AggregationServiceError(
                BlsAggregationServiceError::ReceiverError {
                    operation: "ReceiveAggregatedResponse".to_string(),
                    task_id,
                    operator_context: String::new(),
                    reason: newton_prover_chainio::bls::ReceiverErrorReason::AggregateChannelClosed,
                },
            ))
        }
        Err(_) => {
            let duration = start_time.elapsed();
            let duration_ms = duration.as_millis();
            let timeout_ms = timeout_duration.as_millis();
            // NOTE(review): this count covers every task currently tracked, not just
            // `task_id`; it only decides the log level below.
            let task_responses_count: usize = self
                .task_states
                .iter()
                .map(|entry| entry.value().task_responses.len())
                .sum();
            let is_expected_timeout = task_responses_count == 0;
            if is_expected_timeout {
                debug!(
                    task_id = %task_id,
                    duration_ms,
                    timeout_ms,
                    "Timeout expected - no pending aggregations"
                );
            } else {
                warn!(
                    task_id = %task_id,
                    duration_ms,
                    timeout_ms,
                    task_responses_count,
                    "Aggregation timeout after {} ms (timeout: {} ms) - {} pending task responses",
                    duration_ms,
                    timeout_ms,
                    task_responses_count
                );
            }
            // Take the operator errors out of the state being removed. Looking them up
            // after the removal, as before, always yielded `None`.
            let operator_errors = self.task_states.remove(&task_id).map(|(_, state)| {
                debug!(task_id = %task_id, "cleaned up task state after timeout");
                state.operator_errors
            });
            Err(AggregatorCoreError::Timeout {
                duration_ms,
                timeout_ms,
                operator_errors,
            })
        }
    }
}
/// Builds the signature payload for an aggregated BLS response and submits it through
/// `avs_writer`.
///
/// In destination-chain mode the payload is certificate signature data; otherwise it is
/// the standard signature data. The task's state is removed after the submission attempt,
/// whether it succeeded or failed. It is not removed when building the payload fails.
///
/// # Errors
/// Fails when the signature data cannot be built or when the submission itself fails.
/// If the failure text mentions "BN254" or "pairing", extra diagnostics are logged first.
#[instrument(skip(self, avs_writer, task, task_response, service_response))]
pub async fn submit_aggregated_response(
&self,
avs_writer: &AvsWriter,
task: newton_prover_core::r#newton_prover_task_manager::INewtonProverTaskManager::Task,
task_response: BindingTaskResponse,
service_response: BlsAggregationServiceResponse,
) -> Result<alloy::rpc::types::TransactionReceipt, eyre::Error> {
let task_id = service_response.task_id;
let start_time = std::time::Instant::now();
info!(
task_id = %task_id,
signers_count = service_response.signers_count,
non_signers_count = service_response.non_signers_pub_keys_g1.len(),
task_response_digest = %service_response.task_response_digest,
"Submitting aggregated response for task"
);
// Step 1: convert the aggregation result into the bytes the contract expects.
let conversion_start = std::time::Instant::now();
let signature_data = if self.is_destination_chain {
info!(task_id = %task_id, "Creating certificate signature data for destination chain");
self.create_certificate_signature_data(avs_writer, &task, &task_response, &service_response)
.await
.map_err(|e| {
let conversion_duration = conversion_start.elapsed();
error!(
task_id = %task_id,
?e,
duration_ms = conversion_duration.as_millis(),
"Failed to create certificate signature data"
);
eyre::eyre!(
"Failed to create certificate signature data for task {} after {} ms: {}",
task_id,
conversion_duration.as_millis(),
e
)
})?
} else {
info!(task_id = %task_id, "Creating standard signature data");
self.create_signature_data(&service_response).map_err(|e| {
let conversion_duration = conversion_start.elapsed();
error!(
task_id = %task_id,
?e,
duration_ms = conversion_duration.as_millis(),
"Failed to convert BLS response to contract format"
);
eyre::eyre!(
"Failed to convert BLS response to contract format for task {} after {} ms: {}",
task_id,
conversion_duration.as_millis(),
e
)
})?
};
let conversion_duration = conversion_start.elapsed();
debug!(
task_id = %task_id,
duration_ms = conversion_duration.as_millis(),
is_certificate = self.is_destination_chain,
"Successfully created signature data"
);
// Step 2: debug-level diagnostics only; nothing below changes what gets submitted.
let task_response_consensus_digest = newton_prover_core::common::compute_consensus_digest(&task_response);
debug!(
task_id = %task_id,
service_response_digest = %service_response.task_response_digest,
task_response_consensus_digest = %task_response_consensus_digest,
digests_match = %(service_response.task_response_digest == task_response_consensus_digest),
signers_count = service_response.signers_count,
non_signers_count = service_response.non_signers_pub_keys_g1.len(),
task_created_block = service_response.task_created_block,
"[DEBUG] About to submit aggregated response - digest comparison"
);
debug!(
task_id = %task_id,
quorum_apk_indices = ?service_response.quorum_apk_indices,
total_stake_indices = ?service_response.total_stake_indices,
non_signer_quorum_bitmap_indices = ?service_response.non_signer_quorum_bitmap_indices,
quorum_apks_g1_count = service_response.quorum_apks_g1.len(),
signers_apk_g2_x = %format!("{:?}", service_response.signers_apk_g2.g2().x),
signers_apk_g2_y = %format!("{:?}", service_response.signers_apk_g2.g2().y),
"[DEBUG] BLS signature data for contract submission"
);
// Only the first policy data entry is rendered for logging.
let policy_data_hex = task_response
.policyTaskData
.policyData
.first()
.map(|pd| newton_prover_core::hex!(&pd.data))
.unwrap_or_else(|| "no_policy_data".to_string());
let evaluation_result_bytes = newton_prover_core::hex!(&task_response.evaluationResult);
debug!(
task_id = %task_id,
policy_client = %task_response.policyClient,
policy_id = %newton_prover_core::hex!(task_response.policyId),
policy_address = %task_response.policyAddress,
policy_data = %policy_data_hex,
policy_data_count = task_response.policyTaskData.policyData.len(),
evaluation_result = %evaluation_result_bytes,
evaluation_result_len = task_response.evaluationResult.len(),
initialization_timestamp = task_response.initializationTimestamp.saturating_to::<u64>(),
"[DEBUG] BN254_SUBMIT: TaskResponse key fields being submitted"
);
let sigma_g1 = service_response.signers_agg_sig_g1.g1_point().g1();
debug!(
task_id = %task_id,
sigma_x = %format!("{:?}", sigma_g1.x),
sigma_y = %format!("{:?}", sigma_g1.y),
sigma_is_infinity = sigma_g1.infinity,
non_signers_count = service_response.non_signers_pub_keys_g1.len(),
non_signers_operator_ids = ?service_response.non_signers_operators_ids.iter().map(|id| newton_prover_core::hex!(id)).collect::<Vec<_>>(),
"[DEBUG] BN254_SUBMIT: Sigma (aggregated signature) and non-signers for pairing check"
);
for (i, apk_g1) in service_response.quorum_apks_g1.iter().enumerate() {
debug!(
task_id = %task_id,
quorum_idx = i,
apk_g1_x = %format!("{:?}", apk_g1.g1().x),
apk_g1_y = %format!("{:?}", apk_g1.g1().y),
"[DEBUG] BN254_SUBMIT: Quorum APK G1 point"
);
}
// Step 3: submit. `task_response` is cloned because the failure path below still reads it.
let submission_start = std::time::Instant::now();
let result = avs_writer
.send_aggregated_response(
task,
task_response.clone(),
signature_data,
self.get_task_attestation_data(&task_id),
self.dest_chain_id,
)
.await;
match result {
Ok(receipt) => {
let submission_duration = submission_start.elapsed();
let total_duration = start_time.elapsed();
newton_prover_metrics::inc_tasks_aggregator_respond_success();
newton_prover_metrics::record_task_aggregation_duration(total_duration.as_secs_f64());
let tx_hash_hex = newton_prover_core::hex!(receipt.transaction_hash);
info!(
task_id = %task_id,
tx_hash = %tx_hash_hex,
submission_duration_ms = submission_duration.as_millis(),
total_duration_ms = total_duration.as_millis(),
"Successfully submitted aggregated response"
);
// The task is finished; drop its state.
if self.task_states.remove(&task_id).is_some() {
info!(
task_id = %task_id,
"Cleaned up task state after successful submission"
);
}
Ok(receipt)
}
Err(e) => {
let submission_duration = submission_start.elapsed();
let total_duration = start_time.elapsed();
newton_prover_metrics::inc_tasks_aggregator_respond_failure();
// Heuristic: classify the failure by searching the error's debug text.
let error_str = format!("{:?}", e);
let is_bn254_failure = error_str.contains("BN254") || error_str.contains("pairing");
error!(
task_id = %task_id,
?e,
submission_duration_ms = submission_duration.as_millis(),
total_duration_ms = total_duration.as_millis(),
is_bn254_failure = is_bn254_failure,
service_response_digest = %service_response.task_response_digest,
signers_count = service_response.signers_count,
non_signers_count = service_response.non_signers_pub_keys_g1.len(),
task_created_block = service_response.task_created_block,
"[DEBUG] BN254_FAILURE: Failed to submit aggregated response to contract"
);
if is_bn254_failure {
let policy_data_hex = task_response
.policyTaskData
.policyData
.first()
.map(|pd| newton_prover_core::hex!(&pd.data))
.unwrap_or_else(|| "no_policy_data".to_string());
let sigma_g1_fail = service_response.signers_agg_sig_g1.g1_point().g1();
error!(
task_id = %task_id,
policy_client = %task_response.policyClient,
policy_data = %policy_data_hex,
evaluation_result = %newton_prover_core::hex!(&task_response.evaluationResult),
sigma_x = %format!("{:?}", sigma_g1_fail.x),
sigma_y = %format!("{:?}", sigma_g1_fail.y),
sigma_is_infinity = sigma_g1_fail.infinity,
"[DEBUG] BN254_FAILURE: Data that was being submitted when pairing failed"
);
// The G2 key diagnostic makes extra calls, so it runs only when DEBUG logging is enabled.
if tracing::enabled!(tracing::Level::DEBUG) {
if let Ok(source_rpc) = self.rpc.get_or_err(self.source_chain_id) {
let operator_mappings = self.avs_registry_service.get_all_operator_addresses();
debug!(
task_id = %task_id,
operator_count = operator_mappings.len(),
"[DEBUG] BN254_FAILURE: Running G2 key verification diagnostic"
);
// NOTE(review): the diagnostic queries quorum 0 only, regardless of the task's quorums.
if let Ok(operator_states) = self
.avs_registry_service
.get_operators_avs_state_at_block(
service_response.task_created_block,
&[0], )
.await
{
// Keep only operators that have public keys and a known address.
let mut operators_with_keys = Vec::new();
for (op_id, op_state) in &operator_states {
if let Some(pub_keys) = &op_state.operator_info.pub_keys {
if let Some(addr) = self.avs_registry_service.get_operator_address(op_id) {
operators_with_keys.push((addr, pub_keys.clone()));
}
}
}
if !operators_with_keys.is_empty() {
match diagnostics::verify_signer_g2_keys_by_address(
self.operator_registry_address,
&source_rpc.http,
operators_with_keys,
)
.await
{
Ok((all_match, mismatch_count)) => {
if all_match {
debug!(
task_id = %task_id,
"[DEBUG] BN254_FAILURE: All G2 keys MATCH on-chain values - issue is elsewhere"
);
} else {
error!(
task_id = %task_id,
mismatch_count = mismatch_count,
"[DEBUG] BN254_FAILURE: G2 KEY MISMATCH DETECTED - {} operators have stale G2 keys",
mismatch_count
);
}
}
Err(e) => {
warn!(
task_id = %task_id,
error = %e,
"[DEBUG] BN254_FAILURE: G2 verification diagnostic failed"
);
}
}
} else {
debug!(
task_id = %task_id,
"[DEBUG] BN254_FAILURE: No operators with cached pub keys found for G2 verification"
);
}
} else {
debug!(
task_id = %task_id,
"[DEBUG] BN254_FAILURE: Could not fetch operator states for G2 verification"
);
}
}
}
}
// The state is dropped on failure as well; no retry is attempted here.
if self.task_states.remove(&task_id).is_some() {
info!(
task_id = %task_id,
"Cleaned up task state after failed submission"
);
}
Err(eyre::eyre!(
"Failed to submit aggregated response for task {} after {} ms (submission took {} ms): {:?}",
task_id,
total_duration.as_millis(),
submission_duration.as_millis(),
e
))
}
}
}
/// Converts an aggregated BLS response into ABI-encoded bytes in the contract format.
///
/// # Errors
/// Fails when the response cannot be converted to the contract format.
pub fn create_signature_data(
    &self,
    service_response: &BlsAggregationServiceResponse,
) -> Result<alloy::primitives::Bytes, eyre::Error> {
    let task_id_hex = newton_prover_core::hex!(service_response.task_id);
    debug!(
        task_id = %task_id_hex,
        signers_count = service_response.signers_count,
        non_signers_count = service_response.non_signers_pub_keys_g1.len(),
        task_created_block = service_response.task_created_block,
        task_response_digest = %service_response.task_response_digest,
        "creating standard signature data for contract submission - signers={}, non-signers={}, block={}",
        service_response.signers_count,
        service_response.non_signers_pub_keys_g1.len(),
        service_response.task_created_block
    );

    let contract_payload =
        match newton_prover_chainio::bls::convert_bls_response_to_contract_format(service_response) {
            Ok(payload) => payload,
            Err(e) => {
                return Err(eyre::eyre!(
                    "Failed to convert BLS response to contract format: {}",
                    e
                ))
            }
        };

    let encoded_bytes = contract_payload.abi_encode();
    debug!(
        task_id = %task_id_hex,
        encoded_length = encoded_bytes.len(),
        "signature data encoded successfully"
    );
    Ok(alloy::primitives::Bytes::from(encoded_bytes))
}
/// Builds the ABI-encoded `BN254Certificate` for a task response.
///
/// The steps run in this order:
/// 1. Compute the consensus digest of `task_response`; it becomes the certificate's
///    `messageHash`. Whether it equals `service_response.task_response_digest` is only logged.
/// 2. Resolve the certificate verifier and source-chain AVS through the task manager at
///    `avs_writer.task_manager_addr()`, using the RPC endpoint `avs_writer.rpc_url`.
/// 3. Read `latestReferenceTimestamp` for the operator set whose id is the first byte of
///    `task.quorumNumbers` (0 when that field is empty).
/// 4. If `dest_operator_table_updater` is configured, translate the timestamp into a
///    reference block number. A zero result or a failed call is logged and treated as
///    "no reference block".
/// 5. Convert the aggregate signature and public key, build the non-signer witnesses and
///    ABI-encode the certificate.
///
/// # Errors
/// Fails when the destination verifier cannot be resolved, the reference timestamp call
/// fails, a curve point cannot be converted, or the non-signer witnesses cannot be built.
pub async fn create_certificate_signature_data(
    &self,
    avs_writer: &AvsWriter,
    task: &newton_prover_core::r#newton_prover_task_manager::INewtonProverTaskManager::Task,
    task_response: &BindingTaskResponse,
    service_response: &BlsAggregationServiceResponse,
) -> Result<alloy::primitives::Bytes, eyre::Error> {
    // Formatted once and reused by every log statement below.
    let task_id_hex = newton_prover_core::hex!(task_response.taskId);
    let message_hash = compute_consensus_digest(task_response);
    debug!(
        task_id = %task_id_hex,
        certificate_message_hash = %message_hash,
        service_response_digest = %service_response.task_response_digest,
        message_hash_matches_service_response = %(message_hash == service_response.task_response_digest),
        non_signers_count = service_response.non_signers_pub_keys_g1.len(),
        "creating certificate signature data"
    );

    let provider = get_provider(&avs_writer.rpc_url);
    let destination = resolve_destination_verifier(avs_writer.task_manager_addr(), provider.clone()).await?;
    let source_chain_avs = destination.source_chain_avs;
    // The first quorum number doubles as the operator set id; an empty list maps to set 0.
    let operator_set_id = task.quorumNumbers.first().map_or(0, |&quorum| quorum as u32);

    let certificate_verifier = ViewBN254CertificateVerifier::ViewBN254CertificateVerifierInstance::new(
        destination.certificate_verifier,
        provider.clone(),
    );
    let reference_timestamp: u32 = certificate_verifier
        .latestReferenceTimestamp(ViewBN254CertificateVerifier::OperatorSet {
            avs: source_chain_avs,
            id: operator_set_id,
        })
        .call()
        .await
        .map_err(|e| eyre::eyre!("Failed to get latest reference timestamp: {:?}", e))?;

    let reference_block_number: Option<u64> = match self.dest_operator_table_updater {
        None => None,
        Some(updater_addr) => {
            let updater = newton_prover_core::ecdsa_operator_table_updater::ECDSAOperatorTableUpdater::new(
                updater_addr,
                provider.clone(),
            );
            let lookup = updater
                .getReferenceBlockNumberByTimestamp(reference_timestamp)
                .call()
                .await;
            match lookup {
                Err(e) => {
                    warn!(
                        task_id = %task_id_hex,
                        reference_timestamp,
                        error = %e,
                        "failed to resolve reference block number, falling back to current block"
                    );
                    None
                }
                Ok(block_num) if block_num > 0 => {
                    debug!(
                        task_id = %task_id_hex,
                        reference_timestamp,
                        reference_block_number = block_num,
                        "resolved historical block for certificate generation"
                    );
                    Some(block_num as u64)
                }
                Ok(_) => {
                    warn!(
                        task_id = %task_id_hex,
                        reference_timestamp,
                        "reference block number is 0 for timestamp, falling back to current block"
                    );
                    None
                }
            }
        }
    };

    // Re-wrap the aggregate signature / public key in the certificate binding's point types.
    let agg_sig = convert_to_g1_point(service_response.signers_agg_sig_g1.g1_point().g1())?;
    let agg_apk = convert_to_g2_point(service_response.signers_apk_g2.g2())?;

    let non_signer_witnesses = self
        .build_non_signer_witnesses(
            service_response,
            task.quorumNumbers.as_ref(),
            service_response.task_created_block,
            source_chain_avs,
            operator_set_id,
            reference_block_number,
        )
        .await?;
    debug!(
        task_id = %task_id_hex,
        non_signer_witnesses_count = non_signer_witnesses.len(),
        "built non-signer witnesses for certificate"
    );

    let encoded_certificate = BN254Certificate {
        referenceTimestamp: reference_timestamp,
        messageHash: message_hash,
        signature: G1Point {
            X: agg_sig.X,
            Y: agg_sig.Y,
        },
        apk: G2Point {
            X: agg_apk.X,
            Y: agg_apk.Y,
        },
        nonSignerWitnesses: non_signer_witnesses,
    }
    .abi_encode();
    Ok(alloy::primitives::Bytes::from(encoded_certificate))
}
/// Builds one merkle-inclusion witness per non-signing operator in `service_response`.
///
/// The operator list that forms the merkle leaves is assembled in one of two ways:
/// * **Destination chain** (`is_destination_chain`): operators and weights come from the
///   source chain's table calculator (`getOperatorSetWeights`), queried at
///   `reference_block_number` when one is given and at the latest block otherwise. Each
///   returned address is mapped back to an operator id through the registry service so its
///   G1 public key can be looked up in the registry state at `task_created_block`.
/// * **Otherwise**: operators come straight from the registry state, with weights taken from
///   `stake_per_quorum` ordered by quorum number (a single `1e18` weight when empty).
///
/// Each non-signer public key is then located in that list, a merkle proof is generated for
/// its index, and the witnesses are returned sorted by `operatorIndex`.
///
/// # Errors
/// Fails when registry/table-calculator calls fail, no operator state exists at the block,
/// the table calculator returns a different number of operators than weight rows, a
/// destination chain has no source multichain contracts configured, an operator lacks public
/// keys, a non-signer cannot be found or maps to an already used index, an index does not fit
/// in `u32`, or a merkle proof cannot be generated.
async fn build_non_signer_witnesses(
    &self,
    service_response: &BlsAggregationServiceResponse,
    quorum_numbers: &[u8],
    task_created_block: u64,
    source_chain_avs: Address,
    operator_set_id: u32,
    reference_block_number: Option<u64>,
) -> Result<Vec<BN254OperatorInfoWitness>, eyre::Error> {
    if service_response.non_signers_pub_keys_g1.is_empty() {
        debug!("No non-signers, returning empty witnesses");
        return Ok(vec![]);
    }
    let operator_states = self
        .avs_registry_service
        .get_operators_avs_state_at_block(task_created_block, quorum_numbers)
        .await
        .map_err(|e| eyre::eyre!("Failed to get operator AVS states for merkle tree: {}", e))?;
    if operator_states.is_empty() {
        return Err(eyre::eyre!(
            "No operator states found at block {} for quorums {:?}",
            task_created_block,
            quorum_numbers
        ));
    }
    info!(
        "Building merkle tree from {} operators for {} non-signers",
        operator_states.len(),
        service_response.non_signers_pub_keys_g1.len()
    );
    let mut merkle_operators: Vec<(FixedBytes<32>, MerkleOperatorInfo)> = Vec::new();
    if self.is_destination_chain {
        if let Some(multichain_contracts) = &self.source_multichain_contracts {
            let source_rpc = self.rpc.get_or_err(self.source_chain_id)?;
            let provider = get_provider(&source_rpc.http);
            let operator_table_calculator =
                IOperatorTableCalculatorView::new(multichain_contracts.table_calculator, provider);
            let operator_set = IOperatorTableCalculatorView::OperatorSet {
                avs: source_chain_avs,
                id: operator_set_id,
            };
            let result = if let Some(ref_block) = reference_block_number {
                debug!(
                    reference_block_number = ref_block,
                    "querying operator set weights at historical block (matches verifier state)"
                );
                operator_table_calculator
                    .getOperatorSetWeights(operator_set)
                    .call()
                    .block(ref_block.into())
                    .await
                    .wrap_err_with(|| format!(
                        "failed to fetch operator set weights at historical block {} from table calculator (RPC may not serve this block depth — consider reducing transporter update_frequency or using archive RPC)",
                        ref_block
                    ))?
            } else {
                operator_table_calculator
                    .getOperatorSetWeights(operator_set)
                    .call()
                    .await
                    .wrap_err("failed to fetch operator set weights from table calculator")?
            };
            // `zip` stops at the shorter input, so a length mismatch would silently drop
            // operators and yield a merkle tree that cannot match the table. Fail loudly.
            let table_operator_count = result.operators.len();
            let table_weights_count = result.weights.len();
            if table_operator_count != table_weights_count {
                return Err(eyre::eyre!(
                    "table calculator returned {} operators but {} weight rows",
                    table_operator_count,
                    table_weights_count
                ));
            }
            // Reverse lookup: operator address -> operator id, for ids present in registry state.
            let mut addr_to_id: HashMap<Address, FixedBytes<32>> = HashMap::new();
            for operator_id in operator_states.keys() {
                if let Some(addr) = self.avs_registry_service.get_operator_address(operator_id) {
                    addr_to_id.insert(addr, *operator_id);
                }
            }
            // NOTE(review): skipping a table operator shifts the leaf index of every operator
            // after it relative to the table calculator's ordering — confirm the verifier's
            // operator table is built from the same filtered list.
            for (operator_addr, weights) in result.operators.into_iter().zip(result.weights) {
                let operator_id = match addr_to_id.get(&operator_addr) {
                    Some(id) => id,
                    None => {
                        warn!(
                            operator_addr = %operator_addr,
                            table_operators = table_operator_count,
                            registry_operators = addr_to_id.len(),
                            "operator in table calculator but not in registry state, skipping merkle entry"
                        );
                        continue;
                    }
                };
                let state = match operator_states.get(operator_id) {
                    Some(s) => s,
                    None => {
                        warn!(
                            operator_id = %operator_id,
                            "operator in addr_to_id but missing state, skipping merkle entry"
                        );
                        continue;
                    }
                };
                let pub_keys = state
                    .operator_info
                    .pub_keys
                    .as_ref()
                    .ok_or_else(|| eyre::eyre!("operator {} missing public keys", operator_id))?;
                let g1_point = convert_to_g1_point(pub_keys.g1_pub_key.g1())
                    .wrap_err_with(|| format!("failed to convert G1 point for operator {}", operator_id))?;
                let merkle_op = MerkleOperatorInfo::new(g1_point.X, g1_point.Y, weights);
                merkle_operators.push((*operator_id, merkle_op));
            }
        } else {
            return Err(eyre::eyre!(
                "destination chain requires source multichain contracts for witness weight generation"
            ));
        }
    } else {
        // NOTE(review): leaf order here follows the iteration order of the registry state
        // map — confirm that order is what the on-chain operator table expects.
        for (operator_id, state) in &operator_states {
            let pub_keys = state
                .operator_info
                .pub_keys
                .as_ref()
                .ok_or_else(|| eyre::eyre!("operator {} missing public keys", operator_id))?;
            let g1_point = convert_to_g1_point(pub_keys.g1_pub_key.g1())
                .wrap_err_with(|| format!("failed to convert G1 point for operator {}", operator_id))?;
            // Weights are the operator's stakes ordered by ascending quorum number.
            let mut sorted_quorums: Vec<_> = state.stake_per_quorum.iter().collect();
            sorted_quorums.sort_by_key(|(q, _)| *q);
            let mut weights: Vec<U256> = sorted_quorums.into_iter().map(|(_, stake)| *stake).collect();
            if weights.is_empty() {
                // Placeholder weight of 1e18 when the operator has no recorded stake.
                weights.push(U256::from(1_000_000_000_000_000_000u128));
            }
            let merkle_op = MerkleOperatorInfo::new(g1_point.X, g1_point.Y, weights);
            merkle_operators.push((*operator_id, merkle_op));
        }
    }
    let operator_infos: Vec<MerkleOperatorInfo> = merkle_operators.iter().map(|(_, op)| op.clone()).collect();
    let mut witnesses = Vec::new();
    let mut seen_indices = HashSet::new();
    for (ns_idx, non_signer_pk) in service_response.non_signers_pub_keys_g1.iter().enumerate() {
        let ns_g1_point = convert_to_g1_point(non_signer_pk.g1())
            .map_err(|e| eyre::eyre!("Failed to convert non-signer G1 point: {}", e))?;
        // Operators are matched by G1 public key coordinates.
        let operator_index = merkle_operators
            .iter()
            .position(|(_, op)| op.pubkey_x == ns_g1_point.X && op.pubkey_y == ns_g1_point.Y)
            .ok_or_else(|| {
                eyre::eyre!(
                    "Non-signer {} with pubkey ({}, {}) not found in operator states",
                    ns_idx,
                    ns_g1_point.X,
                    ns_g1_point.Y
                )
            })?;
        // The witness carries the index as `u32`; refuse to truncate instead of casting.
        let witness_index = u32::try_from(operator_index).map_err(|_| {
            eyre::eyre!(
                "operator index {} for non-signer {} does not fit in u32",
                operator_index,
                ns_idx
            )
        })?;
        if !seen_indices.insert(witness_index) {
            return Err(eyre::eyre!(
                "duplicate operator index {} for non-signer with pubkey ({}, {})",
                operator_index,
                ns_g1_point.X,
                ns_g1_point.Y
            ));
        }
        let merkle_proof = merkle::generate_merkle_proof(&operator_infos, operator_index).ok_or_else(|| {
            eyre::eyre!(
                "Failed to generate merkle proof for non-signer at operator index {}",
                operator_index
            )
        })?;
        // The proof is serialised as the concatenation of its nodes.
        let proof_bytes: Vec<u8> = merkle_proof.proof.iter().flat_map(|p| p.as_slice().to_vec()).collect();
        let non_signer_op = &merkle_operators[operator_index].1;
        let operator_info = BN254OperatorInfo {
            pubkey: G1Point {
                X: non_signer_op.pubkey_x,
                Y: non_signer_op.pubkey_y,
            },
            weights: non_signer_op.weights.clone(),
        };
        let witness = BN254OperatorInfoWitness {
            operatorIndex: witness_index,
            operatorInfoProof: alloy::primitives::Bytes::from(proof_bytes),
            operatorInfo: operator_info,
        };
        debug!(
            "Built witness for non-signer {}: index={}, proof_len={}",
            ns_idx,
            operator_index,
            witness.operatorInfoProof.len()
        );
        witnesses.push(witness);
    }
    witnesses.sort_by_key(|w| w.operatorIndex);
    // Defensive re-check after sorting: indices must be strictly increasing.
    if let Some(window) = witnesses
        .windows(2)
        .find(|pair| pair[0].operatorIndex >= pair[1].operatorIndex)
    {
        return Err(eyre::eyre!(
            "non-signer witness indices not strictly increasing: {} then {}",
            window[0].operatorIndex,
            window[1].operatorIndex
        ));
    }
    Ok(witnesses)
}
/// Reads the certificate verifier's latest reference timestamp on the destination chain.
///
/// The operator set queried is `(source_chain_avs, id)`, where `id` is the first quorum
/// number stored in the task state for `signed_response.task_id`. A missing task state or
/// an empty quorum list both resolve to id 0.
///
/// # Errors
/// Fails when the destination RPC is not configured, the destination verifier cannot be
/// resolved, or the `latestReferenceTimestamp` call fails.
async fn get_reference_timestamp(&self, signed_response: &SignedTaskResponse) -> Result<u32, eyre::Error> {
    let destination_rpc = self.rpc.get_or_err(self.dest_chain_id)?;
    let provider = get_provider(&destination_rpc.http);
    let destination = resolve_destination_verifier(self.task_manager_addr, provider.clone()).await?;

    // Only the first quorum number is needed, so read it in place rather than cloning the list.
    let operator_set_id = self
        .task_states
        .get(&signed_response.task_id)
        .and_then(|state| state.quorum_nums.first().copied())
        .map_or(0, |quorum| quorum as u32);

    let verifier = ViewBN254CertificateVerifier::ViewBN254CertificateVerifierInstance::new(
        destination.certificate_verifier,
        provider,
    );
    verifier
        .latestReferenceTimestamp(ViewBN254CertificateVerifier::OperatorSet {
            avs: destination.source_chain_avs,
            id: operator_set_id,
        })
        .call()
        .await
        .map_err(|e| eyre::eyre!("failed to get latest reference timestamp: {:?}", e))
}
/// Aggregates already-collected signed responses for `task_id` into a single
/// `BlsAggregationServiceResponse`.
///
/// Flow:
/// 1. Reject an empty `signed_responses` slice.
/// 2. Group the responses by the consensus digest of their task response.
/// 3. With more than one distinct digest, run `build_consensus` with
///    `DEFAULT_TOLERANCE_PCT`; if that cannot reach consensus, fall back to the digest
///    backed by the most responses. With a single digest, use it directly.
/// 4. Fetch operator and quorum state at `task_created_block` and fold every selected
///    response into the per-digest aggregate via `update_aggregated_operators`.
/// 5. Build the aggregated response and, when the task is still tracked in `task_states`,
///    mark it quorum-locked.
///
/// # Errors
/// Fails when no responses are given, no digest can be selected, registry state cannot be
/// fetched, a responding operator has no state at the block, aggregation of an operator
/// fails, the selected digest ends up with no signers, or the aggregated response cannot
/// be built.
#[instrument(skip(self, signed_responses), fields(
task_id = %task_id,
response_count = signed_responses.len(),
quorum_count = quorum_numbers.len()
))]
pub async fn aggregate_responses(
&self,
task_id: TaskId,
task_created_block: u64,
signed_responses: &[(OperatorId, SignedTaskResponse)],
quorum_numbers: &[u8],
) -> Result<BlsAggregationServiceResponse, eyre::Error> {
if signed_responses.is_empty() {
return Err(eyre::eyre!("No signed responses provided for aggregation"));
}
info!(
"Aggregating {} signed responses for task {}",
signed_responses.len(),
task_id
);
// Bucket responses by consensus digest; each entry also carries the digest it hashed to.
let mut responses_by_digest: HashMap<FixedBytes<32>, Vec<(OperatorId, SignedTaskResponse, FixedBytes<32>)>> =
HashMap::new();
for (operator_id, signed_response) in signed_responses {
let binding_task_response: BindingTaskResponse = signed_response.task_response.clone().into();
let task_response_digest = compute_consensus_digest(&binding_task_response);
responses_by_digest.entry(task_response_digest).or_default().push((
*operator_id,
signed_response.clone(),
task_response_digest,
));
}
// Pick the digest to aggregate under, together with the responses that count towards it.
let (task_response_digest, responses_with_digest) = if responses_by_digest.len() > 1 {
info!(
"Multiple task response digests found for task {} ({} distinct digests). Attempting median-based consensus.",
task_id,
responses_by_digest.len()
);
for (digest, responses) in &responses_by_digest {
debug!(
"Digest {} has {} signers: {:?}",
digest,
responses.len(),
responses
.iter()
.map(|(id, _, _)| format!("{:?}", id))
.collect::<Vec<_>>()
);
}
match build_consensus(signed_responses, DEFAULT_TOLERANCE_PCT) {
ConsensusResult::AlreadyConsensus { digest, response_count } => {
info!(
"Consensus already achieved for task {} with {} responses",
task_id, response_count
);
// If the reported digest is not one of the buckets, this yields an empty list and
// the lookup of aggregated operators further down fails.
let responses = responses_by_digest.remove(&digest).unwrap_or_default();
(digest, responses)
}
ConsensusResult::Normalized {
digest,
normalized_responses,
field_adjustments,
} => {
info!(
"Median-based consensus achieved for task {}. {} fields normalized.",
task_id,
field_adjustments.len()
);
for adj in &field_adjustments {
info!(
"Field '{}': median={:.6}, max_deviation={:.2}%",
adj.field_path, adj.median_value, adj.max_deviation_pct
);
}
// Every normalized response is attributed to the single normalized digest.
let responses = normalized_responses
.into_iter()
.map(|(id, resp)| (id, resp, digest))
.collect::<Vec<_>>();
(digest, responses)
}
ConsensusResult::CannotReachConsensus {
reason,
divergent_fields,
} => {
warn!(
"Cannot reach median-based consensus for task {}: {}. Falling back to digest selection.",
task_id, reason
);
for field in &divergent_fields {
warn!(
"Divergent field '{}': values={:?}, median={:.6}, max_deviation={:.2}% (tolerance={:.2}%)",
field.field_path, field.values, field.median, field.max_deviation_pct, field.tolerance_pct
);
}
// Fallback: the digest with the most responses wins. Ties are broken by the map's
// iteration order, which is unspecified for `HashMap`.
responses_by_digest
.into_iter()
.max_by_key(|(_, responses)| responses.len())
.ok_or_else(|| eyre::eyre!("No valid task response digests found"))?
}
}
} else {
// Exactly one digest: no consensus step needed.
responses_by_digest
.into_iter()
.next()
.ok_or_else(|| eyre::eyre!("No valid task response digests found"))?
};
info!(
"Aggregating {} responses for task {} with digest {}",
responses_with_digest.len(),
task_id,
task_response_digest
);
info!(
"Fetching operator AVS states for {} operators at block {}",
responses_with_digest.len(),
task_created_block
);
let operator_state_avs = self
.avs_registry_service
.get_operators_avs_state_at_block(task_created_block, quorum_numbers)
.await
.map_err(|e| eyre::eyre!("Failed to get operator AVS states: {}", e))?;
info!("Fetched {} operator states", operator_state_avs.len());
let quorums_avs_state = self
.avs_registry_service
.get_quorums_avs_state_at_block(quorum_numbers, task_created_block)
.await
.map_err(|e| eyre::eyre!("Failed to get quorum AVS states: {}", e))?;
// Quorums absent from the returned map are skipped, so this list can be shorter than
// `quorum_numbers`.
let quorum_apks_g1: Vec<eigensdk::crypto_bls::BlsG1Point> = quorum_numbers
.iter()
.filter_map(|quorum_num| quorums_avs_state.get(quorum_num))
.map(|avs_state| avs_state.agg_pub_key_g1.clone())
.collect();
info!("Extracted {} quorum aggregate keys", quorum_apks_g1.len());
let mut aggregated_operators_map: HashMap<FixedBytes<32>, newton_prover_chainio::bls::AggregatedOperators> =
HashMap::new();
for (operator_id, signed_response, digest) in &responses_with_digest {
let operator_id_fixed = FixedBytes((*operator_id).into());
let operator_state = operator_state_avs
.get(&operator_id_fixed)
.ok_or_else(|| eyre::eyre!("Operator state not found for operator {}", operator_id))?;
let bls_signature = signed_response.signature();
// Checked before the call because the helper receives the map mutably.
let is_new_digest = !aggregated_operators_map.contains_key(digest);
let updated_aggregated = update_aggregated_operators(
task_id,
&mut aggregated_operators_map,
operator_state,
*digest,
bls_signature,
operator_id_fixed,
)
.map_err(|e| eyre::eyre!("Failed to aggregate operator {}: {}", operator_id, e))?;
// NOTE(review): only the first result per digest is stored here; presumably the helper
// updates an existing map entry in place for later operators — confirm.
if is_new_digest {
aggregated_operators_map.insert(*digest, updated_aggregated);
}
}
let aggregated_operators = aggregated_operators_map
.remove(&task_response_digest)
.ok_or_else(|| eyre::eyre!("Failed to get aggregated operators"))?;
let signer_count = aggregated_operators.signers_operator_ids_set.len();
info!(
"Aggregated {} operators for digest {}",
signer_count, task_response_digest
);
if signer_count == 0 {
return Err(eyre::eyre!(
"No valid signers for task {} with digest {} - cannot build aggregated response",
task_id,
task_response_digest
));
}
let aggregated_response = BlsAggregatorService::build_aggregated_response(
task_id,
task_created_block,
task_response_digest,
&operator_state_avs,
aggregated_operators,
&self.avs_registry_service,
&quorum_apks_g1,
quorum_numbers,
)
.await
.map_err(|e| eyre::eyre!("Failed to build aggregated response: {}", e))?;
// Lock the task only if it is still tracked; an untracked task is not an error here.
if let Some(mut state) = self.task_states.get_mut(&task_id) {
state.set_quorum_locked();
debug!(
task_id = %task_id,
"[DEBUG] QUORUM_LOCKED: Task marked as quorum-locked (direct aggregation)"
);
}
Ok(aggregated_response)
}
/// Re-derives the block-dependent parts of an aggregated response for a different block.
///
/// When `service_response` already refers to `new_task_created_block` it is returned
/// untouched. Otherwise the check-signatures indices (for the stored non-signer operator
/// ids) and the quorum aggregate public keys are fetched at the new block, and a copy of
/// the response is returned with those fields and `task_created_block` replaced. All other
/// fields are carried over unchanged.
///
/// # Errors
/// Fails when the indices or the quorum state cannot be fetched at the new block.
pub async fn update_response_indices(
    &self,
    task_id: TaskId,
    service_response: BlsAggregationServiceResponse,
    new_task_created_block: u64,
    quorum_numbers: &[u8],
) -> Result<BlsAggregationServiceResponse, eyre::Error> {
    let previous_block = service_response.task_created_block;
    if previous_block == new_task_created_block {
        debug!(
            "Task {} indices already at block {}, no update needed",
            task_id, new_task_created_block
        );
        return Ok(service_response);
    }

    info!(
        "Updating check signatures indices for task {} from block {} to block {}",
        task_id, previous_block, new_task_created_block
    );
    debug!(
        "Task {}: using {} stored non-signer operator IDs for index update",
        task_id,
        service_response.non_signers_operators_ids.len()
    );

    let refreshed_indices = self
        .avs_registry_service
        .get_check_signatures_indices(
            new_task_created_block,
            quorum_numbers.to_vec(),
            service_response.non_signers_operators_ids.clone(),
        )
        .await
        .map_err(|e| {
            eyre::eyre!(
                "Failed to get check signatures indices at block {}: {:?}",
                new_task_created_block,
                e
            )
        })?;

    let quorum_states = self
        .avs_registry_service
        .get_quorums_avs_state_at_block(quorum_numbers, new_task_created_block)
        .await
        .map_err(|e| {
            eyre::eyre!(
                "Failed to get quorum AVS states at block {}: {:?}",
                new_task_created_block,
                e
            )
        })?;

    // Quorums missing from the fetched state contribute no key.
    let quorum_apks_g1: Vec<eigensdk::crypto_bls::BlsG1Point> = quorum_numbers
        .iter()
        .filter_map(|quorum_num| {
            quorum_states
                .get(quorum_num)
                .map(|quorum_state| quorum_state.agg_pub_key_g1.clone())
        })
        .collect();

    info!(
        "Task {}: successfully updated indices and quorum APKs from block {} to block {}",
        task_id, previous_block, new_task_created_block
    );

    let refreshed_response = BlsAggregationServiceResponse {
        task_created_block: new_task_created_block,
        non_signer_quorum_bitmap_indices: refreshed_indices.nonSignerQuorumBitmapIndices,
        quorum_apk_indices: refreshed_indices.quorumApkIndices,
        total_stake_indices: refreshed_indices.totalStakeIndices,
        non_signer_stake_indices: refreshed_indices.nonSignerStakeIndices,
        quorum_apks_g1,
        ..service_response
    };
    Ok(refreshed_response)
}
/// Drops all per-task bookkeeping for `task_id`: its entry in `task_states` and its
/// entry in `task_response_receivers`. Missing entries are ignored.
pub fn cancel_task(&self, task_id: &TaskId) {
    debug!(%task_id, "cancelling aggregation slot for failed task");
    // The removed entries are not needed; discard them right away.
    drop(self.task_states.remove(task_id));
    drop(self.task_response_receivers.remove(task_id));
}
/// Forwards a cancellation request for `task_id` to the BLS service handle.
pub fn cancel_aggregation_loop(&self, task_id: &TaskId) {
    let id = *task_id;
    self.service_handle.cancel_aggregation_loop(id);
}
/// Evicts every task whose state is older than `ttl` and returns how many were evicted.
///
/// For each evicted task the matching entry in `task_response_receivers` is removed too.
///
/// The count is taken inside the sweep itself rather than as a before/after difference of
/// the map length, so tasks inserted concurrently while the sweep runs cannot make the
/// result under-report.
pub fn cleanup_stale_tasks(&self, ttl: Duration) -> usize {
    let now = Instant::now();
    let mut evicted = 0usize;
    self.task_states.retain(|task_id, state| {
        let is_fresh = now.duration_since(state.created_at) <= ttl;
        if !is_fresh {
            self.task_response_receivers.remove(task_id);
            evicted += 1;
        }
        is_fresh
    });
    evicted
}
/// Number of entries currently held in `task_states`.
pub fn task_states_count(&self) -> usize {
    let states = &self.task_states;
    states.len()
}
/// Number of entries currently held in `task_response_receivers`.
pub fn task_receivers_count(&self) -> usize {
    let receivers = &self.task_response_receivers;
    receivers.len()
}
/// Stores `data` as the attestation data of `task_id`.
///
/// Does nothing when the task has no entry in `task_states`.
pub fn set_task_attestation_data(&self, task_id: TaskId, data: alloy::primitives::Bytes) {
    if let Some(mut tracked_task) = self.task_states.get_mut(&task_id) {
        tracked_task.attestation_data = data;
    }
}
/// Returns a copy of the attestation data stored for `task_id`, or empty bytes when the
/// task has no entry in `task_states`.
pub fn get_task_attestation_data(&self, task_id: &TaskId) -> alloy::primitives::Bytes {
    match self.task_states.get(task_id) {
        Some(tracked_task) => tracked_task.attestation_data.clone(),
        None => alloy::primitives::Bytes::default(),
    }
}
/// Asks the AVS registry service to invalidate its caches.
pub fn invalidate_avs_registry_caches(&self) {
    let registry = &self.avs_registry_service;
    registry.invalidate_caches();
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use ark_ec::AffineRepr;
    use async_trait::async_trait;
    use eigensdk::{
        client_avsregistry::error::AvsRegistryError, types::avs_state::QuorumAvsState,
        utils::slashing::middleware::operator_state_retriever::OperatorStateRetriever::CheckSignaturesIndices,
    };
    use uuid::Uuid;

    /// BLS service stub: accepts every call and never produces an aggregation result.
    struct TestBlsServiceHandle {}

    impl TestBlsServiceHandle {
        fn new() -> Self {
            Self {}
        }
    }

    #[async_trait]
    impl BlsServiceHandle for TestBlsServiceHandle {
        async fn initialize_task(
            &self,
            _metadata: TaskMetadata,
        ) -> Result<
            UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>,
            BlsAggregationServiceError,
        > {
            // The sender half is dropped immediately, so the receiver never yields a value.
            let (_, rx) = tokio::sync::mpsc::unbounded_channel();
            Ok(rx)
        }

        async fn process_signature(&self, _task_signature: TaskSignature) -> Result<(), BlsAggregationServiceError> {
            Ok(())
        }

        fn cancel_aggregation_loop(&self, _task_id: TaskId) {}
    }

    /// Registry stub: every query succeeds with an empty/default result.
    struct TestAvsRegistryService {}

    impl TestAvsRegistryService {
        fn new() -> Self {
            Self {}
        }
    }

    fn test_avs_registry_service() -> AvsRegistryServiceArcCaller {
        AvsRegistryServiceArcCaller {
            inner: Arc::new(TestAvsRegistryService::new()),
        }
    }

    #[async_trait]
    impl AvsRegistryService for TestAvsRegistryService {
        async fn get_operators_avs_state_at_block(
            &self,
            _block_num: u64,
            _quorum_nums: &[u8],
        ) -> Result<HashMap<FixedBytes<32>, OperatorAvsState>, AvsRegistryError> {
            Ok(HashMap::new())
        }

        async fn get_quorums_avs_state_at_block(
            &self,
            _quorum_nums: &[u8],
            _block_num: u64,
        ) -> Result<HashMap<u8, QuorumAvsState>, AvsRegistryError> {
            Ok(HashMap::new())
        }

        async fn get_check_signatures_indices(
            &self,
            _reference_block_number: u64,
            _quorum_numbers: Vec<u8>,
            _non_signer_operator_ids: Vec<FixedBytes<32>>,
        ) -> Result<CheckSignaturesIndices, AvsRegistryError> {
            Ok(CheckSignaturesIndices::default())
        }
    }

    #[async_trait]
    impl AvsRegistryServiceCaller for TestAvsRegistryService {
        fn warm_operator_id_cache(&self, _mappings: Vec<(FixedBytes<32>, Address)>) {}

        fn cached_operator_count(&self) -> usize {
            0
        }

        fn get_operator_address(&self, _operator_id: &FixedBytes<32>) -> Option<Address> {
            None
        }

        fn get_all_operator_addresses(&self) -> Vec<(FixedBytes<32>, Address)> {
            vec![]
        }

        fn invalidate_caches(&self) {}
    }

    /// Builds an `AggregatorCore` wired to the stubs above, configured as a non-destination chain.
    fn test_aggregator_core() -> AggregatorCore {
        AggregatorCore {
            service_handle: Box::new(TestBlsServiceHandle::new()),
            task_response_receivers: DashMap::new(),
            task_states: Arc::new(DashMap::new()),
            cancellation_token: CancellationToken::new(),
            is_destination_chain: false,
            rpc: ChainRpcProviderConfig::load(),
            source_chain_id: 0,
            dest_chain_id: 0,
            task_manager_addr: Address::ZERO,
            operator_registry_address: Address::ZERO,
            avs_registry_service: test_avs_registry_service(),
            source_multichain_contracts: None,
            dest_operator_table_updater: None,
        }
    }

    /// Returns a task id whose first 16 bytes are a fresh random UUID; the rest are zero.
    fn random_task_id() -> TaskId {
        let mut task_id_bytes = [0u8; 32];
        task_id_bytes[..16].copy_from_slice(Uuid::new_v4().as_bytes());
        TaskId::from(task_id_bytes)
    }

    /// Registers `task_id` on `core` with the parameters shared by every test in this module.
    async fn initialize_test_task(core: &AggregatorCore, task_id: TaskId) {
        core.initialize_task(task_id, 0, vec![0], 50, Duration::from_secs(10), 1)
            .await
            .unwrap();
    }

    /// Builds a signed response whose only varying content is the intent's `data` bytes,
    /// so two responses share a digest exactly when their `data` is equal.
    fn create_signed_response_with_data(task_id: TaskId, operator_id: OperatorId, data: &[u8]) -> SignedTaskResponse {
        use alloy::primitives::{Address, Bytes, U256};
        use ark_bn254::G1Affine;
        use eigensdk::crypto_bls::Signature;
        use newton_prover_core::newton_prover_task_manager::{INewtonPolicy, NewtonMessage};

        let signature = Signature::new(G1Affine::generator());
        let task_response = crate::rpc_server::TaskResponse {
            task_id,
            policy_client: Address::ZERO,
            policy_id: B256::ZERO,
            policy_address: Address::ZERO,
            intent: NewtonMessage::Intent {
                from: Address::ZERO,
                to: Address::ZERO,
                value: U256::ZERO,
                data: Bytes::from(data.to_vec()),
                chainId: U256::ZERO,
                functionSignature: Bytes::new(),
            },
            intent_signature: Bytes::new(),
            evaluation_result: vec![],
            policy_task_data: NewtonMessage::PolicyTaskData {
                policyId: B256::ZERO,
                policyAddress: Address::ZERO,
                policy: Bytes::new(),
                policyData: vec![],
            },
            policy_config: INewtonPolicy::PolicyConfig {
                policyParams: Bytes::new(),
                expireAfter: 0,
            },
            initialization_timestamp: U256::ZERO,
        };
        SignedTaskResponse::new(task_id, task_response, signature, operator_id)
    }

    /// Consensus digest of a signed response, computed the same way the aggregator does.
    fn digest_of(signed_response: &SignedTaskResponse) -> TaskResponseDigest {
        let binding: BindingTaskResponse = signed_response.task_response.clone().into();
        compute_consensus_digest(&binding)
    }

    /// A response arriving after the task is quorum-locked must not change the stored responses.
    #[tokio::test]
    async fn test_lock_quorum() {
        let core = test_aggregator_core();
        let task_id = random_task_id();
        initialize_test_task(&core, task_id).await;

        let operator = OperatorId::from(B256::with_last_byte(0x42));
        let signed_response = create_signed_response_with_data(task_id, operator, b"response from operator 1");
        core.process_signed_response(signed_response).await.unwrap();
        let task_state_before = core.task_states.get(&task_id).expect("task state").clone();
        {
            // Scoped so the mutable map guard is released before the next call.
            core.task_states
                .get_mut(&task_id)
                .expect("task state")
                .set_quorum_locked();
        }

        let late_operator = OperatorId::from(B256::with_last_byte(0x43));
        let late_signed_response =
            create_signed_response_with_data(task_id, late_operator, b"response from operator 2");
        core.process_signed_response(late_signed_response).await.unwrap();

        let task_state_after = core.task_states.get(&task_id).expect("task state");
        assert_eq!(task_state_after.task_responses.len(), 1);
        assert!(task_state_after.is_quorum_locked());
        let before_response_digest = task_state_before.task_responses.keys().next().unwrap();
        let after_response_digest = task_state_after.task_responses.keys().next().unwrap();
        assert_eq!(before_response_digest.to_string(), after_response_digest.to_string());
    }

    /// Once one digest is stored, a response with a different digest is not stored.
    #[tokio::test]
    async fn divergent_digest_rejected_after_first_insert() {
        let core = test_aggregator_core();
        let task_id = random_task_id();
        initialize_test_task(&core, task_id).await;

        let op_a = OperatorId::from(B256::with_last_byte(0x42));
        let op_b = OperatorId::from(B256::with_last_byte(0x43));
        let response_a = create_signed_response_with_data(task_id, op_a, b"price_100");
        let response_b = create_signed_response_with_data(task_id, op_b, b"price_105");
        let digest_a = digest_of(&response_a);
        let digest_b = digest_of(&response_b);
        assert_ne!(digest_a.to_string(), digest_b.to_string());

        core.process_signed_response(response_a).await.unwrap();
        core.process_signed_response(response_b).await.unwrap();

        let state = core.task_states.get(&task_id).unwrap();
        assert_eq!(state.task_responses.len(), 1);
        let digest_a_response = core.get_task_response(task_id, digest_a);
        let digest_b_response = core.get_task_response(task_id, digest_b);
        assert!(digest_a_response.is_some());
        assert!(digest_b_response.is_none());
    }

    /// Two operators submitting different digests concurrently: exactly one digest is kept.
    #[tokio::test]
    async fn concurrent_operators_race_on_task_responses() {
        let core = Arc::new(test_aggregator_core());
        let task_id = random_task_id();
        initialize_test_task(&core, task_id).await;

        let op_a = OperatorId::from(B256::with_last_byte(0x42));
        let op_b = OperatorId::from(B256::with_last_byte(0x43));
        let response_a = create_signed_response_with_data(task_id, op_a, b"price_100");
        let response_b = create_signed_response_with_data(task_id, op_b, b"price_105");
        let digest_a = digest_of(&response_a);
        let digest_b = digest_of(&response_b);
        assert_ne!(digest_a.to_string(), digest_b.to_string());

        // Both tasks wait on the barrier so their submissions start together.
        let barrier = Arc::new(tokio::sync::Barrier::new(2));
        let core_a = core.clone();
        let barrier_a = barrier.clone();
        let handle_a = tokio::spawn(async move {
            barrier_a.wait().await;
            core_a.process_signed_response(response_a).await.unwrap();
        });
        let core_b = core.clone();
        let handle_b = tokio::spawn(async move {
            barrier.wait().await;
            core_b.process_signed_response(response_b).await.unwrap();
        });
        handle_a.await.unwrap();
        handle_b.await.unwrap();

        let state = core.task_states.get(&task_id).unwrap();
        assert_eq!(state.task_responses.len(), 1);
        let digest_a_present = core.get_task_response(task_id, digest_a).is_some();
        let digest_b_present = core.get_task_response(task_id, digest_b).is_some();
        assert!(digest_a_present ^ digest_b_present, "exactly one digest should survive");
    }

    /// Two operators submitting the same digest leave a single stored response.
    #[tokio::test]
    async fn same_digest_from_multiple_operators_idempotent() {
        let core = test_aggregator_core();
        let task_id = random_task_id();
        initialize_test_task(&core, task_id).await;

        let op_a = OperatorId::from(B256::with_last_byte(0x42));
        let op_b = OperatorId::from(B256::with_last_byte(0x43));
        let response_a = create_signed_response_with_data(task_id, op_a, b"price_100");
        let response_b = create_signed_response_with_data(task_id, op_b, b"price_100");
        let digest_a = digest_of(&response_a);
        let digest_b = digest_of(&response_b);
        assert_eq!(digest_a.to_string(), digest_b.to_string());

        core.process_signed_response(response_a).await.unwrap();
        core.process_signed_response(response_b).await.unwrap();

        let state = core.task_states.get(&task_id).unwrap();
        assert_eq!(state.task_responses.len(), 1);
        let digest_a_response = core.get_task_response(task_id, digest_a);
        let digest_b_response = core.get_task_response(task_id, digest_b);
        assert!(digest_a_response.is_some());
        assert!(digest_b_response.is_some());
    }
}