newton-aggregator 0.4.13

newton prover aggregator utils
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
//! Aggregator crate

#[macro_use]
extern crate newton_core;

/// Config
pub mod config;
/// Median-based consensus logic for normalizing operator responses
pub mod consensus;
/// Core aggregator logic shared between implementations
pub mod core;
/// Aggregator error
pub mod error;
/// RPC server
pub mod rpc_server;
/// Shared simulation request/response types for operator delegation
pub mod simulation;
/// State-commit aggregation pipeline (Track 1 — NEWT-1115).
pub mod state_commit;
/// Local task supervisor (`spawn_monitored`) — avoids circular dep on gateway.
pub mod task_supervisor;
use alloy::{
    dyn_abi::SolType,
    providers::{Provider, ProviderBuilder, WsConnect},
    rpc::types::Filter,
    sol_types::{SolEvent, SolValue},
};
use ark_ec::AffineRepr;
pub use core::AggregatorCore;
use eigensdk::client_avsregistry::reader::AvsRegistryChainReader;
pub use error::{AggregatorCoreError, AggregatorError};
use futures_util::StreamExt;
use jsonrpsee::{
    server::{RpcModule, Server},
    types::ErrorObject,
};
pub use newton_chainio::bls::BlsAggregationServiceResponse;
use newton_chainio::bls::ServiceHandle;
use newton_core::{
    common::chain::get_block_duration_ms,
    config::{key::EcdsaKey, rpc::ChainRpcProviderConfig, NewtonAvsConfig},
    r#newton_prover_task_manager::{
        INewtonProverTaskManager::{Task, TaskResponse as BindingTaskResponse},
        NewtonProverTaskManager::NewTaskCreated,
    },
    TaskId,
};
pub use rpc_server::{
    ConsensusCommitRequest, ConsensusCommitResponse, ConsensusData, ConsensusPrepareRequest, ConsensusPrepareResponse,
    FieldAdjustment, OperatorErrorResponse, PartialDecryptionData, PreparePhaseChainContext, PreparePhaseFetchRequest,
    SignedTaskResponse, TaskResponse,
};
use std::{
    collections::HashMap,
    net::{SocketAddr, ToSocketAddrs},
    sync::Arc,
};
use tokio::{
    task::JoinHandle,
    time::{sleep, Duration},
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument, warn, Instrument};

use crate::{config::AggregatorConfig, state_commit::StateCommitOrchestrator};
use newton_chainio::avs::writer::AvsWriter;

// Removed TASK_CHALLENGE_WINDOW_BLOCK constant - now fetched dynamically from contract

/// Standalone aggregator service.
///
/// Built by [`Aggregator::new`] and consumed by [`Aggregator::start`], which
/// moves these fields into the RPC server and the background tasks it spawns.
#[allow(missing_debug_implementations)]
pub struct Aggregator {
    /// Address the JSON-RPC server binds to. Copied from
    /// `config.service.ip_address` and resolved with `to_socket_addrs`, so it
    /// has to be in `host:port` form.
    port_address: String,
    /// AVS writer (Arc-shared with `AvsWriterCommitter` so all txs for the
    /// aggregator's ECDSA signer route through one submitter — see
    /// `.claude/rules/lessons.md` "single-signer routing")
    avs_writer: Arc<AvsWriter>,
    /// Tasks observed via `NewTaskCreated`, keyed by task ID. Starts empty.
    tasks: HashMap<TaskId, Task>,
    /// Core aggregator logic shared with gateway aggregator
    core: Arc<AggregatorCore>,
    /// RPC provider configuration for all chains. Stored at construction;
    /// `start` currently discards it.
    rpc: ChainRpcProviderConfig,
    /// Chain ID for RPC lookups (destination chain in multichain mode).
    /// Stored at construction; `start` currently discards it.
    chain_id: u64,
    /// Cached task response window, in blocks (fetched once at initialization)
    task_response_window_block: u64,
    /// Timeout for waiting for aggregation responses (in seconds)
    aggregation_timeout_secs: u64,
    /// Per-chain state-commit orchestrator (NEWT-1115). [`Aggregator::new`]
    /// sets this to `Some` in `test` and `dev-stub` builds and to `None`
    /// otherwise; production builds will populate this in NEWT-1116 with the
    /// real `EnclavePcr0Provider`.
    state_commit_orchestrator: Option<Arc<StateCommitOrchestrator>>,
}

