1#[macro_use]
4extern crate newton_prover_core;
5
6pub mod config;
8pub mod consensus;
10pub mod core;
12pub mod error;
14pub mod rpc_server;
16use alloy::{
17 dyn_abi::SolType,
18 providers::{Provider, ProviderBuilder, WsConnect},
19 rpc::types::Filter,
20 sol_types::{SolEvent, SolValue},
21};
22use ark_ec::AffineRepr;
23pub use core::AggregatorCore;
24use eigensdk::client_avsregistry::reader::AvsRegistryChainReader;
25pub use error::{AggregatorCoreError, AggregatorError};
26use futures_util::StreamExt;
27use jsonrpsee::{
28 server::{RpcModule, Server},
29 types::ErrorObject,
30};
31pub use newton_prover_chainio::bls::BlsAggregationServiceResponse;
32use newton_prover_chainio::bls::ServiceHandle;
33use newton_prover_core::{
34 common::chain::get_block_duration_ms,
35 config::{key::EcdsaKey, rpc::ChainRpcProviderConfig, NewtonAvsConfig},
36 r#newton_prover_task_manager::{
37 INewtonProverTaskManager::{Task, TaskResponse as BindingTaskResponse},
38 NewtonProverTaskManager::NewTaskCreated,
39 },
40 TaskId,
41};
42pub use rpc_server::{
43 ConsensusCommitRequest, ConsensusCommitResponse, ConsensusData, ConsensusPrepareRequest, ConsensusPrepareResponse,
44 FieldAdjustment, OperatorErrorResponse, SignConsensusRequest, SignConsensusResponse, SignedTaskResponse,
45 TaskResponse,
46};
47#[allow(deprecated)]
49pub use rpc_server::{
50 EvaluateAndSignRequest, EvaluateAndSignResponse, FetchPolicyDataRequest, UnsignedPolicyDataResponse,
51};
52use std::{
53 collections::HashMap,
54 net::{SocketAddr, ToSocketAddrs},
55 sync::Arc,
56};
57use tokio::{
58 task::JoinHandle,
59 time::{sleep, Duration},
60};
61use tracing::{error, info, instrument, warn, Instrument};
62
63use crate::config::AggregatorConfig;
64use newton_prover_chainio::avs::writer::AvsWriter;
65
/// Aggregates BLS-signed task responses from operators and submits the
/// aggregated result to the task-manager contract on-chain.
#[allow(missing_debug_implementations)]
pub struct Aggregator {
    // Host:port the operator-facing JSON-RPC server binds to.
    port_address: String,
    // Writer used to submit aggregated responses to the task manager contract.
    avs_writer: AvsWriter,
    // Tasks observed via NewTaskCreated events, keyed by task id.
    tasks: HashMap<TaskId, Task>,
    // Shared aggregation core; cloned (Arc) into the RPC server and both loops.
    core: Arc<AggregatorCore>,
    // Per-chain RPC provider configuration.
    // NOTE(review): discarded (`rpc: _`) when `start` destructures self — confirm still needed.
    rpc: ChainRpcProviderConfig,
    // Chain id this aggregator runs against.
    // NOTE(review): also discarded in `start` — confirm still needed.
    chain_id: u64,
    // Number of blocks operators have to respond to a task; used to size task expiry.
    task_response_window_block: u64,
    // Max seconds to wait for an aggregated response per task before moving on.
    aggregation_timeout_secs: u64,
}
88
89impl Aggregator {
    /// Builds an `Aggregator` from the AVS configuration.
    ///
    /// Constructs the registry chain reader and AVS writer against the source
    /// chain, derives and publishes the aggregator's ECDSA address to metrics,
    /// builds the aggregation core, and reads the task response window (in
    /// blocks) from the target chain.
    ///
    /// # Errors
    /// Fails if an RPC config is missing for either chain, the signer key
    /// cannot be loaded, or any chain client fails to initialize.
    pub async fn new(config: NewtonAvsConfig<AggregatorConfig>) -> Result<Self, AggregatorError> {
        let chain_id = config.chain_id;
        let source_chain_id = config.get_source_chain_id();

        // RPC config for the chain this aggregator serves.
        let rpc_config = config.rpc.get_or_err(chain_id)?.clone();

        // RPC config for the chain where the AVS contracts live.
        let source_rpc = config.rpc.get_or_err(source_chain_id)?.clone();

        let avs_registry_chain_reader = AvsRegistryChainReader::new(
            config.contracts.avs.operator_registry,
            config.contracts.avs.operator_state_retriever,
            source_rpc.http.clone(),
        )
        .await?;

        let avs_writer = AvsWriter::new(
            config.contracts.avs.newton_prover_task_manager,
            source_rpc.http.clone(),
            config.service.signer.clone(),
        )
        .await?;

        // Derive the aggregator's own address from its signer and expose it via metrics.
        let aggregator_address = newton_prover_core::keys::load_ecdsa(&config.service.signer)
            .map_err(|e| AggregatorError::KeyError(e.to_string()))?
            .address();
        newton_prover_metrics::set_aggregator_address(&aggregator_address.to_string());

        // NOTE(review): core construction errors are wrapped in KeyError even
        // though they are not key failures — consider a dedicated variant.
        let core = AggregatorCore::new(
            avs_registry_chain_reader,
            config.contracts.avs.operator_registry,
            config.contracts.avs.operator_state_retriever,
            config.is_destination_chain(),
            config.rpc.clone(),
            source_chain_id,
            chain_id,
            config.contracts.avs.newton_prover_task_manager,
        )
        .await
        .map_err(|e| AggregatorError::KeyError(e.to_string()))?;

        // NOTE(review): same KeyError reuse here for a chain-read failure.
        let task_response_window_block =
            newton_prover_core::common::chain::get_task_response_window_blocks(&rpc_config.http)
                .await
                .map_err(|e| AggregatorError::KeyError(format!("Failed to get task response window: {}", e)))?
                as u64;

        Ok(Self {
            port_address: config.service.ip_address.clone(),
            avs_writer,
            tasks: HashMap::new(),
            core: Arc::new(core),
            rpc: config.rpc.clone(),
            chain_id,
            task_response_window_block,
            aggregation_timeout_secs: config.service.aggregation_timeout_secs,
        })
    }
163
164 pub async fn start(self, ws_rpc_url: String) -> eyre::Result<()> {
179 info!("Starting aggregator");
180
181 let metrics_addr: SocketAddr = "0.0.0.0:9092".parse()?;
183 let _metrics_handle = newton_prover_metrics::prometheus::init_prometheus_exporter(metrics_addr).await?;
184 info!("Prometheus metrics server started on {}", metrics_addr);
185
186 let Self {
187 avs_writer,
188 port_address,
189 tasks,
190 core,
191 rpc: _,
192 chain_id: _,
193 task_response_window_block,
194 aggregation_timeout_secs,
195 } = self;
196
197 let tasks = Arc::new(tokio::sync::Mutex::new(tasks));
198 let core_clone = Arc::clone(&core);
199
200 let server_handle = Self::start_server(port_address, Arc::clone(&core)).await?;
203 let process_handle = tokio::spawn(Self::process_tasks(
205 ws_rpc_url.clone(),
206 task_response_window_block,
207 Arc::clone(&tasks),
208 Arc::clone(&core),
209 ));
210 let responses_handle = tokio::spawn(Self::process_aggregator_responses(
212 Arc::clone(&tasks),
213 Arc::clone(&core),
214 avs_writer,
215 aggregation_timeout_secs,
216 ));
217
218 let (_server_result, process_result, responses_result) =
220 tokio::try_join!(server_handle, process_handle, responses_handle)
221 .inspect_err(|e| error!("Error in task execution: {e:?}"))
222 .map_err(|e| eyre::eyre!("Task execution failed {e}"))?;
223
224 process_result?;
225 responses_result?;
226
227 Ok(())
228 }
229
    /// Starts the JSON-RPC server operators call to deliver signed task
    /// responses.
    ///
    /// Registers a single `process_signed_task_response` method backed by
    /// `core`, binds to the first address `port_address` resolves to, and
    /// returns a handle to a spawned task that completes when the server
    /// stops.
    ///
    /// # Errors
    /// Fails if the address cannot be resolved or the server cannot bind.
    async fn start_server(
        port_address: String,
        core: Arc<AggregatorCore>,
    ) -> eyre::Result<JoinHandle<()>, AggregatorError> {
        let mut module = RpcModule::new(core);
        module
            .register_async_method("process_signed_task_response", |params, ctx, _| async move {
                let core = ctx.as_ref();
                // Malformed payloads are surfaced as JSON-RPC error objects (code 0).
                let signed_task_response = params
                    .parse::<SignedTaskResponse>()
                    .map_err(|err| ErrorObject::owned(0, err.to_string(), None::<()>))?;

                let result = core.process_signed_response(signed_task_response).await;

                // Successful processing answers `true`; failures carry the error text.
                match result {
                    Ok(()) => Ok(true),
                    Err(err) => Err(ErrorObject::owned(0, err.to_string(), None::<()>)),
                }
            })
            .expect("method name is unique");
        // Resolve the configured host:port; error out if resolution yields nothing.
        let socket: SocketAddr = port_address
            .to_socket_addrs()
            .map_err(AggregatorError::IOError)?
            .next()
            .ok_or_else(|| {
                AggregatorError::IOError(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "No addresses found for hostname",
                ))
            })?;
        let middleware = tower::ServiceBuilder::new();
        let server = Server::builder().set_http_middleware(middleware).build(socket).await?;

        let handle = server.start(module);

        info!("Server running at {socket}");

        // The spawned task resolves only once the server has fully stopped.
        Ok(tokio::spawn(handle.stopped()))
    }
