Skip to main content

newton_prover_aggregator/
lib.rs

1//! Aggregator crate
2
3#[macro_use]
4extern crate newton_prover_core;
5
6/// Config
7pub mod config;
8/// Median-based consensus logic for normalizing operator responses
9pub mod consensus;
10/// Core aggregator logic shared between implementations
11pub mod core;
12/// Aggregator error
13pub mod error;
14/// RPC server
15pub mod rpc_server;
16use alloy::{
17    dyn_abi::SolType,
18    providers::{Provider, ProviderBuilder, WsConnect},
19    rpc::types::Filter,
20    sol_types::{SolEvent, SolValue},
21};
22use ark_ec::AffineRepr;
23pub use core::AggregatorCore;
24use eigensdk::client_avsregistry::reader::AvsRegistryChainReader;
25pub use error::{AggregatorCoreError, AggregatorError};
26use futures_util::StreamExt;
27use jsonrpsee::{
28    server::{RpcModule, Server},
29    types::ErrorObject,
30};
31pub use newton_prover_chainio::bls::BlsAggregationServiceResponse;
32use newton_prover_chainio::bls::ServiceHandle;
33use newton_prover_core::{
34    common::chain::get_block_duration_ms,
35    config::{key::EcdsaKey, rpc::ChainRpcProviderConfig, NewtonAvsConfig},
36    r#newton_prover_task_manager::{
37        INewtonProverTaskManager::{Task, TaskResponse as BindingTaskResponse},
38        NewtonProverTaskManager::NewTaskCreated,
39    },
40    TaskId,
41};
42pub use rpc_server::{
43    ConsensusCommitRequest, ConsensusCommitResponse, ConsensusData, ConsensusPrepareRequest, ConsensusPrepareResponse,
44    FieldAdjustment, OperatorErrorResponse, SignConsensusRequest, SignConsensusResponse, SignedTaskResponse,
45    TaskResponse,
46};
47// Backward compatibility re-exports (deprecated)
48#[allow(deprecated)]
49pub use rpc_server::{
50    EvaluateAndSignRequest, EvaluateAndSignResponse, FetchPolicyDataRequest, UnsignedPolicyDataResponse,
51};
52use std::{
53    collections::HashMap,
54    net::{SocketAddr, ToSocketAddrs},
55    sync::Arc,
56};
57use tokio::{
58    task::JoinHandle,
59    time::{sleep, Duration},
60};
61use tracing::{error, info, instrument, warn, Instrument};
62
63use crate::config::AggregatorConfig;
64use newton_prover_chainio::avs::writer::AvsWriter;
65
66// Removed TASK_CHALLENGE_WINDOW_BLOCK constant - now fetched dynamically from contract
67
/// Standalone aggregator service: listens for `NewTaskCreated` events,
/// collects signed operator responses over JSON-RPC, and submits
/// BLS-aggregated responses back to the task manager contract.
#[allow(missing_debug_implementations)]
pub struct Aggregator {
    /// Socket address (host:port) the JSON-RPC server binds to
    port_address: String,
    /// AVS writer used to submit aggregated responses on-chain
    avs_writer: AvsWriter,
    /// Active tasks keyed by task id, populated from `NewTaskCreated` events
    tasks: HashMap<TaskId, Task>,
    /// Core aggregator logic shared with gateway aggregator
    core: Arc<AggregatorCore>,
    /// RPC provider configuration for all chains
    rpc: ChainRpcProviderConfig,
    /// Chain ID for RPC lookups (destination chain in multichain mode)
    chain_id: u64,
    /// Cached task response window block (fetched once at initialization)
    task_response_window_block: u64,
    /// Timeout for waiting for aggregation responses (in seconds)
    aggregation_timeout_secs: u64,
}
88
89impl Aggregator {
    /// Creates a new aggregator.
    ///
    /// Performs all one-time initialization: resolves RPC configs for the
    /// destination and source chains, builds the registry reader and AVS
    /// writer against the source chain, constructs the shared
    /// [`AggregatorCore`], and caches the task response window so it is not
    /// re-fetched per task.
    ///
    /// # Arguments
    ///
    /// * [`NewtonAvsConfig<AggregatorConfig>`] - The configuration for the aggregator
    ///
    /// # Returns
    ///
    /// * `Self` - The fully-initialized aggregator
    ///
    /// # Errors
    ///
    /// * `AggregatorError` - If an RPC config is missing for either chain, the
    ///   signer key cannot be loaded, or any reader/writer/core initialization
    ///   fails
    pub async fn new(config: NewtonAvsConfig<AggregatorConfig>) -> Result<Self, AggregatorError> {
        let chain_id = config.chain_id;
        let source_chain_id = config.get_source_chain_id();

        // Get RPC config for the current chain (destination chain in multichain mode)
        let rpc_config = config.rpc.get_or_err(chain_id)?.clone();

        // Source chain: where EigenLayer AVS contracts (OperatorRegistry, etc.) are deployed
        let source_rpc = config.rpc.get_or_err(source_chain_id)?.clone();

        // Reader for operator registry state, backed by the source chain.
        let avs_registry_chain_reader = AvsRegistryChainReader::new(
            config.contracts.avs.operator_registry,
            config.contracts.avs.operator_state_retriever,
            source_rpc.http.clone(),
        )
        .await?;

        // Writer used later to push aggregated responses to the task manager.
        let avs_writer = AvsWriter::new(
            config.contracts.avs.newton_prover_task_manager,
            source_rpc.http.clone(),
            config.service.signer.clone(),
        )
        .await?;

        // Derive the aggregator's on-chain address from its ECDSA key and
        // expose it through the metrics subsystem (global side effect).
        let aggregator_address = newton_prover_core::keys::load_ecdsa(&config.service.signer)
            .map_err(|e| AggregatorError::KeyError(e.to_string()))?
            .address();
        newton_prover_metrics::set_aggregator_address(&aggregator_address.to_string());

        // NOTE(review): core-init and window-fetch failures below are surfaced
        // as `KeyError` even though they are not key errors — presumably for
        // lack of a dedicated variant; confirm before matching on the variant.
        let core = AggregatorCore::new(
            avs_registry_chain_reader,
            config.contracts.avs.operator_registry,
            config.contracts.avs.operator_state_retriever,
            config.is_destination_chain(),
            config.rpc.clone(),
            source_chain_id,
            chain_id,
            config.contracts.avs.newton_prover_task_manager,
        )
        .await
        .map_err(|e| AggregatorError::KeyError(e.to_string()))?;

        // Fetch task response window block once at initialization (cached for reuse)
        let task_response_window_block =
            newton_prover_core::common::chain::get_task_response_window_blocks(&rpc_config.http)
                .await
                .map_err(|e| AggregatorError::KeyError(format!("Failed to get task response window: {}", e)))?
                as u64;

        Ok(Self {
            port_address: config.service.ip_address.clone(),
            avs_writer,
            tasks: HashMap::new(),
            core: Arc::new(core),
            rpc: config.rpc.clone(),
            chain_id,
            task_response_window_block,
            aggregation_timeout_secs: config.service.aggregation_timeout_secs,
        })
    }
163
164    /// Starts the aggregator service with three tasks: one for the server,
165    /// one for processing tasks and one for processing aggregator responses.
166    ///
167    /// # Arguments
168    ///
169    /// * `ws_rpc_url` - The websocket RPC URL
170    ///
171    /// # Returns
172    ///
173    /// * `eyre::Result<()>` - The result of the operation
174    ///
175    /// # Errors
176    ///
177    /// * The error that occurred
178    pub async fn start(self, ws_rpc_url: String) -> eyre::Result<()> {
179        info!("Starting aggregator");
180
181        // Initialize Prometheus metrics exporter on port 9092
182        let metrics_addr: SocketAddr = "0.0.0.0:9092".parse()?;
183        let _metrics_handle = newton_prover_metrics::prometheus::init_prometheus_exporter(metrics_addr).await?;
184        info!("Prometheus metrics server started on {}", metrics_addr);
185
186        let Self {
187            avs_writer,
188            port_address,
189            tasks,
190            core,
191            rpc: _,
192            chain_id: _,
193            task_response_window_block,
194            aggregation_timeout_secs,
195        } = self;
196
197        let tasks = Arc::new(tokio::sync::Mutex::new(tasks));
198        let core_clone = Arc::clone(&core);
199
200        // Spawn three tasks: one for the server, one for processing tasks, and one for processing aggregator responses
201        // 1) Process signatures
202        let server_handle = Self::start_server(port_address, Arc::clone(&core)).await?;
203        // 2) Process tasks
204        let process_handle = tokio::spawn(Self::process_tasks(
205            ws_rpc_url.clone(),
206            task_response_window_block,
207            Arc::clone(&tasks),
208            Arc::clone(&core),
209        ));
210        // 3) Process aggregator responses
211        let responses_handle = tokio::spawn(Self::process_aggregator_responses(
212            Arc::clone(&tasks),
213            Arc::clone(&core),
214            avs_writer,
215            aggregation_timeout_secs,
216        ));
217
218        // Wait for the tasks to complete and handle potential errors
219        let (_server_result, process_result, responses_result) =
220            tokio::try_join!(server_handle, process_handle, responses_handle)
221                .inspect_err(|e| error!("Error in task execution: {e:?}"))
222                .map_err(|e| eyre::eyre!("Task execution failed {e}"))?;
223
224        process_result?;
225        responses_result?;
226
227        Ok(())
228    }
229
230    /// Starts the RPC server and returns a future that ends once the server is stopped
231    async fn start_server(
232        port_address: String,
233        core: Arc<AggregatorCore>,
234    ) -> eyre::Result<JoinHandle<()>, AggregatorError> {
235        // See https://github.com/paritytech/jsonrpsee/blob/42461391fee47c94d42c4a7303355525291df9f6/examples/examples/cors_server.rs
236        let mut module = RpcModule::new(core);
237        module
238            .register_async_method("process_signed_task_response", |params, ctx, _| async move {
239                let core = ctx.as_ref();
240                let signed_task_response = params
241                    .parse::<SignedTaskResponse>()
242                    .map_err(|err| ErrorObject::owned(0, err.to_string(), None::<()>))?;
243
244                // Call the process_signed_response function from AggregatorCore
245                let result = core.process_signed_response(signed_task_response).await;
246
247                match result {
248                    Ok(()) => Ok(true),
249                    Err(err) => Err(ErrorObject::owned(0, err.to_string(), None::<()>)),
250                }
251            })
252            .expect("method name is unique");
253        let socket: SocketAddr = port_address
254            .to_socket_addrs()
255            .map_err(AggregatorError::IOError)?
256            .next()
257            .ok_or_else(|| {
258                AggregatorError::IOError(std::io::Error::new(
259                    std::io::ErrorKind::InvalidInput,
260                    "No addresses found for hostname",
261                ))
262            })?;
263        let middleware = tower::ServiceBuilder::new();
264        let server = Server::builder().set_http_middleware(middleware).build(socket).await?;
265
266        let handle = server.start(module);
267
268        info!("Server running at {socket}");
269
270        Ok(tokio::spawn(handle.stopped()))
271    }
272
273    /// Processes new tasks created in the task manager contract
274    #[instrument(skip_all)]
275    async fn process_tasks(
276        ws_rpc_url: String,
277        task_response_window_block: u64,
278        tasks: Arc<tokio::sync::Mutex<HashMap<TaskId, Task>>>,
279        core: Arc<AggregatorCore>,
280    ) -> eyre::Result<()> {
281        let ws = WsConnect::new(ws_rpc_url.clone());
282        let provider = ProviderBuilder::new()
283            .disable_recommended_fillers()
284            .connect_ws(ws)
285            .await?;
286        let chain_id = provider.get_chain_id().await?;
287
288        let filter = Filter::new().event_signature(NewTaskCreated::SIGNATURE_HASH);
289        let sub = provider.subscribe_logs(&filter).await?;
290        let mut stream = sub.into_stream();
291
292        while let Some(log) = stream.next().await {
293            let NewTaskCreated { taskId, task, state: _ } = log.log_decode()?.inner.data;
294
295            let span = tracing::info_span!(
296                "process_task",
297                task_id = %taskId,
298                policy_client = %task.policyClient
299            );
300
301            async {
302                info!("aggregator: new task created");
303
304                tasks.lock().await.insert(taskId, task.clone());
305
306                let mut quorum_nums: Vec<u8> = vec![];
307                for val in task.quorumNumbers.iter() {
308                    quorum_nums.push(*val);
309                }
310                info!(
311                    quorum_nums = %hex!(&quorum_nums),
312                    "aggregator: initialize_task quorum_nums"
313                );
314
315                let block_time_ms = get_block_duration_ms(chain_id).unwrap_or(12000);
316                let time_to_expiry = tokio::time::Duration::from_millis(task_response_window_block * block_time_ms);
317
318                info!("aggregator: initializing task");
319                core.initialize_task(
320                    taskId,
321                    task.taskCreatedBlock.into(),
322                    quorum_nums,
323                    task.quorumThresholdPercentage.try_into()?,
324                    time_to_expiry,
325                    0, // Expected operator count unknown for standalone aggregator (disables early exit)
326                )
327                .await
328                .map_err(|e| {
329                    error!("Failed to initialize task {}: {:?}", taskId, e);
330                    eyre::eyre!(e)
331                })?;
332
333                Ok::<(), eyre::Error>(())
334            }
335            .instrument(span)
336            .await?;
337        }
338
339        Ok(())
340    }
341
342    /// Processes BLS Aggregator Service responses
343    ///
344    /// With per-task channels, we iterate over active tasks and wait for their responses.
345    /// This eliminates mutex contention and response stealing.
346    async fn process_aggregator_responses(
347        tasks: Arc<tokio::sync::Mutex<HashMap<TaskId, Task>>>,
348        core: Arc<AggregatorCore>,
349        avs_writer: AvsWriter,
350        aggregation_timeout_secs: u64,
351    ) -> eyre::Result<()> {
352        loop {
353            // Get list of active task IDs
354            let active_task_ids: Vec<TaskId> = {
355                let tasks_guard = tasks.lock().await;
356                tasks_guard.keys().cloned().collect()
357            };
358
359            if active_task_ids.is_empty() {
360                // No active tasks, wait a bit before checking again
361                tokio::time::sleep(Duration::from_millis(100)).await;
362                continue;
363            }
364
365            // Filter to only tasks that have receivers (not already consumed)
366            // Check receivers efficiently to avoid unnecessary wait_for_aggregation calls
367            let tasks_with_receivers: Vec<TaskId> = {
368                let mut tasks_with_receivers = Vec::new();
369                for task_id in active_task_ids {
370                    if core.has_task_receiver(task_id).await {
371                        tasks_with_receivers.push(task_id);
372                    }
373                }
374                tasks_with_receivers
375            };
376
377            if tasks_with_receivers.is_empty() {
378                // No tasks with receivers, wait a bit before checking again
379                tokio::time::sleep(Duration::from_millis(200)).await;
380                continue;
381            }
382
383            // Use a longer timeout (1-2 seconds) to reduce polling overhead
384            // This is more efficient than 100ms polling and reduces latency
385            // We process tasks sequentially but with a reasonable timeout per task
386            let mut found_response = false;
387            for task_id in tasks_with_receivers {
388                // Double-check receiver still exists (may have been consumed by another call)
389                if !core.has_task_receiver(task_id).await {
390                    continue;
391                }
392
393                // Use configurable timeout to accommodate destination chain operations
394                // (certificate verifier queries add ~500ms-1s of latency)
395                let timeout = Duration::from_secs(aggregation_timeout_secs);
396                match core.wait_for_aggregation(task_id, timeout).await {
397                    Ok(service_response) => {
398                        found_response = true;
399                        info!("Received aggregated response for task_id: {}", service_response.task_id);
400
401                        let task = match tasks.lock().await.get(&service_response.task_id).cloned() {
402                            Some(task) => task,
403                            None => {
404                                info!("Task not found for task_id: {}", service_response.task_id);
405                                continue;
406                            }
407                        };
408
409                        let task_response = core.task_states.get(&task.taskId).and_then(|state| {
410                            state
411                                .task_responses
412                                .get(&service_response.task_response_digest)
413                                .cloned()
414                        });
415
416                        if let Some(task_response) = task_response {
417                            if let Err(err) = Self::submit_aggregated_response_with_retry(
418                                &core,
419                                &avs_writer,
420                                &task,
421                                &task_response,
422                                &service_response,
423                            )
424                            .await
425                            {
426                                error!(
427                                    task_id = hex!(task.taskId),
428                                    error = ?err,
429                                    "Failed to submit aggregated response to contract"
430                                );
431                            }
432                        } else {
433                            info!(
434                                "Not found task_response for task_id: {}",
435                                hex!(service_response.task_id)
436                            );
437                        }
438                        break; // Process one response per loop iteration
439                    }
440                    Err(crate::error::AggregatorCoreError::Timeout { .. }) => {
441                        // Timeout is expected, continue to next task
442                        continue;
443                    }
444                    Err(crate::error::AggregatorCoreError::TaskNotInitialized { .. }) => {
445                        // Receiver already consumed (by gateway or previous call) - skip this task
446                        continue;
447                    }
448                    Err(crate::error::AggregatorCoreError::InvalidTimeoutDuration(_)) => {
449                        // Invalid timeout - this shouldn't happen, but handle gracefully
450                        warn!(task_id = %task_id, "Invalid timeout duration - skipping task");
451                        continue;
452                    }
453                    Err(e) => {
454                        error!(task_id = %task_id, "Error receiving aggregated response: {:?}", e);
455                        continue;
456                    }
457                }
458            }
459
460            if !found_response {
461                // No responses found, wait a bit before checking again
462                // Reduced from 50ms to avoid excessive CPU usage when no tasks are active
463                tokio::time::sleep(Duration::from_millis(100)).await;
464            }
465        }
466    }
467
468    async fn submit_aggregated_response_with_retry(
469        core: &AggregatorCore,
470        avs_writer: &AvsWriter,
471        task: &Task,
472        task_response: &BindingTaskResponse,
473        service_response: &BlsAggregationServiceResponse,
474    ) -> Result<(), AggregatorError> {
475        const MAX_RETRIES: usize = 3;
476        let mut attempt: usize = 0;
477        let mut delay = Duration::from_millis(500);
478
479        loop {
480            attempt += 1;
481
482            let result = core
483                .submit_aggregated_response(
484                    avs_writer,
485                    task.clone(),
486                    task_response.clone(),
487                    service_response.clone(),
488                )
489                .await
490                .map(|_| ())
491                .map_err(|_| AggregatorError::SendAggregatedResponseError);
492
493            match result {
494                Ok(()) => return Ok(()),
495                Err(err) => {
496                    let err_str = err.to_string();
497                    let should_retry =
498                        err_str.contains("SendAggregatedResponseError") || err_str.contains("Failed to submit");
499                    if !should_retry || attempt >= MAX_RETRIES {
500                        return Err(AggregatorError::SendAggregatedResponseError);
501                    }
502
503                    warn!(
504                        task_id = hex!(service_response.task_id),
505                        attempt,
506                        retry_in_secs = delay.as_secs(),
507                        "Failed to submit aggregated response; retrying"
508                    );
509
510                    sleep(delay).await;
511                    delay = (delay.saturating_mul(2)).min(Duration::from_millis(2000));
512                }
513            }
514        }
515    }
516
    /// Returns true when `err` represents a transient submission failure that
    /// should be retried; currently only `SendAggregatedResponseError`.
    fn should_retry_contract_submission(err: &AggregatorError) -> bool {
        matches!(err, AggregatorError::SendAggregatedResponseError)
    }
520}