impl Aggregator {
    /// Builds an aggregator from its configuration.
    ///
    /// Construction is async: it awaits the AVS registry reader, the AVS
    /// writer and the [`AggregatorCore`] constructors, and fetches the task
    /// response window through the current chain's HTTP RPC. It also loads the
    /// ECDSA signer and publishes its address via
    /// `newton_metric::set_aggregator_address`.
    ///
    /// # Arguments
    ///
    /// * `config` - [`NewtonAvsConfig<AggregatorConfig>`] describing chains,
    ///   contract addresses, the signer and service settings
    ///
    /// # Returns
    ///
    /// * `Self` - The aggregator, ready to be passed to [`Aggregator::start`]
    ///
    /// # Errors
    ///
    /// Returns an [`AggregatorError`] when:
    ///
    /// * no RPC configuration exists for `config.chain_id` or for the source chain
    /// * `operator_state_retriever` is `None`
    /// * the registry reader or the AVS writer cannot be constructed
    /// * the ECDSA signer cannot be loaded (`KeyError`)
    /// * [`AggregatorCore`] construction or the task-response-window fetch
    ///   fails (both are also reported as `KeyError`)
    /// * in `test` / `dev-stub` builds only: the source RPC URL does not parse
    ///   or the state-commit HTTP client cannot be initialised (`KeyError`)
    pub async fn new(config: NewtonAvsConfig<AggregatorConfig>) -> Result<Self, AggregatorError> {
        let chain_id = config.chain_id;
        let source_chain_id = config.get_source_chain_id();

        // Get RPC config for the current chain (destination chain in multichain mode)
        let rpc_config = config.rpc.get_or_err(chain_id)?.clone();

        // Source chain: where EigenLayer AVS contracts (OperatorRegistry, etc.) are deployed
        let source_rpc = config.rpc.get_or_err(source_chain_id)?.clone();

        // operator_state_retriever is source-only and validated by `load_src` →
        // `validate_for_role(ChainRole::Source)`. Aggregator only runs on source
        // chains, so this should be Some here; the typed error is defensive.
        let operator_state_retriever = config.contracts.avs.operator_state_retriever.ok_or_else(|| {
            std::io::Error::other(
                "aggregator requires source-chain operator_state_retriever but it was None — load_src must validate this",
            )
        })?;

        let avs_registry_chain_reader = AvsRegistryChainReader::new(
            config.contracts.avs.operator_registry,
            operator_state_retriever,
            source_rpc.http.clone(),
        )
        .await?;

        // Shared by the AVS writer below and, in `test` / `dev-stub` builds,
        // by the on-chain registry reader of the state-commit orchestrator.
        let state_commit_registry_addr = config.contracts.avs.state_commit_registry_or_zero();

        let avs_writer = Arc::new(
            AvsWriter::new(
                config.contracts.avs.newton_prover_task_manager,
                config.contracts.avs.identity_registry,
                state_commit_registry_addr,
                source_rpc.http.clone(),
                config.service.signer.clone(),
                config.chain_id,
            )
            .await?,
        );

        // Load the aggregator's ECDSA signer once. We keep both the address
        // (for metrics) and the signer itself (used by the state-commit HTTP
        // client to wrap each outbound `newt_*StateCommit*` call in an
        // EIP-712 `Authenticated<T>` envelope — operator-side dispatch
        // gates on `isTaskGenerator(signer)`).
        let aggregator_signer = newton_core::keys::load_ecdsa(&config.service.signer)
            .map_err(|e| AggregatorError::KeyError(e.to_string()))?;
        let aggregator_address = aggregator_signer.address();
        newton_metric::set_aggregator_address(&aggregator_address.to_string());

        // `None` whenever `destination_multichain()` returns an error.
        let dest_operator_table_updater = config
            .contracts
            .destination_multichain()
            .ok()
            .map(|dm| dm.operator_table_updater);
        // NOTE(review): a failure here is reported as `KeyError` although it
        // is not necessarily key-related — confirm whether a dedicated variant
        // exists.
        let core = AggregatorCore::new(
            avs_registry_chain_reader,
            config.contracts.avs.operator_registry,
            operator_state_retriever,
            config.is_destination_chain(),
            config.rpc.clone(),
            source_chain_id,
            chain_id,
            config.contracts.avs.newton_prover_task_manager,
            dest_operator_table_updater,
        )
        .await
        .map_err(|e| AggregatorError::KeyError(e.to_string()))?;

        // Fetch task response window block once at initialization (cached for reuse)
        let task_response_window_block =
            newton_core::common::chain::get_task_response_window_blocks(&rpc_config.http)
                .await
                .map_err(|e| AggregatorError::KeyError(format!("Failed to get task response window: {}", e)))?
                as u64;

        // Construct the per-chain orchestrator under `dev-stub`. Production
        // wiring lands in NEWT-1116: `EnclavePcr0Provider` replaces
        // `StubPcr0Provider`, the operator set is hydrated from
        // `AvsRegistryServiceCaller`, and the gate is lifted. We share
        // `Arc<AvsWriter>` with `process_aggregator_responses` so all txs
        // for the aggregator's ECDSA signer route through one submitter
        // (single-signer routing per `lessons.md`).
        #[cfg(any(test, feature = "dev-stub"))]
        let state_commit_orchestrator: Option<Arc<StateCommitOrchestrator>> = {
            use std::{collections::HashMap as StdHashMap, time::Duration};

            use alloy::providers::ProviderBuilder;
            use newton_core::state_commit_registry::StateCommitRegistry::StateCommitRegistryInstance;

            use crate::state_commit::{
                AvsWriterCommitter, HttpStateCommitOperatorClient, OnchainRegistryReader, StateCommitAggregator,
                StubOperatorSetSnapshotReader, StubPcr0Provider,
            };

            let http_provider = ProviderBuilder::new().connect_http(
                source_rpc
                    .http
                    .parse()
                    .map_err(|e| AggregatorError::KeyError(format!("invalid source RPC URL: {e}")))?,
            );
            let registry_instance = StateCommitRegistryInstance::new(state_commit_registry_addr, http_provider);
            let reader = Arc::new(OnchainRegistryReader::new(registry_instance, chain_id));
            let pcr0 = Arc::new(StubPcr0Provider::new());
            // Production wiring (NEWT-1116) will hydrate sockets from
            // `AvsRegistryServiceCaller`. Until then, the dev-stub uses an
            // empty map; the signer + chain_id + task_manager triple still
            // matter because the HTTP client embeds them in every outbound
            // envelope.
            let operator_client = Arc::new(
                HttpStateCommitOperatorClient::new(
                    StdHashMap::new(),
                    aggregator_signer.clone(),
                    chain_id,
                    config.contracts.avs.newton_prover_task_manager,
                )
                .map_err(|e| AggregatorError::KeyError(format!("state-commit HTTP client init: {e}")))?,
            );
            // NOTE(review): `6_700` looks like a quorum threshold in basis
            // points (67%) — confirm against `StateCommitAggregator::new`.
            let aggregator = Arc::new(StateCommitAggregator::new(chain_id, 6_700));
            let committer = Arc::new(AvsWriterCommitter(Arc::clone(&avs_writer)));
            // Per-tick refresh: production impl (NEWT-1116) composes
            // `ViewBN254CertificateVerifier::latestReferenceTimestamp` with
            // `AvsRegistryServiceCaller`. Until then, the dev-stub returns an
            // empty snapshot so devnet smoke tests run without operators.
            let operator_set_reader = Arc::new(StubOperatorSetSnapshotReader::new(Vec::new(), 0));
            // Dev-stub: empty snapshot until the production wiring (NEWT-1116)
            // composes `BN254TableCalculator.getOperatorInfos` +
            // `BN254CertificateVerifier.latestReferenceTimestamp` against
            // configured contract addresses. The orchestrator's tick will
            // emit `empty_set` flavor against this empty provider — same
            // behavior as the operator-set reader stub above.
            let operator_table_provider =
                state_commit::operator_table::fake::FakeOperatorTableProvider::from_operators(0, Vec::new());
            // Dev-stub operator set: AVS = task manager address, id = 0.
            let operator_set =
                newton_core::bn254_certificate_verifier::ViewBN254CertificateVerifier::OperatorSet {
                    avs: config.contracts.avs.newton_prover_task_manager,
                    id: 0,
                };
            // NOTE(review): the 120 s duration is presumably the tick
            // interval (`start` refers to a "120s tick") — confirm.
            let orch = StateCommitOrchestrator::new(
                chain_id,
                Duration::from_secs(120),
                reader,
                pcr0,
                operator_client,
                aggregator,
                committer,
                operator_set_reader,
                operator_table_provider,
                operator_set,
            );
            Some(Arc::new(orch))
        };
        // Builds without `test` / `dev-stub` run without an orchestrator.
        #[cfg(not(any(test, feature = "dev-stub")))]
        let state_commit_orchestrator: Option<Arc<StateCommitOrchestrator>> = None;

        Ok(Self {
            port_address: config.service.ip_address.clone(),
            avs_writer,
            tasks: HashMap::new(),
            core: Arc::new(core),
            rpc: config.rpc.clone(),
            chain_id,
            task_response_window_block,
            aggregation_timeout_secs: config.service.aggregation_timeout_secs,
            state_commit_orchestrator,
        })
    }