272
273 #[instrument(skip_all)]
275 async fn process_tasks(
276 ws_rpc_url: String,
277 task_response_window_block: u64,
278 tasks: Arc<tokio::sync::Mutex<HashMap<TaskId, Task>>>,
279 core: Arc<AggregatorCore>,
280 ) -> eyre::Result<()> {
281 let ws = WsConnect::new(ws_rpc_url.clone());
282 let provider = ProviderBuilder::new()
283 .disable_recommended_fillers()
284 .connect_ws(ws)
285 .await?;
286 let chain_id = provider.get_chain_id().await?;
287
288 let filter = Filter::new().event_signature(NewTaskCreated::SIGNATURE_HASH);
289 let sub = provider.subscribe_logs(&filter).await?;
290 let mut stream = sub.into_stream();
291
292 while let Some(log) = stream.next().await {
293 let NewTaskCreated { taskId, task, state: _ } = log.log_decode()?.inner.data;
294
295 let span = tracing::info_span!(
296 "process_task",
297 task_id = %taskId,
298 policy_client = %task.policyClient
299 );
300
301 async {
302 info!("aggregator: new task created");
303
304 tasks.lock().await.insert(taskId, task.clone());
305
306 let mut quorum_nums: Vec<u8> = vec![];
307 for val in task.quorumNumbers.iter() {
308 quorum_nums.push(*val);
309 }
310 info!(
311 quorum_nums = %hex!(&quorum_nums),
312 "aggregator: initialize_task quorum_nums"
313 );
314
315 let block_time_ms = get_block_duration_ms(chain_id).unwrap_or(12000);
316 let time_to_expiry = tokio::time::Duration::from_millis(task_response_window_block * block_time_ms);
317
318 info!("aggregator: initializing task");
319 core.initialize_task(
320 taskId,
321 task.taskCreatedBlock.into(),
322 quorum_nums,
323 task.quorumThresholdPercentage.try_into()?,
324 time_to_expiry,
325 0, )
327 .await
328 .map_err(|e| {
329 error!("Failed to initialize task {}: {:?}", taskId, e);
330 eyre::eyre!(e)
331 })?;
332
333 Ok::<(), eyre::Error>(())
334 }
335 .instrument(span)
336 .await?;
337 }
338
339 Ok(())
340 }
341
    /// Polling loop that waits for aggregated BLS responses and submits them
    /// on-chain.
    ///
    /// Each iteration: snapshots active task ids, filters down to those with
    /// an aggregation receiver in `core`, then waits (bounded by
    /// `aggregation_timeout_secs`) for one aggregated response and submits it
    /// via `submit_aggregated_response_with_retry`. Sleeps briefly when there
    /// is nothing to do. Never returns under normal operation.
    async fn process_aggregator_responses(
        tasks: Arc<tokio::sync::Mutex<HashMap<TaskId, Task>>>,
        core: Arc<AggregatorCore>,
        avs_writer: AvsWriter,
        aggregation_timeout_secs: u64,
    ) -> eyre::Result<()> {
        loop {
            // Snapshot task ids inside a tight lock scope so the mutex is not
            // held across any await below.
            let active_task_ids: Vec<TaskId> = {
                let tasks_guard = tasks.lock().await;
                tasks_guard.keys().cloned().collect()
            };

            if active_task_ids.is_empty() {
                tokio::time::sleep(Duration::from_millis(100)).await;
                continue;
            }

            // Keep only tasks that currently have an aggregation receiver.
            let tasks_with_receivers: Vec<TaskId> = {
                let mut tasks_with_receivers = Vec::new();
                for task_id in active_task_ids {
                    if core.has_task_receiver(task_id).await {
                        tasks_with_receivers.push(task_id);
                    }
                }
                tasks_with_receivers
            };

            if tasks_with_receivers.is_empty() {
                tokio::time::sleep(Duration::from_millis(200)).await;
                continue;
            }

            let mut found_response = false;
            for task_id in tasks_with_receivers {
                // Re-check: the receiver may have been consumed/removed since
                // the filter pass above (presumably by a concurrent path —
                // TODO confirm this race is real).
                if !core.has_task_receiver(task_id).await {
                    continue;
                }

                let timeout = Duration::from_secs(aggregation_timeout_secs);
                match core.wait_for_aggregation(task_id, timeout).await {
                    Ok(service_response) => {
                        found_response = true;
                        info!("Received aggregated response for task_id: {}", service_response.task_id);

                        // Lock guard is a temporary here; it is released at the
                        // end of the statement, before any further await.
                        let task = match tasks.lock().await.get(&service_response.task_id).cloned() {
                            Some(task) => task,
                            None => {
                                info!("Task not found for task_id: {}", service_response.task_id);
                                continue;
                            }
                        };

                        // Look up the concrete response matching the digest the
                        // aggregation service settled on.
                        let task_response = core.task_states.get(&task.taskId).and_then(|state| {
                            state
                                .task_responses
                                .get(&service_response.task_response_digest)
                                .cloned()
                        });

                        if let Some(task_response) = task_response {
                            // Submission failure is logged but does not abort the loop.
                            if let Err(err) = Self::submit_aggregated_response_with_retry(
                                &core,
                                &avs_writer,
                                &task,
                                &task_response,
                                &service_response,
                            )
                            .await
                            {
                                error!(
                                    task_id = hex!(task.taskId),
                                    error = ?err,
                                    "Failed to submit aggregated response to contract"
                                );
                            }
                        } else {
                            info!(
                                "Not found task_response for task_id: {}",
                                hex!(service_response.task_id)
                            );
                        }
                        // One submission per outer iteration: restart the scan.
                        break; }
                    Err(crate::error::AggregatorCoreError::Timeout { .. }) => {
                        // Quorum not reached within the timeout; try the next task.
                        continue;
                    }
                    Err(crate::error::AggregatorCoreError::TaskNotInitialized { .. }) => {
                        continue;
                    }
                    Err(crate::error::AggregatorCoreError::InvalidTimeoutDuration(_)) => {
                        warn!(task_id = %task_id, "Invalid timeout duration - skipping task");
                        continue;
                    }
                    Err(e) => {
                        error!(task_id = %task_id, "Error receiving aggregated response: {:?}", e);
                        continue;
                    }
                }
            }

            if !found_response {
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }
467
468 async fn submit_aggregated_response_with_retry(
469 core: &AggregatorCore,
470 avs_writer: &AvsWriter,
471 task: &Task,
472 task_response: &BindingTaskResponse,
473 service_response: &BlsAggregationServiceResponse,
474 ) -> Result<(), AggregatorError> {
475 const MAX_RETRIES: usize = 3;
476 let mut attempt: usize = 0;
477 let mut delay = Duration::from_millis(500);
478
479 loop {
480 attempt += 1;
481
482 let result = core
483 .submit_aggregated_response(
484 avs_writer,
485 task.clone(),
486 task_response.clone(),
487 service_response.clone(),
488 )
489 .await
490 .map(|_| ())
491 .map_err(|_| AggregatorError::SendAggregatedResponseError);
492
493 match result {
494 Ok(()) => return Ok(()),
495 Err(err) => {
496 let err_str = err.to_string();
497 let should_retry =
498 err_str.contains("SendAggregatedResponseError") || err_str.contains("Failed to submit");
499 if !should_retry || attempt >= MAX_RETRIES {
500 return Err(AggregatorError::SendAggregatedResponseError);
501 }
502
503 warn!(
504 task_id = hex!(service_response.task_id),
505 attempt,
506 retry_in_secs = delay.as_secs(),
507 "Failed to submit aggregated response; retrying"
508 );
509
510 sleep(delay).await;
511 delay = (delay.saturating_mul(2)).min(Duration::from_millis(2000));
512 }
513 }
514 }
515 }
516
    /// Returns true when a contract-submission error is considered transient
    /// and worth retrying (currently only `SendAggregatedResponseError`).
    fn should_retry_contract_submission(err: &AggregatorError) -> bool {
        matches!(err, AggregatorError::SendAggregatedResponseError)
    }
520}