newton-aggregator 0.4.18

newton prover aggregator utils
# Newton Prover Aggregator

> **Note**: This is a **library crate** embedded within the Gateway service, not a standalone service. There is no separate aggregator binary or deployment. The Gateway imports and uses this library to perform BLS signature aggregation.

## Overview

The aggregator crate is the core library of the Newton Prover AVS (Actively Validated Service) responsible for orchestrating BLS signature aggregation from multiple operators. Embedded within the Gateway, it serves as the central coordinator that:

- **Initializes tasks** for BLS signature aggregation with quorum requirements
- **Processes signed task responses** from operators, buffering signatures that arrive before task initialization
- **Coordinates with the BLS Aggregation Service** to aggregate signatures and verify quorum thresholds
- **Submits aggregated responses** to on-chain contracts once quorum is reached
- **Manages memory and resources** to ensure reliable operation under load

The aggregator is designed for high availability, low latency, and robustness. It handles edge cases such as signatures that arrive before their task is initialized, prevents memory leaks through bounded data structures and periodic cleanup, and isolates tasks so that an error in one does not affect the others.

## Architecture Overview

The aggregator architecture consists of two main components that work together:

```mermaid
flowchart TD
    subgraph Aggregator["AggregatorCore (core.rs)"]
        A["Task initialization & validation"]
        B["Signature processing & buffering"]
        C["Task response storage (task_responses)"]
        D["Pending signature buffer (pending_signatures)"]
        E["Background cleanup tasks"]
    end

    Aggregator -->|Channels - ServiceHandle| BLS

    subgraph BLS["BlsAggregatorService (bls.rs)"]
        F["BLS signature aggregation engine"]
        G["Per-task aggregator tasks"]
        H["Signature verification"]
        I["Quorum threshold checking"]
        J["Aggregated response generation"]
    end
```

### Component Relationships

**AggregatorCore** (`core.rs`):

- Central orchestrator that manages the lifecycle of aggregation tasks
- Handles operator signature submission and buffering
- Manages task response storage and cleanup
- Coordinates with the BLS service via `ServiceHandle`

**BlsAggregatorService** (`bls.rs`):

- Low-level BLS signature aggregation engine
- Runs per-task aggregation loops in isolated spawned tasks
- Performs cryptographic signature verification
- Aggregates signatures and checks quorum thresholds
- Returns aggregated responses via channels

### Data Flow

1. **Task Initialization**: `initialize_task()` → BLS service spawns `single_task_aggregator` task → **Creates per-task response channel** → Returns receiver to AggregatorCore
2. **Signature Processing**: `process_signed_response()` → Buffered if task not initialized, otherwise sent to BLS service
3. **Aggregation**: BLS service aggregates signatures per `task_response_digest`, checks quorum thresholds
4. **Response**: When quorum is reached, the aggregated response is sent via the **task-specific channel** (direct routing, no mutex contention)
5. **Wait for Aggregation**: `wait_for_aggregation(task_id, timeout)` → Receives from task-specific channel → Returns response
6. **Submission**: `submit_aggregated_response()` submits to contract and cleans up task state

### Consensus Module (`consensus.rs`)

The consensus module handles median-based normalization when operators return different values for time-sensitive data (e.g., prices).

**Two-Digest System**: BLS signature aggregation requires all operators to sign the same message. However, operators independently generate unique ECDSA attestations. The consensus module uses two digest types:

| Digest Type | Used For | Attestations |
|-------------|----------|--------------|
| Consensus Digest | BLS signing/verification | Excluded |
| Full Digest | Contract storage, challenge verification | Included |
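The difference between the two digests can be illustrated with a minimal sketch. It assumes a simplified, hypothetical `OperatorResponse` type and uses std's `DefaultHasher` purely as a stand-in for Keccak256 (it is not cryptographic). Two operators that agree on the data but carry different attestations produce the same consensus digest and different full digests.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical, simplified operator response: the real type carries
/// policyTaskData plus an ECDSA attestation produced by each operator.
#[derive(Hash)]
pub struct OperatorResponse {
    pub task_id: u64,
    pub policy_data: String,  // normalized policyTaskData.data
    pub attestation: Vec<u8>, // unique per operator
}

/// Stand-in for Keccak256. DefaultHasher is NOT cryptographic; illustration only.
fn digest_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Consensus digest (what operators BLS-sign): attestations excluded,
/// so operators that agree on the data sign the same message.
pub fn consensus_digest(r: &OperatorResponse) -> u64 {
    digest_of(&(r.task_id, &r.policy_data))
}

/// Full digest (stored on-chain, used for challenges): attestations included,
/// so it differs from operator to operator.
pub fn full_digest(r: &OperatorResponse) -> u64 {
    digest_of(r)
}
```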

**Median-Based Normalization Algorithm**:

1. Extract numeric fields from each operator's `policyTaskData.data` JSON
2. For each numeric field, compute the median across all operators
3. Verify all values are within tolerance (default 10%) of the median
4. If within tolerance, normalize all responses to use median values
5. Recompute consensus digest (attestations excluded) after normalization

Values are considered "in tolerance" if: `|value - median| / median <= tolerance_pct / 100`
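The per-field median and tolerance check can be sketched as follows. This is a minimal illustration for one numeric field, not the crate's `build_consensus`. The function names are hypothetical. The sketch divides by `|median|` so negative medians behave sensibly, and it assumes that a zero median accepts only exact zeros, because the formula above is undefined at zero.

```rust
/// Median of a non-empty slice; for even lengths, the mean of the two middle values.
pub fn median(values: &[f64]) -> Option<f64> {
    if values.is_empty() {
        return None;
    }
    let mut sorted = values.to_vec();
    sorted.sort_by(|a, b| a.total_cmp(b));
    let mid = sorted.len() / 2;
    Some(if sorted.len() % 2 == 0 {
        (sorted[mid - 1] + sorted[mid]) / 2.0
    } else {
        sorted[mid]
    })
}

/// Returns the median (the normalized value) if every operator's value satisfies
/// |value - median| / |median| <= tolerance_pct / 100, otherwise None (no consensus).
pub fn normalize_field(values: &[f64], tolerance_pct: f64) -> Option<f64> {
    let m = median(values)?;
    let tolerance = tolerance_pct / 100.0;
    let within = values.iter().all(|&v| {
        if m == 0.0 {
            v == 0.0 // assumption: a zero median only accepts exact zeros
        } else {
            (v - m).abs() / m.abs() <= tolerance
        }
    });
    if within { Some(m) } else { None }
}
```

With the default 10% tolerance, prices of 98, 100, and 102 normalize to 100. A single outlier at 150 (50% from the median) prevents consensus for that field.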

**Key Functions**:

- `build_consensus(responses, tolerance_pct)` - Attempts consensus on signed responses
- `compute_consensus_from_unsigned(responses, tolerance_pct)` - Prepare phase consensus on unsigned policyTaskData
- `check_early_consensus(responses)` - Fast path when all data hashes are identical

## Core Components Deep Dive

### AggregatorCore (`core.rs`)

#### Purpose

`AggregatorCore` is the central orchestrator that manages the complete lifecycle of aggregation tasks. It provides a high-level API for task initialization, signature processing, and response submission while managing memory, concurrency, and error handling.

#### Key Data Structures

```rust
pub struct AggregatorCore {
    /// Service handle to interact with the BLS Aggregator Service.
    /// ServiceHandle is Clone with a thread-safe UnboundedSender
    pub service_handle: ServiceHandle,

    /// Per-task response receivers for direct routing (no mutex contention)
    /// Each task gets its own channel, eliminating response stealing and lock contention
    /// DashMap enables lock-free concurrent access for different TaskIds
    task_response_receivers: DashMap<TaskId, UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>>,

    /// DashMap for lock-free concurrent access - different tasks can access their states simultaneously
    /// Per-task locking eliminates contention between tasks accessing different TaskIds
    pub task_states: Arc<DashMap<TaskId, TaskState>>,

    /// Cancellation token for background tasks
    cancellation_token: CancellationToken,
}

/// Task state to reduce lock contention
#[derive(Debug, Clone)]
pub struct TaskState {
    /// Quorum numbers for reference timestamp queries
    quorum_nums: Vec<u8>,
    /// Operator errors for this task
    operator_errors: Vec<OperatorErrorResponse>,
    /// Expected operator count (for early exit detection)
    expected_operators: usize,
    /// Task responses by digest
    task_responses: HashMap<TaskResponseDigest, BindingTaskResponse>,
}
```

**Design Decisions**:

- **`task_response_receivers` uses `DashMap`**: Lock-free concurrent access for different TaskIds. Per-task channels enable zero mutex contention. Each task has its own dedicated channel, eliminating response stealing and serialization bottlenecks.
- **`task_states` uses `Arc<DashMap>`**: Lock-free concurrent access for different tasks. Per-task locking eliminates contention between tasks accessing different TaskIds. Critical for high-throughput scenarios with 10k+ concurrent tasks.
- **Simplified structure**: Moved task responses into `TaskState` for better locality and reduced lock scope.
- **Eliminated insertion tracking**: Removed separate insertion order tracking in favor of simpler cleanup strategies.

#### Key Methods

##### `new()`

Initializes the aggregator core and spawns background tasks:

```rust
pub async fn new(
    avs_registry_reader: AvsRegistryChainReader,
    operator_registry_address: Address,
    ws_rpc_url: Option<String>,
    http_rpc_url: String,
) -> Result<Self, eyre::Error>
```

**Responsibilities**:

- Creates the BLS aggregation service, backed by either in-memory or on-chain operator info
- Initializes all data structures with appropriate synchronization primitives
- Spawns three background tasks:
  1. `process_pending_signatures_loop`: Processes buffered signatures when tasks are initialized
  2. `cleanup_expired_pending_signatures_loop`: Removes expired pending signatures (5s timeout)
  3. `cleanup_stale_task_responses_loop`: Evicts stale task responses (60s interval)

**Memory Safety**: All background tasks use `CancellationToken` for graceful shutdown, ensuring resources are cleaned up when `AggregatorCore` is dropped.

##### `initialize_task()`

Initializes a new aggregation task with validation:

```rust
pub async fn initialize_task(
    &self,
    task_id: TaskId,
    task_created_block: u64,
    quorum_nums: Vec<u8>,
    quorum_threshold_percentage: u8,
    time_to_expiry: Duration,
) -> Result<(), eyre::Error>
```

**Input Validation**:

- Task ID must be non-zero
- Quorum numbers must be non-empty
- Threshold percentage must be between 1 and 100
- Time to expiry must be non-zero
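The four validation rules above can be expressed as a small standalone check. This is a sketch only. `TaskId` is modeled as `u64`, and `InitError` is a hypothetical error enum; per the text, the crate itself returns `eyre::Error` with context.

```rust
use std::time::Duration;

/// Hypothetical error type; the crate returns eyre::Error with context instead.
#[derive(Debug, PartialEq)]
pub enum InitError {
    ZeroTaskId,
    EmptyQuorums,
    ThresholdOutOfRange(u8),
    ZeroExpiry,
}

/// Mirrors the documented input checks for initialize_task().
/// TaskId is modeled as u64 for illustration.
pub fn validate_task_params(
    task_id: u64,
    quorum_nums: &[u8],
    quorum_threshold_percentage: u8,
    time_to_expiry: Duration,
) -> Result<(), InitError> {
    if task_id == 0 {
        return Err(InitError::ZeroTaskId);
    }
    if quorum_nums.is_empty() {
        return Err(InitError::EmptyQuorums);
    }
    if !(1..=100).contains(&quorum_threshold_percentage) {
        return Err(InitError::ThresholdOutOfRange(quorum_threshold_percentage));
    }
    if time_to_expiry.is_zero() {
        return Err(InitError::ZeroExpiry);
    }
    Ok(())
}
```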

**Flow**:

1. Validates all inputs
2. Creates `TaskMetadata` with task configuration
3. Clones `ServiceHandle` before async call (avoids holding lock during async operation)
4. Sends `InitializeTask` message to BLS service
5. **Receives task-specific response receiver** from BLS service (per-task channel for direct routing)
6. **Stores receiver in `task_response_receivers`** for `wait_for_aggregation()` to use
7. Notifies pending signatures loop to retry buffered signatures for this task

**Error Handling**: Returns structured errors with context. Errors in initialization don't affect other tasks.

**Performance**: Per-task channels eliminate mutex contention and response stealing, enabling true concurrent processing of multiple tasks.

##### `process_signed_response()`

Processes a signed task response from an operator:

```rust
pub async fn process_signed_response(
    &self,
    signed_response: SignedTaskResponse
) -> Result<(), eyre::Error>
```

**Flow**:

1. Validates `task_id` and `operator_id` (both must be non-zero)
2. Computes `task_response_digest` via Keccak256 hash
3. Creates `TaskSignature` and sends to BLS service
4. **If task not initialized**: Buffers signature in `pending_signatures` (with size limit check)
5. **If task initialized**: Stores successful response in `task_responses` with size limit enforcement

**Memory Management**:

- Checks `MAX_TASK_RESPONSES` limit before adding new task entries
- Uses a shared read lock to find the oldest entry (does not block other readers)
- Uses write lock only when evicting (minimizes lock duration)
- Lock-free counter increment for insertion order

**Error Handling**:

- `TaskNotFound`: Buffers signature for later processing
- Other errors: Logs with full context (task_id, operator_id, timing) and returns error
- Errors in one signature don't affect others

##### `wait_for_aggregation()`

Waits for aggregated response with timeout using per-task channels:

```rust
pub async fn wait_for_aggregation(
    &self,
    task_id: TaskId,
    timeout_duration: Duration,
) -> Result<BlsAggregationServiceResponse, AggregatorCoreError>
```

**Flow**:

1. Validates timeout duration and task_id (must be non-zero)
2. **Removes the task-specific receiver from `task_response_receivers`** (an `UnboundedReceiver` cannot be cloned, so removing it guarantees at most one waiter per task)
3. **Receives directly from task-specific channel** (no mutex lock needed, no response stealing)
4. Returns response or error with timing information
5. **Receiver automatically dropped** when function returns (no manual cleanup needed)

**Performance Benefits**:

- **Zero mutex contention**: Each task has its own channel, no shared lock needed
- **Zero response stealing**: Responses go directly to the correct task's channel
- **Low latency**: ~0.1-0.5ms vs 5-500ms under high concurrency (old approach)
- **High throughput**: Supports 10k+ concurrent tasks efficiently
- **Natural cancellation**: Dropping receiver = cancellation, no explicit cleanup needed

**Error Handling**:

- `TaskNotInitialized`: Task not found in receivers map
- `Timeout`: Includes operator errors for the specific task
- `AggregationServiceError`: Task-specific errors (guaranteed to be for this task_id)
- `Cancelled`: Operation was cancelled via cancellation token

**Isolation**: Each task's response channel is independent. One task's channel closure doesn't affect others.
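The per-task channel pattern can be reproduced with std primitives. This is a minimal sketch that assumes a `Mutex<HashMap>` in place of `DashMap` and `std::sync::mpsc` in place of tokio's unbounded channels. `Receivers`, `WaitError`, and the `String` response type are hypothetical stand-ins. Initialization creates a dedicated channel and stores its receiver. Waiting removes that receiver, so a second waiter gets `TaskNotInitialized`, and then blocks on that task's channel alone.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::Mutex;
use std::time::Duration;

type TaskId = u64; // hypothetical: the real TaskId type differs
type Response = String; // stand-in for BlsAggregationServiceResponse

#[derive(Debug, PartialEq)]
pub enum WaitError {
    TaskNotInitialized,
    Timeout,
    ChannelClosed,
}

#[derive(Default)]
pub struct Receivers {
    // Mutex<HashMap> stands in for DashMap in this sketch
    inner: Mutex<HashMap<TaskId, Receiver<Response>>>,
}

impl Receivers {
    /// initialize_task(): create a dedicated channel. The sender goes to the
    /// per-task aggregator; the receiver is kept for wait_for_aggregation().
    pub fn initialize_task(&self, task_id: TaskId) -> Sender<Response> {
        let (tx, rx) = channel();
        self.inner.lock().unwrap().insert(task_id, rx);
        tx
    }

    /// wait_for_aggregation(): take the receiver out of the map (the map lock is
    /// released at the end of the statement), then block on this task's channel only.
    pub fn wait_for_aggregation(&self, task_id: TaskId, timeout: Duration) -> Result<Response, WaitError> {
        let rx = self
            .inner
            .lock()
            .unwrap()
            .remove(&task_id)
            .ok_or(WaitError::TaskNotInitialized)?;
        // rx is dropped when this function returns; no manual cleanup needed
        match rx.recv_timeout(timeout) {
            Ok(resp) => Ok(resp),
            Err(RecvTimeoutError::Timeout) => Err(WaitError::Timeout),
            Err(RecvTimeoutError::Disconnected) => Err(WaitError::ChannelClosed),
        }
    }
}
```

A response sent on task 1's sender can only be received by task 1's waiter, which is what rules out response stealing in this design.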

##### `submit_aggregated_response()`

Submits aggregated response to contract:

```rust
pub async fn submit_aggregated_response(
    &self,
    avs_writer: &AvsWriter,
    task: Task,
    task_response: BindingTaskResponse,
    service_response: BlsAggregationServiceResponse,
) -> Result<TransactionReceipt, eyre::Error>
```

**Flow**:

1. Converts BLS response to contract format
2. Submits to contract via `AvsWriter`
3. On success: Cleans up `task_responses` entry and insertion order
4. Records metrics for success/failure and duration

**Cleanup**: Automatically removes task from `task_responses` and `task_responses_insertion_order` after successful submission to prevent memory leaks.

##### `update_response_indices()`

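Recomputes the check-signatures index arrays (`non_signer_quorum_bitmap_indices`, `quorum_apk_indices`, `total_stake_indices`, `non_signer_stake_indices`) of a `BlsAggregationServiceResponse` for a new `task_created_block`: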

```rust
pub async fn update_response_indices(
    &self,
    task_id: TaskId,
    service_response: BlsAggregationServiceResponse,
    new_task_created_block: u64,
    quorum_numbers: &[u8],
) -> Result<BlsAggregationServiceResponse, eyre::Error>
```

**Purpose**: When a task is created on-chain, the actual `taskCreatedBlock` may differ from the block used during initial aggregation (due to transaction confirmation timing). This method efficiently recalculates the index arrays for the correct block.

#### Background Tasks

##### `process_pending_signatures_loop`

Continuously processes buffered signatures when notified:

- Listens on `pending_notify_rx` channel for task IDs
- When notified, processes all buffered signatures for that task
- Uses cloned `ServiceHandle` to avoid holding lock during async operations
- Batch inserts successful responses to minimize lock contention
- Drops failed signatures (no retry mechanism)

**Performance**: Processes signatures in batch, collecting successful responses before single lock acquisition.

##### `cleanup_expired_pending_signatures_loop`

Periodically removes expired pending signatures:

- Runs every `PENDING_SIGNATURE_CLEANUP_INTERVAL` (10 seconds)
- Removes entries older than `PENDING_SIGNATURE_TIMEOUT` (5 seconds)
- Prevents unbounded growth if tasks never initialize

##### `cleanup_stale_task_responses_loop`

Periodically evicts stale task responses:

- Runs every `TASK_RESPONSES_CLEANUP_INTERVAL` (60 seconds)
- If over `MAX_TASK_RESPONSES` limit, evicts oldest 10% of entries
- Uses read lock to find oldest entries, write lock only when removing

### BLS Aggregation Service (`bls.rs`)

#### Purpose

The BLS Aggregation Service is the low-level engine that performs cryptographic BLS signature aggregation. It runs per-task aggregation loops in isolated spawned tasks, verifying signatures, aggregating them, and checking quorum thresholds.

#### Key Data Structures

```rust
/// Aggregated operators information for a specific task_response_digest
pub struct AggregatedOperators {
    signers_apk_g2: BlsG2Point,                    // Aggregated public key (G2)
    signers_agg_sig_g1: Signature,                 // Aggregated signature (G1)
    signers_total_stake_per_quorum: HashMap<u8, U256>,  // Total stake per quorum
    signers_operator_ids_set: HashMap<FixedBytes<32>, bool>,  // Set of signer operator IDs
}

/// Task metadata for initialization
pub struct TaskMetadata {
    task_id: TaskId,
    quorum_numbers: Vec<u8>,
    quorum_threshold_percentages: QuorumThresholdPercentages,
    time_to_expiry: Duration,
    window_duration: Duration,
    task_created_block: u64,
}

/// Response from BLS aggregation service
pub struct BlsAggregationServiceResponse {
    pub task_id: TaskId,
    pub task_created_block: u64,
    pub task_response_digest: TaskResponseDigest,
    pub non_signers_pub_keys_g1: Vec<BlsG1Point>,
    pub non_signers_operators_ids: Vec<FixedBytes<32>>,  // Stored for efficient index updates
    pub quorum_apks_g1: Vec<BlsG1Point>,
    pub signers_apk_g2: BlsG2Point,
    pub signers_agg_sig_g1: Signature,
    // Index arrays for on-chain verification
    pub non_signer_quorum_bitmap_indices: Vec<u32>,
    pub quorum_apk_indices: Vec<u32>,
    pub total_stake_indices: Vec<u32>,
    pub non_signer_stake_indices: Vec<Vec<u32>>,
}
```

**Design Decisions**:

- **Per `task_response_digest` aggregation**: Different operators may propose different responses (different `task_response_digest` values). Each digest has its own aggregation state, allowing multiple valid responses to be aggregated simultaneously.
- **Stake tracking per quorum**: Operators may have stake in multiple quorums. The service tracks stake per quorum to check thresholds independently.
- **Storing `non_signers_operators_ids`**: The `BlsAggregationServiceResponse` stores non-signer operator IDs computed during aggregation. This enables efficient index updates via `update_response_indices()` without re-fetching operator state or performing O(n²) public key matching. The operator IDs are already computed in `build_aggregated_response()` and preserving them eliminates a 50-200ms RPC call when recalculating indices for a new `taskCreatedBlock`.

#### Key Methods

##### `start()`

Initializes the BLS service and spawns main loop:

```rust
pub fn start(self) -> (ServiceHandle, AggregateReceiver)
```

**Responsibilities**:

- Creates message channels for task initialization and signature processing
- Creates aggregate response channel
- Spawns main `run()` loop in background task
- Returns `ServiceHandle` (for sending messages) and `AggregateReceiver` (for receiving responses)

##### `run()`

Main message processing loop:

```rust
async fn run(
    self,
    mut msg_receiver: UnboundedReceiver<AggregationMessage>,
    aggregate_sender: UnboundedSender<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>,
)
```

**Message Types**:

- `InitializeTask`: Creates new task aggregator, spawns `single_task_aggregator` task
- `ProcessSignature`: Forwards signature to appropriate task aggregator via per-task channel

**Memory Management**:

- Maintains `task_channels` HashMap with FIFO eviction (`MAX_ACTIVE_TASKS = 10000`)
- Tracks insertion order for eviction
- Detects finished tasks via channel closure

**Error Isolation**: Each task runs in its own spawned task. Panics are caught and logged without affecting other tasks.

##### `single_task_aggregator()`

Per-task aggregation logic:

```rust
async fn single_task_aggregator(
    avs_registry_service: A,
    metadata: TaskMetadata,
    aggregated_response_sender: UnboundedSender<...>,
    signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
) -> Result<(), BlsAggregationServiceError>
```

**Flow**:

1. Fetches operator AVS state at task creation block
2. Fetches quorum AVS state (total stakes, aggregate public keys)
3. Enters `loop_task_aggregator()` to process signatures
4. Handles task expiry timer
5. Handles window duration for additional signatures after quorum

**Isolation**: Each task runs in isolated spawned task. Errors don't propagate to other tasks.

##### `loop_task_aggregator()`

Main signature processing loop for a task:

```rust
async fn loop_task_aggregator(
    avs_registry_service: A,
    task_id: TaskId,
    task_created_block: u64,
    time_to_expiry: Duration,
    aggregated_response_sender: UnboundedSender<...>,
    mut signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
    operator_state_avs: HashMap<FixedBytes<32>, OperatorAvsState>,
    total_stake_per_quorum: HashMap<u8, Uint<256, 4>>,
    quorum_threshold_percentage_map: HashMap<u8, u8>,
    quorum_apks_g1: Vec<BlsG1Point>,
    quorum_nums: Vec<u8>,
    window_duration: Duration,
) -> Result<(), BlsAggregationServiceError>
```

**Flow**:

1. Initializes `aggregated_operators` HashMap (keyed by `task_response_digest`)
2. Selects between signature channel and task expiry timer
3. For each signature:
   - Calls `handle_new_signature()` to process
   - Updates aggregation state
   - Checks quorum thresholds
   - If thresholds met: aggregates and sends response, opens window
4. Handles window duration for additional signatures
5. Cleans up on task expiry
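The control flow of this loop (signatures versus the expiry timer, then a window after quorum) can be sketched synchronously with std channels. This is a hypothetical simplification with a single digest, a single quorum, and `(operator_id, stake)` messages. The real function uses async channels, tracks state per `task_response_digest`, and sends the aggregated response at the point marked in the comments.

```rust
use std::collections::HashSet;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// Quorum was met; `signers` counts everyone accepted before the window closed.
    Aggregated { signers: usize },
    /// The task expired before quorum was met.
    Expired,
}

/// Hypothetical single-digest, single-quorum loop. Messages are (operator_id, stake).
pub fn run_task_loop(
    signatures_rx: Receiver<(u64, u128)>,
    total_stake: u128,
    threshold_pct: u128,
    time_to_expiry: Duration,
    window_duration: Duration,
) -> Outcome {
    let expiry = Instant::now() + time_to_expiry;
    let mut deadline = expiry; // moves to the window end once quorum is met
    let mut signers = HashSet::new();
    let mut signed_stake: u128 = 0;
    let mut quorum_met = false;

    loop {
        let now = Instant::now();
        if now >= deadline {
            break; // expiry timer (or window) fired
        }
        match signatures_rx.recv_timeout(deadline - now) {
            Ok((operator_id, stake)) => {
                if !signers.insert(operator_id) {
                    continue; // duplicate signature from the same operator
                }
                signed_stake += stake;
                if !quorum_met && signed_stake * 100 >= total_stake * threshold_pct {
                    quorum_met = true;
                    // The real service sends the aggregated response here and opens
                    // a window for additional signatures, capped at task expiry.
                    deadline = (Instant::now() + window_duration).min(expiry);
                }
            }
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }

    if quorum_met {
        Outcome::Aggregated { signers: signers.len() }
    } else {
        Outcome::Expired
    }
}
```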

**Memory Management**:

- `aggregated_operators` limited to `MAX_AGGREGATED_OPERATORS_PER_TASK = 100` per task
- If limit reached, new digests are ignored (logged as warning) - this scenario should never happen in practice
- Cleared when task completes

##### `handle_new_signature()`

Processes a new signature:

```rust
async fn handle_new_signature(
    avs_registry_service: &A,
    aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
    open_window: &mut bool,
    current_aggregated_response: &mut Option<BlsAggregationServiceResponse>,
    window_tx: &UnboundedSender<bool>,
    task_id: TaskId,
    task_created_block: u64,
    operator_state_avs: &HashMap<FixedBytes<32>, OperatorAvsState>,
    total_stake_per_quorum: &HashMap<u8, Uint<256, 4>>,
    quorum_threshold_percentage_map: &HashMap<u8, u8>,
    quorum_apks_g1: &[BlsG1Point],
    quorum_nums: &[u8],
    window_duration: Duration,
    signed_task_digest: Option<SignedTaskResponseDigest>,
) -> Result<(), BlsAggregationServiceError>
```

**Flow**:

1. Validates inputs (operator_id, task_response_digest, quorum_nums)
2. Checks for duplicate signatures (same operator signing same digest)
3. Verifies signature cryptographically
4. Sends verification result to result channel (handles receiver drop gracefully)
5. If valid: Updates `aggregated_operators` for the `task_response_digest`
6. Checks if quorum thresholds are met
7. If met: Aggregates and sends response, opens window
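Step 6 (checking quorum thresholds) can be sketched as a per-quorum comparison. This sketch assumes integer cross-multiplication (`signed * 100 >= total * threshold`), which avoids division rounding. It uses `u128` as a stand-in for `U256`, and `quorum_thresholds_met` is a hypothetical name. Every quorum must pass independently, matching the per-quorum stake tracking described earlier.

```rust
use std::collections::HashMap;

/// Returns true only if, for every quorum in `quorum_nums`, the signed stake
/// reaches `threshold_pct` percent of that quorum's total stake.
/// u128 stands in for U256; cross-multiplication avoids division rounding.
pub fn quorum_thresholds_met(
    quorum_nums: &[u8],
    signed_stake_per_quorum: &HashMap<u8, u128>,
    total_stake_per_quorum: &HashMap<u8, u128>,
    threshold_pct_per_quorum: &HashMap<u8, u8>,
) -> bool {
    quorum_nums.iter().all(|q| {
        let signed = signed_stake_per_quorum.get(q).copied().unwrap_or(0);
        // Missing total stake or threshold data: quorum cannot be claimed.
        match (total_stake_per_quorum.get(q), threshold_pct_per_quorum.get(q)) {
            (Some(&total), Some(&pct)) => signed * 100 >= total * u128::from(pct),
            _ => false,
        }
    })
}
```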

**Error Handling**:

- Invalid signatures: Logged and returned via result channel
- Duplicate signatures: Detected and rejected
- Missing operator state: Returns `RegistryError`
- Receiver drop (timeout/cancellation): Handled gracefully, doesn't propagate error

##### `update_aggregated_operators()`

Updates aggregation state for a `task_response_digest`:

```rust
fn update_aggregated_operators(
    task_id: TaskId,
    aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
    operator_state: &OperatorAvsState,
    task_response_digest: FixedBytes<32>,
    bls_signature: Signature,
    operator_id: FixedBytes<32>,
) -> Result<AggregatedOperators, BlsAggregationServiceError>
```

**Flow**:

1. If digest already exists: Calls `aggregate_new_operator(task_id, ...)` to add operator to existing aggregation
2. If new digest: Creates new `AggregatedOperators` entry
3. Returns updated aggregation state

**Error Handling**:

- Returns `RegistryError { task_id, operator_context, reason }` if operator public keys are missing
- Includes `task_id` parameter for error context (replaces previous `.unwrap()` panics)
- Error reason includes operator ID hex for debugging
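The digest-keyed bookkeeping can be sketched without the cryptography. In this hypothetical `AggregatedStake`, only the signer set and stake per quorum are kept. The real `AggregatedOperators` also aggregates the G2 public key and G1 signature. The sketch adds three behaviors from the surrounding text: duplicate rejection, entry creation on first use of a digest, and ignoring new digests once the per-task cap (`MAX_AGGREGATED_OPERATORS_PER_TASK = 100`) is reached.

```rust
use std::collections::{HashMap, HashSet};

type Digest = [u8; 32];
type OperatorId = [u8; 32];

/// Hypothetical stand-in for AggregatedOperators (no BLS key/signature fields).
#[derive(Debug, Default, Clone)]
pub struct AggregatedStake {
    pub signers: HashSet<OperatorId>,
    pub stake_per_quorum: HashMap<u8, u128>,
}

pub const MAX_DIGESTS_PER_TASK: usize = 100; // mirrors MAX_AGGREGATED_OPERATORS_PER_TASK

#[derive(Debug, PartialEq)]
pub enum AggError {
    DuplicateSignature,
    TooManyDigests,
}

/// Adds one operator's stake to the aggregation for `digest`, creating the entry
/// on first use. Different digests never share state.
pub fn add_signature(
    aggregated: &mut HashMap<Digest, AggregatedStake>,
    digest: Digest,
    operator_id: OperatorId,
    operator_stake_per_quorum: &HashMap<u8, u128>,
) -> Result<AggregatedStake, AggError> {
    if !aggregated.contains_key(&digest) && aggregated.len() >= MAX_DIGESTS_PER_TASK {
        return Err(AggError::TooManyDigests); // new digests are ignored at the cap
    }
    let entry = aggregated.entry(digest).or_default();
    if !entry.signers.insert(operator_id) {
        return Err(AggError::DuplicateSignature); // same operator, same digest
    }
    for (quorum, stake) in operator_stake_per_quorum {
        *entry.stake_per_quorum.entry(*quorum).or_insert(0) += *stake;
    }
    Ok(entry.clone())
}
```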

## Memory Management & Safety

### Memory Limits & Eviction

The aggregator implements multiple layers of memory protection to prevent unbounded growth:

#### Task Responses (`task_responses`)

- **Limit**: `MAX_TASK_RESPONSES = 10000` tasks
- **Eviction Strategy**: FIFO (First-In-First-Out) using insertion order tracking
- **Implementation**:
  - `task_responses_insertion_order`: `RwLock<HashMap<TaskId, u64>>` tracks insertion order
  - `task_responses_insertion_counter`: `AtomicU64` for lock-free counter increments
  - When limit reached: Finds the oldest entry under a shared read lock (other readers proceed), then evicts it under a briefly held write lock
- **Cleanup**:
  - Automatic cleanup after successful submission
  - Periodic cleanup every 60 seconds (`TASK_RESPONSES_CLEANUP_INTERVAL`)
  - Evicts 10% extra entries when over limit for headroom
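FIFO eviction by insertion counter can be sketched as a single-owner store. `FifoStore` is hypothetical. It keeps the counter as an `AtomicU64` to mirror the shared field described here, although with `&mut self` a plain `u64` would suffice. Re-inserting an existing key keeps its original position. Removing after a successful submission clears both maps, so no stale order entries remain.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical FIFO-bounded store keyed by task id (u64 here).
pub struct FifoStore<V> {
    entries: HashMap<u64, V>,
    insertion_order: HashMap<u64, u64>, // task_id -> insertion counter value
    counter: AtomicU64,                 // atomic to mirror the shared field
    max_entries: usize,
}

impl<V> FifoStore<V> {
    pub fn new(max_entries: usize) -> Self {
        Self {
            entries: HashMap::new(),
            insertion_order: HashMap::new(),
            counter: AtomicU64::new(0),
            max_entries,
        }
    }

    /// Inserts a value. If a NEW key would exceed the limit, the entry with the
    /// smallest insertion counter (the oldest) is evicted first.
    pub fn insert(&mut self, task_id: u64, value: V) {
        if !self.entries.contains_key(&task_id) && self.entries.len() >= self.max_entries {
            let oldest = self
                .insertion_order
                .iter()
                .min_by_key(|(_, order)| **order)
                .map(|(id, _)| *id);
            if let Some(id) = oldest {
                self.entries.remove(&id);
                self.insertion_order.remove(&id);
            }
        }
        let order = self.counter.fetch_add(1, Ordering::Relaxed) + 1;
        self.insertion_order.entry(task_id).or_insert(order); // keep original position
        self.entries.insert(task_id, value);
    }

    /// Cleanup after successful submission: drop the value and its order entry.
    pub fn remove(&mut self, task_id: u64) -> Option<V> {
        self.insertion_order.remove(&task_id);
        self.entries.remove(&task_id)
    }

    pub fn contains(&self, task_id: u64) -> bool {
        self.entries.contains_key(&task_id)
    }
}
```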

#### Pending Signatures (`pending_signatures`)

- **Limit**: `MAX_PENDING_SIGNATURE_TASKS = 1000` tasks
- **Timeout**: `PENDING_SIGNATURE_TIMEOUT = 5 seconds`
- **Eviction Strategy**: Time-based expiration + size limit
- **Implementation**:
  - Each entry has `created_at: Instant` timestamp
  - Periodic cleanup every 10 seconds (`PENDING_SIGNATURE_CLEANUP_INTERVAL`)
  - Removes entries older than timeout
  - Rejects new tasks if at limit (prevents DoS)
- **Use Case**: Handles signatures that arrive before task initialization
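The pending-signature rules (per-task `created_at`, a cap on the number of buffered tasks, time-based expiry, and draining on initialization) can be sketched in one small type. `PendingBuffer` and its methods are hypothetical, and `S` stands in for `SignedTaskResponse`. When the cap is reached, the signature is handed back to the caller instead of being silently dropped.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical pending-signature buffer; S stands in for SignedTaskResponse.
pub struct PendingBuffer<S> {
    by_task: HashMap<u64, (Instant, Vec<S>)>, // task_id -> (created_at, signatures)
    max_tasks: usize,                         // cf. MAX_PENDING_SIGNATURE_TASKS
    timeout: Duration,                        // cf. PENDING_SIGNATURE_TIMEOUT
}

impl<S> PendingBuffer<S> {
    pub fn new(max_tasks: usize, timeout: Duration) -> Self {
        Self { by_task: HashMap::new(), max_tasks, timeout }
    }

    /// Buffers a signature for a task that is not initialized yet. New tasks are
    /// rejected at the limit (DoS protection); already-buffered tasks can still grow.
    pub fn push(&mut self, task_id: u64, sig: S) -> Result<(), S> {
        if !self.by_task.contains_key(&task_id) && self.by_task.len() >= self.max_tasks {
            return Err(sig);
        }
        self.by_task
            .entry(task_id)
            .or_insert_with(|| (Instant::now(), Vec::new()))
            .1
            .push(sig);
        Ok(())
    }

    /// On task initialization: take everything buffered for that task.
    pub fn take(&mut self, task_id: u64) -> Vec<S> {
        self.by_task.remove(&task_id).map(|(_, sigs)| sigs).unwrap_or_default()
    }

    /// Periodic cleanup: drop entries older than the timeout; returns how many were removed.
    pub fn remove_expired(&mut self, now: Instant) -> usize {
        let before = self.by_task.len();
        let timeout = self.timeout;
        self.by_task
            .retain(|_, (created_at, _)| now.saturating_duration_since(*created_at) < timeout);
        before - self.by_task.len()
    }
}
```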

#### BLS Service Limits

- **Active Tasks**: `MAX_ACTIVE_TASKS = 10000` concurrent tasks
  - FIFO eviction when limit reached
  - Tracks insertion order for eviction
- **Aggregated Operators Per Task**: `MAX_AGGREGATED_OPERATORS_PER_TASK = 100` response digests
  - Prevents memory bloat for tasks with many different responses
  - FIFO eviction with insertion order tracking

### Lock-Free & Non-Blocking Patterns

#### AtomicU64 Counter

The insertion counter uses `AtomicU64` for lock-free increments:

```rust
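use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Field on AggregatorCore: Arc<AtomicU64>, shared with background tasks
let task_responses_insertion_counter = Arc::new(AtomicU64::new(0));
// Lock-free increment; fetch_add returns the previous value, hence the + 1
let counter_value = task_responses_insertion_counter.fetch_add(1, Ordering::Relaxed) + 1;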
```

**Benefits**: No blocking, no contention, constant-time operation.

#### RwLock for Insertion Order

The insertion order map uses `RwLock` to allow concurrent reads:

```rust
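// Field on AggregatorCore (async RwLock, e.g. tokio::sync::RwLock)
// task_responses_insertion_order: Arc<RwLock<HashMap<TaskId, u64>>>

// Read (shared: concurrent readers proceed; waits only while a writer holds the lock)
let oldest_task_id = {
    let insertion_order_read = task_responses_insertion_order.read().await;
    insertion_order_read
        .iter()
        .min_by_key(|(_, order)| **order)
        .map(|(task_id, _)| task_id.clone())
}; // read guard dropped here, before a write lock is requested (avoids self-deadlock)

// Write (exclusive: blocks readers and writers, so hold it only while modifying)
let mut insertion_order_write = task_responses_insertion_order.write().await;
insertion_order_write.insert(task_id, counter_value);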
```

**Benefits**:

- Multiple readers can check insertion order concurrently
- A writer gets exclusive access and blocks readers as well as other writers, so write locks are held only for the brief insert or remove
- Minimizes lock contention

#### Lock Minimization

ServiceHandle is cloned before async operations to avoid holding locks:

```rust
// Clone handle before async call to avoid holding lock during async operation
let handle = {
    let locked_handle = self.service_handle.lock().await;
    locked_handle.clone()
};
let result = handle.process_signature(task_signature).await;
```

**Benefits**: Lock is released immediately after cloning, allowing other operations to proceed during async call.
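The clone-then-release pattern can be observed with a lock probe. In this std-only sketch, `Handle` is a hypothetical stand-in for `ServiceHandle` and `process` stands in for the long-running `process_signature` call; the probe reports whether another caller could take the lock while the work is in progress:

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for `ServiceHandle`; the real handle is assumed
/// to be cheap to clone (for example a channel sender).
#[derive(Clone)]
struct Handle;

impl Handle {
    /// Stand-in for a long-running call such as `process_signature`.
    /// Reports whether the shared lock is free while the "work" runs.
    fn process(&self, shared: &Mutex<Handle>) -> bool {
        shared.try_lock().is_ok()
    }
}

/// Pattern from the snippet above: clone inside a short critical section.
fn call_without_holding_lock(shared: &Mutex<Handle>) -> bool {
    let handle = {
        let guard = shared.lock().unwrap();
        guard.clone()
    }; // guard dropped here, before the call
    handle.process(shared)
}

/// Anti-pattern for comparison: the guard stays alive across the call.
fn call_while_holding_lock(shared: &Mutex<Handle>) -> bool {
    let guard = shared.lock().unwrap();
    guard.process(shared)
}
```

`call_without_holding_lock` returns `true` (the lock is free during the call), while `call_while_holding_lock` returns `false`. The pattern only pays off when cloning the handle is cheap.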

### Resource Cleanup

#### Automatic Cleanup

- **After successful submission**: `task_responses` entry removed immediately
- **On task expiry**: BLS service cleans up task state, channels closed
- **On channel closure**: Detected in `ProcessSignature` handling, triggers cleanup

#### Timeout-Based Expiration

- **Pending signatures**: Removed once older than 5 seconds if the task never initializes; with the 10-second cleanup sweep, removal happens 5 to 15 seconds after buffering
- **Task expiry**: Handled by timer in `single_task_aggregator`

#### Graceful Shutdown

- **CancellationToken**: All background tasks check cancellation token
- **Drop implementation**: The `Drop` impl for `AggregatorCore` cancels background tasks
- **Channel closure**: Detected gracefully, doesn't panic
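A minimal sketch of the cancel-on-drop shape, assuming an `AtomicBool` flag and a std thread in place of `tokio_util::sync::CancellationToken` and tokio tasks (`Core` and `start` are hypothetical). Unlike this sketch, async code cannot `.await` inside `Drop`, so a tokio version would presumably only call `cancel()` and let each background loop observe the token on its next iteration:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for `AggregatorCore` owning one background loop.
struct Core {
    cancel: Arc<AtomicBool>,
    worker: Option<JoinHandle<()>>,
}

impl Core {
    /// Starts a background "cleanup" loop; `exited` is set when it stops.
    fn start(exited: Arc<AtomicBool>) -> Self {
        let cancel = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&cancel);
        let worker = thread::spawn(move || {
            // The loop checks the cancellation flag on every tick.
            while !flag.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(5));
            }
            exited.store(true, Ordering::Release);
        });
        Core { cancel, worker: Some(worker) }
    }
}

impl Drop for Core {
    fn drop(&mut self) {
        // Signal cancellation, then wait so the loop cannot outlive its owner.
        self.cancel.store(true, Ordering::Release);
        if let Some(worker) = self.worker.take() {
            let _ = worker.join();
        }
    }
}
```

Dropping `Core` sets the flag and joins the worker, so `exited` is guaranteed to read `true` once `drop` returns.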

## Concurrency & Async Patterns

### Task Isolation

Each aggregation task runs in its own spawned task with dedicated channels, ensuring complete isolation:

```rust
// In BLS service run() loop
let join_handle = tokio::spawn(async move {
    let result = Self::single_task_aggregator(
        avs_registry_service,
        metadata,
        task_response_sender,  // Task-specific response sender
        signature_rx,
    ).await;
    // Handle result, log errors
    // Response sender dropped here, channel closes naturally
});

// Monitor task for panic detection: awaiting the JoinHandle yields
// Err(JoinError) if the aggregator task panicked (or was aborted)
tokio::spawn(async move {
    if let Err(e) = join_handle.await {
        error!("Task aggregator panicked: {:?}", e);
    }
});
```

**Benefits**:

- **Complete isolation**: Errors in one task don't affect others
- **Channel isolation**: Each task has its own response channel, no cross-task interference
- **Panic safety**: Panics are caught and logged, don't crash the service
- **Independent cancellation**: Tasks can be cancelled independently via channel closure
- **Resource cleanup**: Channel closure automatically triggers cleanup
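Panic isolation can be sketched with std threads: each task runs on its own thread, and a panic surfaces only as an `Err` from that thread's `join`, much as awaiting a tokio `JoinHandle` yields a `JoinError` for which `is_panic()` is true. `aggregate` and `run_isolated` are hypothetical, and task 3 panics on purpose:

```rust
use std::thread;

/// Outcome of one isolated task worker.
#[derive(Debug, PartialEq)]
enum TaskOutcome {
    Completed(u64),
    Panicked,
}

/// Hypothetical per-task work: panics for task 3 to simulate a bug.
fn aggregate(task_id: u64) -> u64 {
    if task_id == 3 {
        panic!("simulated bug in task {task_id}");
    }
    task_id * 10
}

/// Runs each task on its own thread; a panic is observed at `join` and
/// turned into an outcome instead of tearing down the caller.
fn run_isolated(task_ids: &[u64]) -> Vec<(u64, TaskOutcome)> {
    let handles: Vec<_> = task_ids
        .iter()
        .map(|&id| (id, thread::spawn(move || aggregate(id))))
        .collect();
    handles
        .into_iter()
        .map(|(id, handle)| match handle.join() {
            Ok(value) => (id, TaskOutcome::Completed(value)),
            Err(_) => (id, TaskOutcome::Panicked),
        })
        .collect()
}
```

`run_isolated(&[1, 2, 3, 4])` reports task 3 as `Panicked` and the others as `Completed`; the default panic hook still prints the panic message to stderr.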

### Channel-Based Communication

The system uses unbounded channels for message passing with per-task isolation:

```rust
// Task initialization - creates TWO channels per task:
// 1. Signature channel (for sending signatures to task aggregator)
// 2. Response channel (for receiving aggregated responses)
let (signature_tx, signature_rx) = mpsc::unbounded_channel::<SignedTaskResponseDigest>();
let (response_tx, response_rx) = mpsc::unbounded_channel::<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>();

// Store both senders, plus a creation timestamp, keyed by task
task_channels.insert(task_id, (signature_tx, response_tx, timestamp));

// Return response receiver to caller (AggregatorCore)
result_sender.send(Ok(response_rx))?;

// Result channel (oneshot for verification results)
let (result_tx, result_rx) = oneshot::channel();
```

**Benefits**:

- **Per-task isolation**: Each task has dedicated channels, no interference
- **Zero contention**: No shared mutex for response waiting
- **Direct routing**: Responses go directly to the correct task
- **Non-blocking sends**: Unbounded channels never make the sender wait; the trade-off is no backpressure, so memory is bounded only by the task limits
- **Natural cancellation**: Dropping receiver cancels wait operation
- **Clean separation**: Signature processing and response delivery are decoupled
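The routing described above can be sketched with `std::sync::mpsc`, whose `channel()` is unbounded like tokio's `unbounded_channel`. Each task gets its own signature channel and response channel, so a response can only reach the receiver returned for that task. `Router` is hypothetical, and summing `u64` values stands in for BLS aggregation:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical router: one signature sender per task, like `task_channels`.
struct Router {
    task_channels: HashMap<u64, Sender<u64>>,
}

impl Router {
    fn new() -> Self {
        Router { task_channels: HashMap::new() }
    }

    /// Creates the task's two channels, spawns its aggregator, and hands the
    /// response receiver back to the caller.
    fn initialize_task(&mut self, task_id: u64) -> Receiver<(u64, u64)> {
        let (signature_tx, signature_rx) = mpsc::channel::<u64>();
        let (response_tx, response_rx) = mpsc::channel::<(u64, u64)>();
        thread::spawn(move || {
            // Stand-in for aggregation: sum every "signature" until the sender is dropped.
            let total: u64 = signature_rx.iter().sum();
            let _ = response_tx.send((task_id, total));
        });
        self.task_channels.insert(task_id, signature_tx);
        response_rx
    }

    /// Routes a signature to its own task only; false for unknown tasks.
    fn process_signature(&self, task_id: u64, signature: u64) -> bool {
        match self.task_channels.get(&task_id) {
            Some(tx) => tx.send(signature).is_ok(),
            None => false,
        }
    }

    /// Dropping the sender closes the task's signature channel, ending its loop.
    fn close_task(&mut self, task_id: u64) {
        self.task_channels.remove(&task_id);
    }
}
```

Closing a task drops its sender, which ends that worker's receive loop and makes it send exactly one response on its own channel; the other task's receiver is never involved.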

### Lock Contention Minimization

Multiple strategies minimize lock contention:

1. **Per-task channels**: Eliminates mutex contention for response waiting (biggest win)
2. **Read locks for lookups**: `RwLock::read()` allows concurrent reads
3. **Write locks only when modifying**: Acquired just before modification, released immediately
4. **Batch operations**: Collect data before acquiring lock, minimize lock duration
5. **Handle cloning**: Clone before async operations to release lock early
6. **Receiver removal**: Receivers are removed from the `task_response_receivers` map at the start of `wait_for_aggregation()`, ensuring only one waiter per task

**Example**:

```rust
// Shared read lock to find the oldest entry; the guard drops at the end of the block
let oldest_task_id_opt = {
    let insertion_order_read = self.task_responses_insertion_order.read().await;
    insertion_order_read.iter().min_by_key(|(_, order)| *order).map(|(id, _)| *id)
};

// Write lock only when evicting (`task_responses_map` is the guard on `task_responses`, acquired earlier)
if let Some(oldest_task_id) = oldest_task_id_opt {
    if task_responses_map.remove(&oldest_task_id).is_some() {
        let mut insertion_order_write = self.task_responses_insertion_order.write().await;
        insertion_order_write.remove(&oldest_task_id);
    }
}
```
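Putting the pieces together, here is a synchronous sketch of FIFO eviction with std locks (capacity, names, and `String` responses are hypothetical): the insertion counter orders entries, a scoped read lock finds the oldest, and the write lock is taken only to change the order map. Locks are always acquired in the same order (`responses`, then `insertion_order`) so two callers cannot deadlock:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, RwLock};

/// Hypothetical bounded store (task ID -> response) with FIFO eviction.
struct BoundedResponses {
    capacity: usize,
    responses: Mutex<HashMap<u64, String>>,
    insertion_order: RwLock<HashMap<u64, u64>>,
    counter: AtomicU64,
}

impl BoundedResponses {
    fn new(capacity: usize) -> Self {
        BoundedResponses {
            capacity,
            responses: Mutex::new(HashMap::new()),
            insertion_order: RwLock::new(HashMap::new()),
            counter: AtomicU64::new(0),
        }
    }

    /// Inserts a response; returns the task ID evicted to make room, if any.
    /// Re-inserting an existing task keeps its original position.
    fn insert(&self, task_id: u64, response: String) -> Option<u64> {
        let mut responses = self.responses.lock().unwrap();
        let is_new = !responses.contains_key(&task_id);
        let mut evicted = None;

        if is_new && responses.len() >= self.capacity {
            // Shared lock only for the scan; the guard drops at the end of the block.
            let oldest = {
                let order = self.insertion_order.read().unwrap();
                order.iter().min_by_key(|(_, seq)| **seq).map(|(id, _)| *id)
            };
            // Exclusive lock only when actually evicting.
            if let Some(oldest) = oldest {
                if responses.remove(&oldest).is_some() {
                    self.insertion_order.write().unwrap().remove(&oldest);
                    evicted = Some(oldest);
                }
            }
        }

        responses.insert(task_id, response);
        if is_new {
            let seq = self.counter.fetch_add(1, Ordering::Relaxed) + 1;
            self.insertion_order.write().unwrap().insert(task_id, seq);
        }
        evicted
    }

    fn len(&self) -> usize {
        self.responses.lock().unwrap().len()
    }
}
```

The linear `min_by_key` scan costs O(n) per eviction, as in the snippet above; keying a `BTreeMap<u64, TaskId>` by sequence number would find the oldest entry in O(log n) at the cost of a second index to keep in sync.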

## Error Handling & Robustness

### Input Validation

All public methods validate inputs before processing:

- **Task ID**: Must be non-zero (`TaskId::ZERO` check)
- **Operator ID**: Must be non-zero (all bytes checked)
- **Quorum numbers**: Must be non-empty
- **Threshold percentage**: Must be between 1 and 100
- **Timeouts**: Must be non-zero duration

**Example**:

```rust
if task_id == TaskId::ZERO {
    return Err(eyre::eyre!("Invalid task_id: zero task ID"));
}
if operator_id.as_slice().iter().all(|&b| b == 0) {
    return Err(eyre::eyre!("Invalid operator_id: zero operator ID"));
}
```
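The validation rules can be collected into one function per entry point. A std-only sketch, assuming 32-byte arrays as stand-ins for `TaskId` and `FixedBytes<32>` and assuming the 1 to 100 threshold range is inclusive; the function names and messages are hypothetical:

```rust
use std::time::Duration;

// Hypothetical stand-ins for the real `TaskId` and `FixedBytes<32>` types.
type TaskId = [u8; 32];
type OperatorId = [u8; 32];

fn is_all_zero(bytes: &[u8]) -> bool {
    bytes.iter().all(|&b| b == 0)
}

/// Hypothetical validator for `initialize_task` inputs.
/// Assumes the 1..=100 threshold range is inclusive.
fn validate_task_inputs(
    task_id: &TaskId,
    quorum_nums: &[u8],
    threshold_pct: u8,
    time_to_expiry: Duration,
) -> Result<(), String> {
    if is_all_zero(task_id) {
        return Err("Invalid task_id: zero task ID".to_string());
    }
    if quorum_nums.is_empty() {
        return Err("Invalid quorum_nums: no quorums given".to_string());
    }
    if !(1..=100).contains(&threshold_pct) {
        return Err(format!("Invalid threshold percentage: {threshold_pct}"));
    }
    if time_to_expiry.is_zero() {
        return Err("Invalid time_to_expiry: zero duration".to_string());
    }
    Ok(())
}

/// Hypothetical validator for the operator ID on an incoming signature.
fn validate_operator_id(operator_id: &OperatorId) -> Result<(), String> {
    if is_all_zero(operator_id) {
        return Err("Invalid operator_id: zero operator ID".to_string());
    }
    Ok(())
}
```

Rejecting these inputs up front keeps malformed requests from ever reaching the BLS service's channels.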

### Error Types & Context

All `BlsAggregationServiceError` variants are enriched with structured context fields for comprehensive debugging:

#### TaskExpired

```rust
BlsAggregationServiceError::TaskExpired {
    task_id: TaskId,
    reason: String,  // e.g., "task expired without reaching quorum threshold"
}
```

#### TaskNotFound

```rust
BlsAggregationServiceError::TaskNotFound {
    task_id: TaskId,
    reason: String,  // e.g., "task not found in task_channels (task may not be initialized yet)"
}
```

#### SignatureVerificationError

```rust
BlsAggregationServiceError::SignatureVerificationError {
    task_id: TaskId,
    operator_id: FixedBytes<32>,
    verification_error: SignatureVerificationError,  // DuplicateSignature, IncorrectSignature, etc.
}
```

#### SignaturesChannelClosed

```rust
BlsAggregationServiceError::SignaturesChannelClosed {
    task_id: TaskId,
    reason: String,  // e.g., "signature channel receiver dropped (task aggregator may have finished)"
}
```

#### RegistryError

```rust
BlsAggregationServiceError::RegistryError {
    task_id: TaskId,
    operator_context: String,  // e.g., " from operator 0x1234..." or empty
    reason: String,  // e.g., "failed to get operator AVS state at block 12345: ..."
}
```

#### DuplicateTaskId

```rust
BlsAggregationServiceError::DuplicateTaskId {
    task_id: TaskId,
    reason: String,  // e.g., "task already exists in task_channels (message #42)"
}
```

**Benefits of Structured Errors**:

- **Complete Context**: Every error includes `task_id` and specific `reason` for immediate debugging
- **Operator Identification**: Signature errors include `operator_id` to identify problematic operators
- **Operation Context**: Registry errors include `operator_context` when applicable
- **Detailed Reasons**: Human-readable `reason` strings explain exactly what went wrong and where
- **Structured Logging**: Errors can be logged with full context using structured logging:

```rust
match result {
    Err(BlsAggregationServiceError::TaskNotFound { task_id, reason }) => {
        warn!(
            task_id = %task_id,
            reason = %reason,
            "Task not found - buffering signature for later processing"
        );
    }
    Err(BlsAggregationServiceError::SignatureVerificationError {
        task_id,
        operator_id,
        verification_error,
    }) => {
        error!(
            task_id = %task_id,
            operator_id = %operator_id,
            verification_error = ?verification_error,
            "Signature verification failed"
        );
    }
    // ... other error variants
}
```

### Error Isolation

Errors are isolated at multiple levels:

1. **Signature-level**: Failed signature processing doesn't affect other signatures
2. **Task-level**: Errors in one task don't affect other tasks
3. **Service-level**: Panics in spawned tasks are caught and logged

**Example**: In `process_pending_signatures_for_task()`, failed signatures are dropped and logged, but processing continues for remaining signatures.

### Graceful Degradation

The system degrades gracefully under failure:

- **Invalid signatures**: Logged and dropped, processing continues
- **Missing operator state**: Returns error, doesn't panic
- **Channel closure**: Detected gracefully, doesn't crash
- **Memory pressure**: Evicts oldest entries, continues operating

## Performance Optimizations

### Per-Task Channel Architecture with DashMap

The most significant performance improvements come from two architectural changes:

**1. Per-Task Channels** (eliminates response stealing):

- Each task gets its own `UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>`
- Zero response stealing: Responses go directly to the correct task
- Natural cancellation via receiver drop

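**2. DashMap for Task States** (removes the global lock):

- Sharded locking: DashMap splits its entries across shards, each guarded by its own read-write lock, so operations on different TaskIds usually touch different shards and proceed in parallel
- Operations only block each other when their keys hash to the same shard
- Throughput scales with the number of concurrent tasks instead of serializing on one global lock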

**Performance Comparison**:

| Metric              | Before (RwLock)    | After (DashMap)  | Improvement            |
|---------------------|--------------------|------------------|------------------------|
| Latency (10k tasks) | 5-500ms            | 0.1-0.5ms        | 10-1000x faster        |
| Throughput          | Limited by mutex   | 10k+ concurrent  | Linear scaling         |
| Contention          | High (global lock) | Low (per-shard)  | Largely eliminated     |
| Response Stealing   | Possible           | Impossible       | Guaranteed correctness |

**Implementation**:

```rust
// In initialize_task()
self.task_response_receivers.insert(task_id, response_receiver);
self.task_states.insert(task_id, TaskState::new(quorum_nums, broadcast_count));

// In wait_for_aggregation()
let mut receiver = self
    .task_response_receivers
    .remove(&task_id)
    .map(|(_, v)| v)
    .ok_or(AggregatorCoreError::TaskNotInitialized { task_id })?;
// Receive directly from task-specific channel (no mutex lock)
match receiver.recv().await { ... }

// In process_signed_response()
if let Some(mut state) = self.task_states.get_mut(&task_id) {
    state.task_responses.insert(digest, response);
}
```

**Benefits**:

- **Scalability**: Linear scaling with number of tasks (no global lock)
- **Latency**: Constant-time response delivery regardless of concurrent load
- **Isolation**: One task's operations don't affect others
- **Cancellation**: Natural cancellation via receiver drop
- **Memory**: Automatic cleanup when receivers are dropped
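DashMap's behavior comes from sharding. The sketch below is not DashMap's code, only a minimal std-only illustration of the idea (`ShardedMap` is hypothetical): each key hashes to one shard, and only that shard's `RwLock` is taken, so writers for keys in different shards never wait on each other:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::RwLock;

/// Hypothetical sharded map illustrating the idea behind DashMap.
struct ShardedMap<V> {
    shards: Vec<RwLock<HashMap<u64, V>>>,
}

impl<V> ShardedMap<V> {
    fn new(shard_count: usize) -> Self {
        assert!(shard_count > 0, "need at least one shard");
        let shards = (0..shard_count).map(|_| RwLock::new(HashMap::new())).collect();
        ShardedMap { shards }
    }

    /// Picks the shard for a key; only that shard's lock is ever taken.
    fn shard_index(&self, key: u64) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) % self.shards.len()
    }

    fn insert(&self, key: u64, value: V) -> Option<V> {
        let idx = self.shard_index(key);
        self.shards[idx].write().unwrap().insert(key, value)
    }

    fn remove(&self, key: u64) -> Option<V> {
        let idx = self.shard_index(key);
        self.shards[idx].write().unwrap().remove(&key)
    }

    /// Mutates a value in place under its shard's write lock (like `get_mut`).
    fn update<F: FnOnce(&mut V)>(&self, key: u64, f: F) -> bool {
        let idx = self.shard_index(key);
        match self.shards[idx].write().unwrap().get_mut(&key) {
            Some(value) => {
                f(value);
                true
            }
            None => false,
        }
    }

    fn len(&self) -> usize {
        self.shards.iter().map(|shard| shard.read().unwrap().len()).sum()
    }
}
```

With 16 shards, four threads inserting disjoint keys mostly touch different locks. DashMap itself exposes `insert`, `remove` (returning the removed `(key, value)` pair), and `get_mut`, which the snippet above relies on.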

### Batch Operations

Batch insertion minimizes lock contention:

```rust
// Collect successful responses before lock acquisition
let mut successful_responses: Vec<(TaskResponseDigest, BindingTaskResponse)> = Vec::new();

for signature in signatures_to_process {
    // Process signature...
    if result.is_ok() {
        successful_responses.push((digest, response));
    }
}

// Single lock acquisition for batch insert
if !successful_responses.is_empty() {
    let mut task_responses_map = task_responses.lock().await;
    let task_entry = task_responses_map.entry(task_id).or_default();
    for (digest, response) in successful_responses {
        task_entry.entry(digest).or_insert(response);
    }
}
```

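### Concurrent Reads

Read locks let many readers proceed at once; a reader waits only while a writer holds the lock (or, with tokio's fair `RwLock`, is queued for it):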

```rust
// Multiple readers can check insertion order concurrently
let insertion_order_read = task_responses_insertion_order.read().await;
let oldest = insertion_order_read.iter().min_by_key(|(_, order)| *order);
```

### Early Returns

Early returns avoid unnecessary work:

```rust
// Early return if no signatures to process
if signatures_to_process.is_empty() {
    return;
}

// Skip duplicate initialization (continues the run() loop)
if task_channels.contains_key(&task_id) {
    result_sender
        .send(Err(BlsAggregationServiceError::DuplicateTaskId {
            task_id,
            reason: "task already exists in task_channels".to_string(),
        }))
        .ok();
    continue;
}
```

## Data Flow Examples

### Task Initialization Flow

1. **`initialize_task()` called** with task metadata
2. **Input validation**: Task ID, quorum numbers, threshold, timeout validated
3. **TaskMetadata created**: Wraps parameters in structured type
4. **ServiceHandle cloned**: Avoids holding lock during async operation
5. **InitializeTask message sent**: Via `ServiceHandle` to BLS service
6. **BLS service receives message**: In `run()` loop
7. **Task aggregator spawned**: `single_task_aggregator` task created
8. **Per-task channels registered**: `signature_tx` and `response_tx` stored in `task_channels`; the response receiver is returned to `AggregatorCore`
9. **Pending signatures notified**: `pending_notify_tx.send(task_id)` triggers retry
10. **Background loop processes**: Buffered signatures for this task are processed

### Signature Processing Flow

1. **`process_signed_response()` called** with signed response
2. **Input validation**: Task ID and operator ID validated
3. **Task response digest computed**: Keccak256 hash of encoded response
4. **TaskSignature created**: Wraps task_id, digest, signature, operator_id
5. **ServiceHandle cloned**: Released before async call
6. **`process_signature()` called**: Sends to BLS service
7. **If task not initialized**:
   - Signature buffered in `pending_signatures`
   - Size limit checked (`MAX_PENDING_SIGNATURE_TASKS`)
   - Returns success (signature will be processed later)
8. **If task initialized**:
   - Signature sent to task aggregator via per-task channel
   - BLS service verifies signature
   - If valid: Stored in `task_responses` with size limit check
   - Returns success or error

### Aggregation Flow (BLS Service)

1. **Signature received** via per-task channel in `single_task_aggregator`
2. **Signature verification**:
   - Duplicate check (same operator, same digest)
   - Cryptographic verification against operator public key
   - Result sent to result channel
3. **If valid signature**:
   - Operator state looked up
   - `update_aggregated_operators()` called for `task_response_digest`
   - Aggregation state updated (signature aggregated, stake added)
4. **Quorum check**:
   - `check_if_stake_thresholds_met()` called
   - Checks if aggregated stake meets threshold for each quorum
5. **If quorum met**:
   - Aggregated response created (`BlsAggregationServiceResponse`)
   - Response sent via `aggregated_response_sender`
   - Window opened for additional signatures (`window_duration`)
6. **Window handling**:
   - Additional signatures accepted during window
   - Final aggregated response sent when window closes
7. **Task expiry**:
   - Timer expires after `time_to_expiry`
   - Task state cleaned up
   - Channel closed (detected in `ProcessSignature` handling)
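The quorum check in step 4 can be illustrated with a worked sketch. It assumes stakes are plain `u128` totals and that the threshold is a whole percentage met with `>=`; the real `check_if_stake_thresholds_met()` may use different types or rounding, and `QuorumStake`/`stake_thresholds_met` are hypothetical. Cross-multiplying (`signed * 100 >= total * threshold`) avoids losing precision to integer division:

```rust
use std::collections::HashMap;

/// Hypothetical per-quorum stake totals for one response digest.
struct QuorumStake {
    signed: u128,
    total: u128,
}

/// True when every required quorum has signed at least `threshold_pct`
/// percent of its total stake. A missing or zero-stake quorum fails.
fn stake_thresholds_met(
    stakes: &HashMap<u8, QuorumStake>,
    quorum_nums: &[u8],
    threshold_pct: u8,
) -> bool {
    quorum_nums.iter().all(|quorum| match stakes.get(quorum) {
        // signed / total >= pct / 100, rearranged to stay in integers.
        Some(s) if s.total > 0 => s.signed * 100 >= s.total * u128::from(threshold_pct),
        _ => false,
    })
}
```

With quorum 0 at 67 of 100 and quorum 1 at 2 of 3, a 67% threshold fails because quorum 1 holds only about 66.7%, while a 66% threshold passes for both.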

## Edge Cases & Failure Modes

### Signature Arrives Before Task Initialization

**Scenario**: Operator submits signature before `initialize_task()` is called.

**Handling**:

1. Signature buffered in `pending_signatures` HashMap
2. Entry created with `created_at` timestamp
3. When task initialized: `pending_notify_tx.send(task_id)` notifies background loop
4. Background loop processes all buffered signatures for that task
5. If task never initializes: Entry removed by the periodic cleanup once older than 5 seconds (in practice 5 to 15 seconds after buffering, given the 10-second sweep)

**Memory Safety**: Limited to `MAX_PENDING_SIGNATURE_TASKS = 1000` tasks. New tasks rejected if at limit.
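The buffering path can be sketched as a small struct. This std-only version is hypothetical (names, `String` signatures, and passing `now` explicitly so the timeout is testable are all assumptions) but follows the rules above: new tasks are refused at the limit, initialization drains the task's entry, and the periodic sweep drops entries whose age has reached the timeout:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical buffer for signatures whose task is not initialized yet.
struct PendingSignatures {
    max_tasks: usize,
    timeout: Duration,
    // task_id -> (created_at, buffered signatures)
    entries: HashMap<u64, (Instant, Vec<String>)>,
}

impl PendingSignatures {
    fn new(max_tasks: usize, timeout: Duration) -> Self {
        PendingSignatures { max_tasks, timeout, entries: HashMap::new() }
    }

    /// Buffers a signature. A task not yet in the buffer is rejected at the limit.
    fn buffer(&mut self, task_id: u64, signature: String, now: Instant) -> Result<(), String> {
        if !self.entries.contains_key(&task_id) && self.entries.len() >= self.max_tasks {
            return Err(format!("pending buffer full ({} tasks)", self.max_tasks));
        }
        self.entries
            .entry(task_id)
            .or_insert_with(|| (now, Vec::new()))
            .1
            .push(signature);
        Ok(())
    }

    /// On task initialization: removes and returns everything buffered for it.
    fn take_for_task(&mut self, task_id: u64) -> Vec<String> {
        self.entries.remove(&task_id).map(|(_, sigs)| sigs).unwrap_or_default()
    }

    /// Periodic sweep: drops entries whose age has reached the timeout.
    /// Returns how many task entries were removed.
    fn cleanup(&mut self, now: Instant) -> usize {
        let before = self.entries.len();
        let timeout = self.timeout;
        self.entries
            .retain(|_, (created_at, _)| now.duration_since(*created_at) < timeout);
        before - self.entries.len()
    }
}
```

Keying the timestamp per task (not per signature) matches the `created_at` entry described above, so a steady trickle of signatures does not extend an uninitialized task's lifetime.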

### Memory Pressure

**Scenario**: System receives more tasks than memory limits allow.

**Handling**:

1. **Task responses**: When `MAX_TASK_RESPONSES` reached, oldest entry evicted (FIFO)
2. **Pending signatures**: When `MAX_PENDING_SIGNATURE_TASKS` reached, new tasks rejected
3. **BLS service tasks**: When `MAX_ACTIVE_TASKS` reached, oldest task evicted
4. **Aggregated operators**: When `MAX_AGGREGATED_OPERATORS_PER_TASK` reached, oldest digest evicted

**Logging**: All evictions logged with warning level, including which entry was evicted and current size.

### Task Expiry

**Scenario**: Task expires before quorum is reached.

**Handling**:

1. Timer in `single_task_aggregator` expires after `time_to_expiry`
2. Task state cleaned up
3. Channel closed (receiver dropped)
4. Subsequent `ProcessSignature` messages detect channel closure
5. Entry removed from `task_channels`
6. No panic, graceful cleanup

### Duplicate Signatures

**Scenario**: Same operator submits multiple signatures for same `task_response_digest`.

**Handling**:

1. `is_duplicate_signature()` checks if operator already in `signers_operator_ids_set`
2. If duplicate: Structured error sent to result channel:

   ```rust
   BlsAggregationServiceError::SignatureVerificationError {
       task_id,
       operator_id: signed_digest.operator_id,
       verification_error: SignatureVerificationError::DuplicateSignature,
   }
   ```

3. Signature not aggregated
4. Processing continues for other signatures
5. Error logged with full context (task_id, operator_id, verification_error type)

**Note**: Different operators can sign same digest (aggregated), but same operator cannot sign twice.
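A per-digest signer set, mirroring `signers_operator_ids_set`, makes the duplicate check a single `HashSet::insert`, which returns `false` when the value is already present. In this hypothetical sketch (`SignerSets` and `Verification` are illustrative names), duplicates are scoped to one `task_response_digest` as described above; whether the real service also rejects an operator signing a second, different digest is not stated here:

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for the digest and operator ID types.
type Digest = [u8; 32];
type OperatorId = [u8; 32];

#[derive(Debug, PartialEq)]
enum Verification {
    Accepted,
    DuplicateSignature,
}

/// Hypothetical per-task signer tracking: one signer set per response digest.
#[derive(Default)]
struct SignerSets {
    by_digest: HashMap<Digest, HashSet<OperatorId>>,
}

impl SignerSets {
    fn record(&mut self, digest: Digest, operator: OperatorId) -> Verification {
        // `insert` returns false when this operator already signed this digest.
        if self.by_digest.entry(digest).or_default().insert(operator) {
            Verification::Accepted
        } else {
            Verification::DuplicateSignature
        }
    }
}
```

Two operators signing the same digest are both accepted (and would be aggregated), while a second signature from the same operator on that digest is flagged.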

### Channel Closure (Receiver Drop)

**Scenario**: Client times out or cancels request, receiver dropped.

**Handling**:

1. `result_channel.send()` returns `Err` if receiver dropped
2. Error logged with warning (expected in timeout scenarios)
3. Error not propagated (caller already cancelled)
4. Processing continues normally

**Example**:

```rust
if signed_digest.result_channel.send(verification_result).is_err() {
    warn!("Failed to send verification result (receiver dropped - likely timeout)");
    return Ok(()); // Don't propagate error
}
```

### Missing Operator Public Keys

**Scenario**: Operator state exists but public keys are `None`.

**Handling**:

1. `update_aggregated_operators()` checks for `pub_keys` existence
2. Returns `BlsAggregationServiceError::RegistryError { task_id, operator_context, reason }` if missing
3. Error logged with full context:

   ```rust
   BlsAggregationServiceError::RegistryError {
       task_id,
       operator_context: format!(" from operator {operator_id}"),
       reason: "operator public keys not found in operator state".to_string(),
   }
   ```

4. Signature not aggregated, but processing continues

**Previous Issue**: Used `.unwrap()` which would panic. Now properly handled with structured error return including `task_id`, `operator_context`, and detailed `reason`.

## Configuration Constants

All configuration constants are defined in the respective modules:

### AggregatorCore (`core.rs`)

- **`MAX_PENDING_SIGNATURE_TASKS: usize = 1000`**

  - Maximum number of tasks that can have pending signatures
  - Prevents unbounded memory growth if tasks never initialize
  - New tasks rejected if at limit

- **`MAX_TASK_RESPONSES: usize = 10000`**

  - Maximum number of tasks that can have stored responses
  - Prevents unbounded memory growth
  - FIFO eviction when limit reached

- **`PENDING_SIGNATURE_TIMEOUT: Duration = Duration::from_secs(5)`**

  - Timeout for pending signatures before automatic removal
  - Prevents permanent memory leaks if tasks never initialize
  - Entries older than this are removed by cleanup task

- **`PENDING_SIGNATURE_CLEANUP_INTERVAL: Duration = Duration::from_secs(10)`**

  - Interval for checking and cleaning up expired pending signatures
  - Background task runs every 10 seconds
  - Removes entries older than `PENDING_SIGNATURE_TIMEOUT`

- **`TASK_RESPONSES_CLEANUP_INTERVAL: Duration = Duration::from_secs(60)`**
  - Interval for checking and cleaning up stale task responses
  - Background task runs every 60 seconds
  - Evicts oldest entries if over `MAX_TASK_RESPONSES` limit

### BLS Aggregation Service (`bls.rs`)

- **`MAX_ACTIVE_TASKS: usize = 10000`**

  - Maximum number of active tasks allowed in `task_channels`
  - Prevents memory leaks from unbounded task growth
  - FIFO eviction when limit reached

- **`MAX_AGGREGATED_OPERATORS_PER_TASK: usize = 100`**
  - Maximum number of different response digests per task in `aggregated_operators`
  - Prevents memory bloat for tasks with many different responses
  - If limit reached, new digests are ignored (logged as warning) - this scenario should never happen in practice
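For reference, the constants above can be written as Rust declarations (values taken from this section; the `max_pending_signature_lifetime` helper is hypothetical). The helper captures how the timeout and sweep interval interact: an entry is removed only at the first sweep after its age reaches the timeout, so it can survive for just under timeout plus interval:

```rust
use std::time::Duration;

// AggregatorCore (core.rs) values as listed above.
pub const MAX_PENDING_SIGNATURE_TASKS: usize = 1000;
pub const MAX_TASK_RESPONSES: usize = 10_000;
pub const PENDING_SIGNATURE_TIMEOUT: Duration = Duration::from_secs(5);
pub const PENDING_SIGNATURE_CLEANUP_INTERVAL: Duration = Duration::from_secs(10);
pub const TASK_RESPONSES_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);

// BLS aggregation service (bls.rs) values as listed above.
pub const MAX_ACTIVE_TASKS: usize = 10_000;
pub const MAX_AGGREGATED_OPERATORS_PER_TASK: usize = 100;

/// Hypothetical helper: upper bound on how long an expired pending entry
/// can linger (it must reach the timeout, then wait for the next sweep).
pub fn max_pending_signature_lifetime() -> Duration {
    PENDING_SIGNATURE_TIMEOUT + PENDING_SIGNATURE_CLEANUP_INTERVAL
}
```

With the listed values the bound is 15 seconds, which is why shortening the cleanup interval tightens memory reclamation more than shortening the timeout alone.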

## Testing Considerations

### Memory Leak Testing

Test scenarios:

- Run aggregator for extended period (24+ hours)
- Monitor memory usage over time
- Verify cleanup tasks are running
- Check that eviction is working when limits reached

### Concurrency Testing

Test scenarios:

- Multiple tasks initialized simultaneously
- Signatures arriving concurrently for different tasks
- High signature throughput
- Verify task isolation (errors in one task don't affect others)

### Error Injection Testing

Test scenarios:

- Invalid signatures
- Missing operator state
- Channel closure
- Task expiry before quorum
- Memory pressure (limits reached)

### Performance Testing

Test scenarios:

- Throughput: Signatures processed per second
- Latency: Time from signature submission to aggregation
- Lock contention: Measure time spent waiting for locks
- Memory usage: Peak memory under load

## Code Examples

### Basic Usage

```rust
use newton_prover_aggregator::AggregatorCore;
use eigensdk::client_avsregistry::reader::AvsRegistryChainReader;

// Initialize aggregator
let aggregator = AggregatorCore::new(
    avs_registry_reader,
    operator_registry_address,
    ws_rpc_url,
    http_rpc_url,
).await?;

// Initialize a task
aggregator.initialize_task(
    task_id,
    task_created_block,
    quorum_nums,
    quorum_threshold_percentage,
    time_to_expiry,
).await?;

// Process signed response from operator
aggregator.process_signed_response(signed_response).await?;

// Wait for this task's aggregated response (with timeout)
let aggregated_response = aggregator.wait_for_aggregation(task_id, timeout_duration).await?;

// Submit to contract
let receipt = aggregator.submit_aggregated_response(
    &avs_writer,
    task,
    task_response,
    aggregated_response,
).await?;
```

### Error Handling

All errors include structured context for debugging. Pattern matching on error variants provides access to detailed information:

```rust
match aggregator.process_signed_response(signed_response).await {
    Ok(()) => {
        info!("Signature processed successfully");
    }
    Err(BlsAggregationServiceError::TaskNotFound { task_id, reason }) => {
        warn!(
            task_id = %task_id,
            reason = %reason,
            "Task not initialized yet - signature buffered for later processing"
        );
        // Signature will be processed when task is initialized
    }
    Err(BlsAggregationServiceError::SignatureVerificationError {
        task_id,
        operator_id,
        verification_error,
    }) => {
        error!(
            task_id = %task_id,
            operator_id = %operator_id,
            verification_error = ?verification_error,
            "Signature verification failed"
        );
    }
    Err(BlsAggregationServiceError::RegistryError {
        task_id,
        operator_context,
        reason,
    }) => {
        error!(
            task_id = %task_id,
            operator_context = %operator_context,
            reason = %reason,
            "AVS registry error"
        );
    }
    Err(e) => {
        error!("Failed to process signature: {}", e);
        // All errors include task_id and reason in their Display implementation
    }
}

match aggregator.wait_for_aggregation(task_id, timeout_duration).await {
    Ok(response) => {
        info!("Aggregation successful: {} non-signers", response.non_signers_pub_keys_g1.len());
    }
    Err(AggregatorCoreError::TaskNotInitialized { task_id }) => {
        warn!(task_id = %task_id, "Task not initialized - call initialize_task first");
    }
    Err(AggregatorCoreError::Timeout { duration_ms, timeout_ms, operator_errors }) => {
        warn!(
            task_id = %task_id,
            duration_ms,
            timeout_ms,
            "Aggregation timed out after {} ms",
            timeout_ms
        );
        if let Some(errors) = operator_errors {
            warn!("Operator errors: {:?}", errors);
        }
    }
    Err(AggregatorCoreError::AggregationServiceError(BlsAggregationServiceError::TaskExpired { task_id, reason })) => {
        warn!(
            task_id = %task_id,
            reason = %reason,
            "Task expired before aggregation completed"
        );
    }
    Err(AggregatorCoreError::Cancelled) => {
        warn!(task_id = %task_id, "Aggregation cancelled");
    }
    Err(e) => {
        error!(task_id = %task_id, "Aggregation failed: {}", e);
    }
}
```

**Error Context Access**:

- **Pattern Matching**: Destructure error variants to access `task_id`, `operator_id`, `reason`, etc.
- **Display Implementation**: All errors implement `Display` with formatted context (e.g., `"task 123 expired: quorum not reached"`)
- **Structured Logging**: Use error fields directly in logging macros for better observability

### Customization Points

The aggregator can be customized by:

1. **Adjusting memory limits**: Change the constants (`MAX_TASK_RESPONSES`, etc.) to match expected load and rebuild
2. **Custom cleanup intervals**: Adjust `PENDING_SIGNATURE_CLEANUP_INTERVAL` and `TASK_RESPONSES_CLEANUP_INTERVAL`
3. **Error handling**: All errors are logged with context, can be extended with custom error types
4. **Metrics**: Integration points for metrics collection (see `newton_prover_metrics` usage)

## Performance & Scalability

### Throughput & Latency

The per-task channel architecture provides significant performance improvements:

**Metrics** (under high concurrency with 10k+ concurrent tasks):

- **Latency**: ~0.1-0.5ms per response (constant time, independent of concurrent load)
  - Previous approach: 5-500ms (increased with concurrent tasks due to mutex contention)
- **Throughput**: Supports 10k+ concurrent tasks efficiently
  - Previous approach: Limited by mutex serialization
- **Mutex Contention**: Zero for response waiting (per-task channels)
  - Previous approach: High contention on shared `AggregateReceiver` mutex
- **Response Stealing**: Zero (direct routing to task-specific channels)
  - Previous approach: Responses could be consumed by wrong task's waiter

**Scalability Characteristics**:

- **Linear scaling**: Performance scales linearly with number of tasks
- **Constant latency**: Response delivery time is constant regardless of concurrent load
- **No serialization bottlenecks**: Each task operates independently
- **Memory efficient**: Receivers automatically cleaned up when tasks complete

### Robustness & Resilience

**Fault Tolerance**:

- **Isolated failures**: One task's channel closure doesn't affect others
- **Natural cancellation**: Dropping receiver = cancellation, no explicit cleanup needed
- **Graceful degradation**: System continues operating even if individual tasks fail
- **Resource cleanup**: Automatic cleanup when receivers are dropped

**Error Handling**:

- **Task-specific errors**: Errors are guaranteed to be for the correct task_id (no cross-task error leakage)
- **Structured errors**: All errors include task_id, reason, and context for debugging
- **Timeout handling**: Per-task timeouts with operator error collection
- **Cancellation support**: Optional cancellation tokens for request cancellation

**Memory Management**:

- **Automatic cleanup**: Receivers are removed from `task_response_receivers` when `wait_for_aggregation()` starts
- **No memory leaks**: Receivers dropped when function returns
- **Bounded growth**: Map size limited by number of active tasks
- **Efficient storage**: Only active tasks have receivers stored

## Summary

The Newton Prover Aggregator is designed for reliability, scalability, and high performance.

- **Bounded Memory**: Size-limited data structures with FIFO eviction prevent memory leaks
- **Concurrency**: Per-task channels eliminate mutex contention, enabling true concurrent processing
- **Performance**: ~0.1-0.5ms latency, supports 10k+ concurrent tasks efficiently
- **Scalability**: Linear scaling with constant latency regardless of concurrent load
- **Error Isolation**: Errors in one task don't affect others, task-specific error channels
- **Robustness**: Graceful handling of edge cases, natural cancellation, automatic cleanup
- **Fault Tolerance**: Isolated failures, graceful degradation, resource cleanup
- **Observability**: Every error carries structured fields (`task_id`, `reason`, and `operator_id` or `operator_context` where relevant), so logs pinpoint the failing task and operator for immediate debugging

The architecture separates concerns cleanly: `AggregatorCore` handles high-level orchestration while `BlsAggregatorService` handles low-level cryptographic aggregation. The per-task channel architecture eliminates bottlenecks and enables true concurrent processing of multiple aggregation tasks.