    /// Starts the aggregator service and runs it until one of its tasks fails
    /// or all of them finish.
    ///
    /// Three tasks make up the service: the JSON-RPC server, the
    /// `NewTaskCreated` listener (`process_tasks`) and the response submitter
    /// (`process_aggregator_responses`). When a state-commit orchestrator is
    /// configured it runs as a fourth task under a `CancellationToken`.
    ///
    /// # Arguments
    ///
    /// * `ws_rpc_url` - The websocket RPC URL used to subscribe to task events
    ///
    /// # Returns
    ///
    /// * `eyre::Result<()>` - `Ok(())` once all three service tasks finished
    ///   without error
    ///
    /// # Errors
    ///
    /// * The metrics exporter or the RPC server cannot be started
    /// * Any service task panics or is cancelled (`Task execution failed …`)
    /// * `process_tasks` or `process_aggregator_responses` returns an error;
    ///   that error is returned as soon as it occurs rather than after the
    ///   remaining tasks finish
    pub async fn start(self, ws_rpc_url: String) -> eyre::Result<()> {
        info!("Starting aggregator");

        // Initialize Prometheus metrics exporter on port 9092
        let metrics_addr: SocketAddr = "0.0.0.0:9092".parse()?;
        let _metrics_handle = newton_metric::prometheus::init_prometheus_exporter(metrics_addr).await?;
        info!("Prometheus metrics server started on {}", metrics_addr);

        let Self {
            avs_writer,
            port_address,
            tasks,
            core,
            rpc: _,
            chain_id: _,
            task_response_window_block,
            aggregation_timeout_secs,
            state_commit_orchestrator,
        } = self;

        let tasks = Arc::new(tokio::sync::Mutex::new(tasks));

        // 1) Process signatures. The server is bound *before* any background
        // task is spawned so that a bind failure returns early without
        // leaving a detached, never-cancelled orchestrator behind.
        let server_handle = Self::start_server(port_address, Arc::clone(&core)).await?;

        // 4th task — per-chain state-commit orchestrator (120s tick). Runs
        // alongside the join-arm trio under a `CancellationToken`. The
        // orchestrator's `run()` loop is `biased` on the cancel arm, so
        // signalling `cancel()` after `try_join!` resolves drives a clean
        // shutdown without aborting an in-flight tick. In production builds
        // (no `dev-stub`), `state_commit_orchestrator` is `None` until
        // NEWT-1116 wires `EnclavePcr0Provider`.
        let orchestrator_token = CancellationToken::new();
        let orchestrator_handle: Option<JoinHandle<()>> = state_commit_orchestrator.map(|orch| {
            let token = orchestrator_token.clone();
            // `spawn_monitored` wraps the loop in `catch_unwind` so a panic
            // surfaces as a structured tracing event with `task = ...`. The
            // orchestrator's own `tick()` is already panic-isolated, but a
            // panic in the surrounding `select!` / shutdown plumbing would
            // otherwise vanish to stderr — see `.claude/rules/rust.md`
            // "Bare `tokio::spawn` for background loops" anti-pattern.
            crate::task_supervisor::spawn_monitored("state_commit_orchestrator", async move { orch.run(token).await })
        });

        // 2) Process tasks
        let process_handle = tokio::spawn(Self::process_tasks(
            ws_rpc_url,
            task_response_window_block,
            Arc::clone(&tasks),
            Arc::clone(&core),
        ));
        // 3) Process aggregator responses
        let responses_handle = tokio::spawn(Self::process_aggregator_responses(
            tasks,
            core,
            avs_writer,
            aggregation_timeout_secs,
        ));

        // `try_join!` only short-circuits on the *outer* `Err`. Joining the
        // raw handles would therefore react to panics/cancellation only, and
        // an `Err` returned by `process_tasks` would stay invisible for as
        // long as the (never-ending) response loop keeps running. Flatten
        // each handle so that a task's own error also ends the join.
        let join_result = tokio::try_join!(
            async {
                server_handle
                    .await
                    .map_err(|e| eyre::eyre!("Task execution failed {e}"))
            },
            async {
                process_handle
                    .await
                    .map_err(|e| eyre::eyre!("Task execution failed {e}"))
                    .and_then(|task_result| task_result)
            },
            async {
                responses_handle
                    .await
                    .map_err(|e| eyre::eyre!("Task execution failed {e}"))
                    .and_then(|task_result| task_result)
            }
        )
        .inspect_err(|e| error!("Error in task execution: {e:?}"));

        // Signal the orchestrator to stop and drain its handle before
        // surfacing any join error. `cancel()` is idempotent and the loop
        // exits at the next `select!` poll without aborting an in-flight
        // tick, so we never mid-tick on a `commit_state_root` submission.
        orchestrator_token.cancel();
        if let Some(handle) = orchestrator_handle {
            if let Err(e) = handle.await {
                error!("state-commit orchestrator join failed: {e:?}");
            }
        }

        join_result.map(|_| ())
    }

    /// Starts the RPC server and returns a future that ends once the server is stopped
    async fn start_server(
        port_address: String,
        core: Arc<AggregatorCore>,
    ) -> eyre::Result<JoinHandle<()>, AggregatorError> {
        // See https://github.com/paritytech/jsonrpsee/blob/42461391fee47c94d42c4a7303355525291df9f6/examples/examples/cors_server.rs
        let mut module = RpcModule::new(core);
        module
            .register_async_method("process_signed_task_response", |params, ctx, _| async move {
                let core = ctx.as_ref();
                let signed_task_response = params
                    .parse::<SignedTaskResponse>()
                    .map_err(|err| ErrorObject::owned(0, err.to_string(), None::<()>))?;

                // Call the process_signed_response function from AggregatorCore
                let result = core.process_signed_response(signed_task_response).await;

                match result {
                    Ok(()) => Ok(true),
                    Err(err) => Err(ErrorObject::owned(0, err.to_string(), None::<()>)),
                }
            })
            .expect("method name is unique");
        let socket: SocketAddr = port_address
            .to_socket_addrs()
            .map_err(AggregatorError::IOError)?
            .next()
            .ok_or_else(|| {
                AggregatorError::IOError(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "No addresses found for hostname",
                ))
            })?;
        let middleware = tower::ServiceBuilder::new();
        let server = Server::builder().set_http_middleware(middleware).build(socket).await?;

        let handle = server.start(module);

        info!("Server running at {socket}");

        Ok(tokio::spawn(handle.stopped()))
    }

    /// Processes new tasks created in the task manager contract
    #[instrument(skip_all)]
    async fn process_tasks(
        ws_rpc_url: String,
        task_response_window_block: u64,
        tasks: Arc<tokio::sync::Mutex<HashMap<TaskId, Task>>>,
        core: Arc<AggregatorCore>,
    ) -> eyre::Result<()> {
        let ws = WsConnect::new(ws_rpc_url.clone());
        let provider = ProviderBuilder::new()
            .disable_recommended_fillers()
            .connect_ws(ws)
            .await?;
        let chain_id = provider.get_chain_id().await?;

        let filter = Filter::new().event_signature(NewTaskCreated::SIGNATURE_HASH);
        let sub = provider.subscribe_logs(&filter).await?;
        let mut stream = sub.into_stream();

        while let Some(log) = stream.next().await {
            let NewTaskCreated { taskId, task, state: _ } = log.log_decode()?.inner.data;

            let span = tracing::info_span!(
                "process_task",
                task_id = %taskId,
                policy_client = %task.policyClient
            );

            async {
                info!("aggregator: new task created");

                tasks.lock().await.insert(taskId, task.clone());

                let mut quorum_nums: Vec<u8> = vec![];
                for val in task.quorumNumbers.iter() {
                    quorum_nums.push(*val);
                }
                info!(
                    quorum_nums = %hex!(&quorum_nums),
                    "aggregator: initialize_task quorum_nums"
                );

                let block_time_ms = get_block_duration_ms(chain_id).unwrap_or(12000);
                let time_to_expiry = tokio::time::Duration::from_millis(task_response_window_block * block_time_ms);

                info!("aggregator: initializing task");
                core.initialize_task(
                    taskId,
                    task.taskCreatedBlock.into(),
                    quorum_nums,
                    task.quorumThresholdPercentage.try_into()?,
                    time_to_expiry,
                    0, // Expected operator count unknown for standalone aggregator (disables early exit)
                )
                .await
                .map_err(|e| {
                    error!("Failed to initialize task {}: {:?}", taskId, e);
                    eyre::eyre!(e)
                })?;

                Ok::<(), eyre::Error>(())
            }
            .instrument(span)
            .await?;
        }

        Ok(())
    }

    /// Processes BLS Aggregator Service responses
    ///
    /// With per-task channels, we iterate over active tasks and wait for their responses.
    /// This eliminates mutex contention and response stealing.
    async fn process_aggregator_responses(
        tasks: Arc<tokio::sync::Mutex<HashMap<TaskId, Task>>>,
        core: Arc<AggregatorCore>,
        avs_writer: Arc<AvsWriter>,
        aggregation_timeout_secs: u64,
    ) -> eyre::Result<()> {
        loop {
            // Get list of active task IDs
            let active_task_ids: Vec<TaskId> = {
                let tasks_guard = tasks.lock().await;
                tasks_guard.keys().cloned().collect()
            };

            if active_task_ids.is_empty() {
                // No active tasks, wait a bit before checking again
                tokio::time::sleep(Duration::from_millis(100)).await;
                continue;
            }

            // Filter to only tasks that have receivers (not already consumed)
            // Check receivers efficiently to avoid unnecessary wait_for_aggregation calls
            let tasks_with_receivers: Vec<TaskId> = {
                let mut tasks_with_receivers = Vec::new();
                for task_id in active_task_ids {
                    if core.has_task_receiver(task_id).await {
                        tasks_with_receivers.push(task_id);
                    }
                }
                tasks_with_receivers
            };

            if tasks_with_receivers.is_empty() {
                // No tasks with receivers, wait a bit before checking again
                tokio::time::sleep(Duration::from_millis(200)).await;
                continue;
            }

            // Use a longer timeout (1-2 seconds) to reduce polling overhead
            // This is more efficient than 100ms polling and reduces latency
            // We process tasks sequentially but with a reasonable timeout per task
            let mut found_response = false;
            for task_id in tasks_with_receivers {
                // Double-check receiver still exists (may have been consumed by another call)
                if !core.has_task_receiver(task_id).await {
                    continue;
                }

                // Use configurable timeout to accommodate destination chain operations
                // (certificate verifier queries add ~500ms-1s of latency)
                let timeout = Duration::from_secs(aggregation_timeout_secs);
                match core.wait_for_aggregation(task_id, timeout).await {
                    Ok(service_response) => {
                        found_response = true;
                        info!("Received aggregated response for task_id: {}", service_response.task_id);

                        let task = match tasks.lock().await.get(&service_response.task_id).cloned() {
                            Some(task) => task,
                            None => {
                                info!("Task not found for task_id: {}", service_response.task_id);
                                continue;
                            }
                        };

                        let task_response = core.task_states.get(&task.taskId).and_then(|state| {
                            state
                                .task_responses
                                .get(&service_response.task_response_digest)
                                .cloned()
                        });

                        if let Some(task_response) = task_response {
                            if let Err(err) = Self::submit_aggregated_response_with_retry(
                                &core,
                                &avs_writer,
                                &task,
                                &task_response,
                                &service_response,
                            )
                            .await
                            {
                                error!(
                                    task_id = hex!(task.taskId),
                                    error = ?err,
                                    "Failed to submit aggregated response to contract"
                                );
                            }
                        } else {
                            info!(
                                "Not found task_response for task_id: {}",
                                hex!(service_response.task_id)
                            );
                        }
                        break; // Process one response per loop iteration
                    }
                    Err(crate::error::AggregatorCoreError::Timeout { .. }) => {
                        // Timeout is expected, continue to next task
                        continue;
                    }
                    Err(crate::error::AggregatorCoreError::TaskNotInitialized { .. }) => {
                        // Receiver already consumed (by gateway or previous call) - skip this task
                        continue;
                    }
                    Err(crate::error::AggregatorCoreError::InvalidTimeoutDuration(_)) => {
                        // Invalid timeout - this shouldn't happen, but handle gracefully
                        warn!(task_id = %task_id, "Invalid timeout duration - skipping task");
                        continue;
                    }
                    Err(e) => {
                        error!(task_id = %task_id, "Error receiving aggregated response: {:?}", e);
                        continue;
                    }
                }
            }

            if !found_response {
                // No responses found, wait a bit before checking again
                // Reduced from 50ms to avoid excessive CPU usage when no tasks are active
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }

    async fn submit_aggregated_response_with_retry(
        core: &AggregatorCore,
        avs_writer: &AvsWriter,
        task: &Task,
        task_response: &BindingTaskResponse,
        service_response: &BlsAggregationServiceResponse,
    ) -> Result<(), AggregatorError> {
        const MAX_RETRIES: usize = 3;
        let mut attempt: usize = 0;
        let mut delay = Duration::from_millis(500);

        loop {
            attempt += 1;

            let result = core
                .submit_aggregated_response(
                    avs_writer,
                    task.clone(),
                    task_response.clone(),
                    service_response.clone(),
                )
                .await
                .map(|_| ())
                .map_err(|_| AggregatorError::SendAggregatedResponseError);

            match result {
                Ok(()) => return Ok(()),
                Err(err) => {
                    let err_str = err.to_string();
                    let should_retry =
                        err_str.contains("SendAggregatedResponseError") || err_str.contains("Failed to submit");
                    if !should_retry || attempt >= MAX_RETRIES {
                        return Err(AggregatorError::SendAggregatedResponseError);
                    }

                    warn!(
                        task_id = hex!(service_response.task_id),
                        attempt,
                        retry_in_secs = delay.as_secs(),
                        "Failed to submit aggregated response; retrying"
                    );

                    sleep(delay).await;
                    delay = (delay.saturating_mul(2)).min(Duration::from_millis(2000));
                }
            }
        }
    }

    /// Reports whether a failed contract submission is worth retrying.
    ///
    /// Returns `true` only for `AggregatorError::SendAggregatedResponseError`;
    /// every other variant is treated as not retryable.
    fn should_retry_contract_submission(err: &AggregatorError) -> bool {
        matches!(err, AggregatorError::SendAggregatedResponseError)
    }
}