Skip to main content

eigen_services_blsaggregation/
bls_agg.rs

1use super::bls_aggregation_service_error::BlsAggregationServiceError;
2use super::bls_aggregation_service_response::BlsAggregationServiceResponse;
3use alloy::primitives::{FixedBytes, Uint, U256};
4use ark_bn254::{G1Affine, G2Affine};
5use ark_ec::AffineRepr;
6use eigen_crypto_bls::{BlsG1Point, BlsG2Point, Signature};
7use eigen_crypto_bn254::utils::verify_message;
8
9use eigen_services_avsregistry::AvsRegistryService;
10use eigen_types::avs_state::OperatorAvsState;
11use eigen_types::{
12    avs::{SignatureVerificationError, TaskIndex, TaskResponseDigest},
13    operator::{QuorumThresholdPercentage, QuorumThresholdPercentages},
14};
15use std::collections::HashMap;
16use tokio::{
17    sync::{
18        mpsc::{self, UnboundedReceiver, UnboundedSender},
19        oneshot,
20    },
21    time::Duration,
22};
23use tracing::{debug, error, instrument};
24
/// Contains the aggregated operators signers information
///
/// In this file one instance is kept per task response digest and it is refreshed
/// every time a new valid signature for that digest is accepted.
#[derive(Debug, Clone)]
pub struct AggregatedOperators {
    /// G2 point copied into the aggregated response as `signers_apk_g2`
    /// (presumably the aggregated public key of the signers — confirm against
    /// `update_aggregated_operators`).
    signers_apk_g2: BlsG2Point,
    /// Signature copied into the aggregated response as `signers_agg_sig_g1`
    /// (presumably the aggregation of the signers' signatures — confirm against
    /// `update_aggregated_operators`).
    signers_agg_sig_g1: Signature,
    /// Stake accumulated by the signers, keyed by quorum number; compared against
    /// the total stake per quorum when checking the thresholds.
    signers_total_stake_per_quorum: HashMap<u8, U256>,
    /// Operator ids that already signed. Only key membership is consulted in the
    /// visible code; the `bool` value is not read there.
    pub signers_operator_ids_set: HashMap<FixedBytes<32>, bool>,
}
33
/// Contains the metadata required to initialize a new task.
///
/// Built with [`TaskMetadata::new`]; the optional signature window is configured
/// afterwards through [`TaskMetadata::with_window_duration`].
#[derive(Clone)]
pub struct TaskMetadata {
    /// Index of the task
    task_index: TaskIndex,
    /// Block the task was created
    task_created_block: u64,
    /// Quorum numbers which should respond to the task
    quorum_numbers: Vec<u8>,
    /// Thresholds for each quorum, matched to `quorum_numbers` by position
    quorum_threshold_percentages: QuorumThresholdPercentages,
    /// Time before expiry of the task response aggregation
    time_to_expiry: Duration,
    /// Duration of the window to wait for signatures after quorum is reached
    window_duration: Duration,
}
50
51impl TaskMetadata {
52    /// Creates a new instance of [`TaskMetadata`]
53    ///
54    /// # Arguments
55    ///
56    /// * `task_index` - index of the task
57    /// * `task_created_block` - block number at which the task was created
58    /// * `quorum_numbers` - quorum numbers which should respond to the task
59    /// * `quorum_threshold_percentages` - threshold percentages for each quorum
60    /// * `time_to_expiry` - time until the task expires
61    ///
62    /// Use [`with_window_duration`](Self::with_window_duration) to set the window duration.
63    /// If the window duration is not set, it will default to [`Duration::ZERO`].
64    ///
65    /// # Returns
66    ///
67    /// A new instance of [`TaskMetadata`]
68    pub fn new(
69        task_index: TaskIndex,
70        task_created_block: u64,
71        quorum_numbers: Vec<u8>,
72        quorum_threshold_percentages: QuorumThresholdPercentages,
73        time_to_expiry: Duration,
74    ) -> Self {
75        Self {
76            task_index,
77            task_created_block,
78            quorum_numbers,
79            quorum_threshold_percentages,
80            time_to_expiry,
81            window_duration: Duration::ZERO,
82        }
83    }
84
85    /// Sets the window duration for the task
86    ///
87    /// # Arguments
88    /// * `window_duration` - The duration of the window to wait for signatures after quorum is reached
89    ///
90    /// # Returns
91    ///
92    /// An instance of [`TaskMetadata`] with the window duration set
93    pub fn with_window_duration(mut self, window_duration: Duration) -> Self {
94        self.window_duration = window_duration;
95        self
96    }
97}
98
/// Contains the information of a signed task response
#[derive(Clone)]
pub struct TaskSignature {
    /// Index of the task
    task_index: TaskIndex,
    /// Digest of the task response
    task_response_digest: TaskResponseDigest,
    /// BLS signature of the task response
    bls_signature: Signature,
    /// Operator ID of the operator that signed the task response
    operator_id: FixedBytes<32>,
}
111
112impl TaskSignature {
113    /// Creates a new instance of [`TaskSignature``]
114    ///
115    /// # Arguments
116    /// * `task_index` - index of the task
117    /// * `task_response_digest` - digest of the task response
118    /// * `bls_signature` - bls signature of the task response
119    /// * `operator_id` - operator ID of the operator that signed the task response
120    ///
121    /// # Returns
122    ///
123    /// [`TaskSignature`] instance
124    pub fn new(
125        task_index: TaskIndex,
126        task_response_digest: TaskResponseDigest,
127        bls_signature: Signature,
128        operator_id: FixedBytes<32>,
129    ) -> Self {
130        Self {
131            task_index,
132            task_response_digest,
133            bls_signature,
134            operator_id,
135        }
136    }
137}
138
/// Valid messages to interact with the BLS Aggregator Service
///
/// Every variant carries a one-shot sender on which the service reports the
/// outcome of that specific request.
pub enum AggregationMessage {
    /// Asks the service to start aggregating signatures for the task described
    /// by the given metadata.
    InitializeTask(
        TaskMetadata,
        oneshot::Sender<Result<(), BlsAggregationServiceError>>,
    ),
    /// Hands a signed task response to the service so it can be forwarded to the
    /// aggregator of the matching task.
    ProcessSignature(
        TaskSignature,
        oneshot::Sender<Result<(), BlsAggregationServiceError>>,
    ),
}
150
/// Handler to interact with the BLS Aggregator Service
///
/// The handle derives `Clone`, so it can be duplicated; every copy sends into
/// the same message channel.
#[derive(Debug, Clone)]
pub struct ServiceHandle {
    /// Channel to send messages to the BLS Aggregator Service
    msg_sender: UnboundedSender<AggregationMessage>,
}
157
158impl ServiceHandle {
159    /// Sends a message to the BLS Aggregator Service to initialize a new task.
160    ///
161    /// # Arguments
162    ///
163    /// * `metadata` - The metadata of the task to initialize
164    ///
165    /// # Returns
166    ///
167    /// Returns error if the task index already exists
168    pub async fn initialize_task(
169        &self,
170        metadata: TaskMetadata,
171    ) -> Result<(), BlsAggregationServiceError> {
172        let (tx, rx) = oneshot::channel();
173        self.msg_sender
174            .send(AggregationMessage::InitializeTask(metadata, tx))
175            .map_err(|_| BlsAggregationServiceError::SenderError)?;
176
177        rx.await
178            .map_err(|_| BlsAggregationServiceError::ReceiverError)?
179    }
180
181    /// Sends a message to the BLS Aggregator Service to process a signature.
182    ///
183    /// # Arguments
184    ///
185    /// * `task_signature` - The signed task response
186    ///
187    /// # Returns error:
188    ///
189    /// * `TaskNotFound` - If the task is not found
190    /// * `ChannelError` - If there is an error while sending the task through the channel
191    /// * `SignatureVerificationError` - If the signature verification fails
192    pub async fn process_signature(
193        &self,
194        task_signature: TaskSignature,
195    ) -> Result<(), BlsAggregationServiceError> {
196        let (tx, rx) = oneshot::channel();
197        self.msg_sender
198            .send(AggregationMessage::ProcessSignature(task_signature, tx))
199            .map_err(|_| BlsAggregationServiceError::SenderError)?;
200
201        rx.await
202            .map_err(|_| BlsAggregationServiceError::ReceiverError)?
203    }
204}
205
/// Receiver to receive the aggregated responses from the BLS Aggregator Service.
///
/// Each item is either a finished aggregation or the error that ended a task.
#[derive(Debug)]
pub struct AggregateReceiver {
    /// Channel to receive the aggregated responses from the BLS Aggregator Service
    aggregate_receiver:
        UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>,
}
213
214impl AggregateReceiver {
215    /// Receives the aggregated response from the BLS Aggregator Service.
216    ///
217    /// # Returns
218    ///
219    /// Returns the aggregated response or an error if the channel is closed.
220    pub async fn receive_aggregated_response(
221        &mut self,
222    ) -> Result<BlsAggregationServiceResponse, BlsAggregationServiceError> {
223        self.aggregate_receiver
224            .recv()
225            .await
226            .ok_or(BlsAggregationServiceError::ReceiverError)?
227    }
228}
229
/// The BLS Aggregator Service main struct
///
/// Generic over the AVS registry service it queries; the service is cloned for
/// every task that gets initialized.
#[derive(Debug)]
pub struct BlsAggregatorService<A: AvsRegistryService>
where
    A: Clone,
{
    /// Registry service used to fetch operator/quorum state and signature-check indices
    avs_registry_service: A,
}
238
/// Represents a signed task response digest
///
/// Internal message sent from the main loop to the aggregator of a single task.
#[derive(Debug)]
struct SignedTaskResponseDigest {
    /// Digest of the task response that was signed
    task_response_digest: TaskResponseDigest,

    /// BLS signature submitted for the digest
    bls_signature: Signature,

    /// Id of the operator that submitted the signature
    operator_id: FixedBytes<32>,

    /// One-shot channel on which the verification outcome is reported back
    result_channel: oneshot::Sender<Result<(), BlsAggregationServiceError>>,
}
250
251impl<A: AvsRegistryService + Send + Sync + Clone + 'static> BlsAggregatorService<A> {
252    /// Creates a new instance of the BlsAggregatorService with the given AVS registry service
253    ///
254    /// Creates a tokio unbounded_channel to send and received aggregated responses.
255    ///
256    /// # Arguments
257    ///
258    /// * `avs_registry_service` - The AVS registry service
259    pub fn new(avs_registry_service: A) -> Self {
260        Self {
261            avs_registry_service,
262        }
263    }
264
265    /// Starts the BLS Aggregator Service running the main loop in background.
266    ///
267    /// # Returns
268    ///
269    /// Returns a tuple with the [`ServiceHandle`] and [`AggregateReceiver`] to interact with the service
270    pub fn start(self) -> (ServiceHandle, AggregateReceiver) {
271        let (msg_tx, msg_rx) = mpsc::unbounded_channel();
272        let (agg_tx, agg_rx) = mpsc::unbounded_channel();
273
274        tokio::spawn(async move {
275            self.run(msg_rx, agg_tx).await;
276        });
277
278        // Create service handler and aggregate receiver to user can interact with the service
279        let service_handler = ServiceHandle { msg_sender: msg_tx };
280        let aggregate_receiver = AggregateReceiver {
281            aggregate_receiver: agg_rx,
282        };
283
284        (service_handler, aggregate_receiver)
285    }
286
287    /// Runs the main loop of the BLS Aggregator Service.
288    ///
289    /// This function continuously processes messages from `msg_receiver` and handles:
290    /// * [`InitializeTask`]: Initializes a new aggregation task
291    /// * [`ProcessSignature`]: Forwards a signature to the appropriate task aggregator and relays the verification result.
292    ///
293    /// The final aggregated response is sent through the `aggregate_sender` channel. In addition, each
294    /// message (both [`InitializeTask`] and [`ProcessSignature`]) uses its own channel to return specific errors or results.
295    ///
296    /// # Arguments
297    ///
298    /// * `msg_receiver` - The receiver channel to receive the valid messages
299    /// * `aggregate_sender` - The sender channel to send the aggregated responses
300    async fn run(
301        self,
302        mut msg_receiver: UnboundedReceiver<AggregationMessage>,
303        aggregate_sender: UnboundedSender<
304            Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
305        >,
306    ) {
307        let mut task_channels: HashMap<TaskIndex, UnboundedSender<SignedTaskResponseDigest>> =
308            HashMap::new();
309
310        while let Some(message) = msg_receiver.recv().await {
311            match message {
312                AggregationMessage::InitializeTask(metadata, result_sender) => {
313                    let task_index = metadata.task_index;
314                    if task_channels.contains_key(&task_index) {
315                        // Task already exists - return error
316                        result_sender
317                            .send(Err(BlsAggregationServiceError::DuplicateTaskIndex))
318                            .ok();
319                        continue;
320                    }
321
322                    // Create a new channel to receive the signed task responses
323                    let (signature_tx, signature_rx) =
324                        mpsc::unbounded_channel::<SignedTaskResponseDigest>();
325                    task_channels.insert(task_index, signature_tx);
326
327                    let avs_registry_service = self.avs_registry_service.clone();
328                    let aggregated_response_sender = aggregate_sender.clone();
329
330                    tokio::spawn(async move {
331                        let _ = Self::single_task_aggregator(
332                            avs_registry_service,
333                            metadata,
334                            aggregated_response_sender,
335                            signature_rx,
336                        )
337                        .await
338                        .inspect_err(|err| {
339                            println!("Error with single_task_aggregator: {:?}", err);
340                        });
341                    });
342
343                    let _ = result_sender.send(Ok(()));
344                }
345                AggregationMessage::ProcessSignature(task_signature, result_sender) => {
346                    if let Some(sig_sender) = task_channels.get_mut(&task_signature.task_index) {
347                        // Send the signed task response to the task aggregator
348                        let signed_digest = SignedTaskResponseDigest {
349                            task_response_digest: task_signature.task_response_digest,
350                            bls_signature: task_signature.bls_signature,
351                            operator_id: task_signature.operator_id,
352                            result_channel: result_sender,
353                        };
354
355                        if let Err(send_error) = sig_sender.send(signed_digest) {
356                            let _ = send_error
357                                .0
358                                .result_channel
359                                .send(Err(BlsAggregationServiceError::SenderError));
360                        }
361                    } else {
362                        result_sender
363                            .send(Err(BlsAggregationServiceError::TaskNotFound))
364                            .ok();
365                    }
366                }
367            }
368        }
369    }
370
371    /// Processes each signed task responses given a task_index for a single task.
372    ///
373    /// It reads the signed task responses from the receiver channel and aggregates them.
374    /// * If the quorum threshold is met, it sends the aggregated response to the aggregated response sender.
375    /// * If the time to expiry is reached, it sends a task expired error to the aggregated response sender.
376    /// * If the signature is incorrect, it sends an incorrect signature error to error channel.
377    ///
378    /// # Arguments
379    ///
380    /// * `metadata` - task metadata
381    /// * `aggregated_response_sender` - The sender channel for the aggregated responses
382    /// * `signatures_rx` - The receiver channel for the signed task responses
383    async fn single_task_aggregator(
384        avs_registry_service: A,
385        metadata: TaskMetadata,
386        aggregated_response_sender: UnboundedSender<
387            Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
388        >,
389        signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
390    ) -> Result<(), BlsAggregationServiceError> {
391        let quorum_threshold_percentage_map: HashMap<u8, u8> = metadata
392            .quorum_numbers
393            .iter()
394            .enumerate()
395            .map(|(i, quorum_number)| (*quorum_number, metadata.quorum_threshold_percentages[i]))
396            .collect();
397        let operator_state_avs = avs_registry_service
398            .get_operators_avs_state_at_block(metadata.task_created_block, &metadata.quorum_numbers)
399            .await
400            .map_err(|_| BlsAggregationServiceError::RegistryError)?;
401        let quorums_avs_state = avs_registry_service
402            .get_quorums_avs_state_at_block(&metadata.quorum_numbers, metadata.task_created_block)
403            .await
404            .map_err(|_| BlsAggregationServiceError::RegistryError)?;
405        let total_stake_per_quorum: HashMap<_, _> = quorums_avs_state
406            .iter()
407            .map(|(k, v)| (*k, v.total_stake))
408            .collect();
409
410        let quorum_apks_g1: Vec<BlsG1Point> = metadata
411            .quorum_numbers
412            .iter()
413            .filter_map(|quorum_num| quorums_avs_state.get(quorum_num))
414            .map(|avs_state| avs_state.agg_pub_key_g1.clone())
415            .collect();
416
417        Self::loop_task_aggregator(
418            avs_registry_service,
419            metadata.task_index,
420            metadata.task_created_block,
421            metadata.time_to_expiry,
422            aggregated_response_sender,
423            signatures_rx,
424            operator_state_avs,
425            total_stake_per_quorum,
426            quorum_threshold_percentage_map,
427            quorum_apks_g1,
428            metadata.quorum_numbers,
429            metadata.window_duration,
430        )
431        .await
432    }
433
    /// Drives the aggregation of a single task until it completes or fails.
    ///
    /// On every iteration the loop waits for whichever of these happens first:
    /// * the expiry timer (armed once with `time_to_expiry`) fires,
    /// * a message arrives on the window channel,
    /// * a message arrives on `signatures_rx` (or that channel closes).
    ///
    /// The first two events report to `aggregated_response_sender` and end the loop;
    /// signatures are passed to `handle_new_signature` and the loop keeps going unless
    /// that call returns an error, which is propagated to the caller.
    ///
    /// # Arguments
    ///
    /// * `avs_registry_service` - The avs registry service.
    /// * `task_index` - The task index.
    /// * `task_created_block` - The block in which the task was created.
    /// * `time_to_expiry` - Time until the task expires.
    /// * `aggregated_response_sender` - The sender channel for the aggregated responses.
    /// * `signatures_rx` - The receiver channel for the signed task responses.
    /// * `operator_state_avs` - The operator state per operator id.
    /// * `total_stake_per_quorum` - The total stake per quorum.
    /// * `quorum_threshold_percentage_map` - The threshold percentage per quorum.
    /// * `quorum_apks_g1` - The quorum aggregated public keys.
    /// * `quorum_nums` - The quorum numbers.
    /// * `window_duration` - The duration of the window opened once the thresholds are met.
    // The arguments mirror the per-task state computed by `single_task_aggregator`.
    #[allow(clippy::too_many_arguments)]
    async fn loop_task_aggregator(
        avs_registry_service: A,
        task_index: TaskIndex,
        task_created_block: u64,
        time_to_expiry: Duration,
        aggregated_response_sender: UnboundedSender<
            Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
        >,
        mut signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
        operator_state_avs: HashMap<FixedBytes<32>, OperatorAvsState>,
        total_stake_per_quorum: HashMap<u8, Uint<256, 4>>,
        quorum_threshold_percentage_map: HashMap<u8, u8>,
        quorum_apks_g1: Vec<BlsG1Point>,
        quorum_nums: Vec<u8>,
        window_duration: Duration,
    ) -> Result<(), BlsAggregationServiceError> {
        // Aggregation state, keyed by task response digest
        let mut aggregated_operators: HashMap<FixedBytes<32>, AggregatedOperators> = HashMap::new();
        // Set by `handle_new_signature` the first time the stake thresholds are met
        let mut open_window = false;
        // Latest response built after the thresholds were met, if any
        let mut current_aggregated_response: Option<BlsAggregationServiceResponse> = None;
        // `window_tx` is lent to `handle_new_signature`; a message on `window_rx`
        // is treated below as "the window is finished"
        let (window_tx, mut window_rx) = tokio::sync::mpsc::unbounded_channel::<bool>();
        // Created once and pinned so the same timer is polled across iterations
        let task_expired_timer = tokio::time::sleep(time_to_expiry);
        tokio::pin!(task_expired_timer);

        loop {
            tokio::select! {
                _ = &mut task_expired_timer => {
                    // If the task is expired, send the aggregated response
                    Self::handle_task_expired(
                        &aggregated_response_sender,
                        task_index,
                        open_window,
                        &current_aggregated_response,
                    )?;
                    return Ok(());
                },
                _ = window_rx.recv() => {
                    // If the window is finished, send the aggregated response
                    Self::handle_window_finished(
                        &aggregated_response_sender,
                        task_index,
                        &current_aggregated_response,
                    )?;
                    return Ok(());
                },
                signed_task_digest = signatures_rx.recv() => {
                    // If a new signature is received, handle it
                    // (`None` here means the signatures channel was closed)
                    Self::handle_new_signature(
                        &avs_registry_service,
                        &mut aggregated_operators,
                        &mut open_window,
                        &mut current_aggregated_response,
                        &window_tx,
                        task_index,
                        task_created_block,
                        &operator_state_avs,
                        &total_stake_per_quorum,
                        &quorum_threshold_percentage_map,
                        &quorum_apks_g1,
                        &quorum_nums,
                        window_duration,
                        signed_task_digest,
                    ).await?;
                }
            }
        }
    }
501
502    /// Handles a new signature in the [`loop_task_aggregator`] function.
503    ///
504    /// # Arguments
505    ///
506    /// * `avs_registry_service` - The avs registry service.
507    /// * `aggregated_operators` - The aggregated operators.
508    /// * `open_window` - Whether the window is open.
509    /// * `current_aggregated_response` - The current aggregated response.
510    /// * `window_tx` - The window tx.
511    /// * `task_index` - The task index.
512    /// * `task_created_block` - The task created block.
513    /// * `operator_state_avs` - The operator state avs.
514    /// * `total_stake_per_quorum` - The total stake per quorum.
515    /// * `quorum_threshold_percentage_map` - The quorum threshold percentage map.
516    /// * `quorum_apks_g1` - The quorum apks g1.
517    /// * `quorum_nums` - The quorum numbers.
518    /// * `window_duration` - The window duration.
519    /// * `signed_task_digest` - The signed task digest.
520    #[allow(clippy::too_many_arguments)]
521    #[instrument(skip_all)]
522    async fn handle_new_signature(
523        avs_registry_service: &A,
524        aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
525        open_window: &mut bool,
526        current_aggregated_response: &mut Option<BlsAggregationServiceResponse>,
527        window_tx: &UnboundedSender<bool>,
528        task_index: TaskIndex,
529        task_created_block: u64,
530        operator_state_avs: &HashMap<FixedBytes<32>, OperatorAvsState>,
531        total_stake_per_quorum: &HashMap<u8, Uint<256, 4>>,
532        quorum_threshold_percentage_map: &HashMap<u8, u8>,
533        quorum_apks_g1: &[BlsG1Point],
534        quorum_nums: &[u8],
535        window_duration: Duration,
536        signed_task_digest: Option<SignedTaskResponseDigest>,
537    ) -> Result<(), BlsAggregationServiceError> {
538        debug!("New signature received for task index: {task_index}");
539
540        let signed_digest =
541            signed_task_digest.ok_or(BlsAggregationServiceError::SignaturesChannelClosed)?;
542
543        // Verify if the operator has already signed for this digest
544        if Self::is_duplicate_signature(aggregated_operators, &signed_digest) {
545            signed_digest
546                .result_channel
547                .send(Err(BlsAggregationServiceError::SignatureVerificationError(
548                    SignatureVerificationError::DuplicateSignature,
549                )))
550                .map_err(|_| BlsAggregationServiceError::SenderError)?;
551            return Ok(());
552        }
553
554        // Verify the signature
555        let verification_result = verify_signature(task_index, &signed_digest, operator_state_avs)
556            .await
557            .map_err(BlsAggregationServiceError::SignatureVerificationError);
558
559        let verification_has_error = verification_result.is_err();
560
561        // Send the verification result to the result channel
562        signed_digest
563            .result_channel
564            .send(verification_result)
565            .map_err(|_| BlsAggregationServiceError::SenderError)?;
566
567        // If the signature is incorrect, return
568        if verification_has_error {
569            return Ok(());
570        }
571
572        let operator_state = operator_state_avs.get(&signed_digest.operator_id).unwrap();
573
574        // Update the aggregated operators with the new operator info
575        let updated_aggregated = update_aggregated_operators(
576            aggregated_operators,
577            operator_state,
578            signed_digest.task_response_digest,
579            signed_digest.bls_signature,
580            signed_digest.operator_id,
581        );
582        aggregated_operators.insert(
583            signed_digest.task_response_digest,
584            updated_aggregated.clone(),
585        );
586
587        // Check if the stake thresholds are met. If not, return
588        if !Self::check_if_stake_thresholds_met(
589            &updated_aggregated.signers_total_stake_per_quorum,
590            total_stake_per_quorum,
591            quorum_threshold_percentage_map,
592        ) {
593            return Ok(());
594        }
595
596        debug!("Signature threshold is met for task index: {task_index}");
597
598        // If the window is not open, open it
599        if !*open_window {
600            *open_window = true;
601            Self::start_window(window_tx, window_duration, task_index);
602        }
603
604        *current_aggregated_response = Some(
605            Self::build_aggregated_response(
606                task_index,
607                task_created_block,
608                signed_digest.task_response_digest,
609                operator_state_avs,
610                updated_aggregated,
611                avs_registry_service,
612                quorum_apks_g1,
613                quorum_nums,
614            )
615            .await?,
616        );
617
618        Ok(())
619    }
620
621    /// Handles when the task expired in the [`loop_task_aggregator`] function.
622    /// If the window is open, send the aggregated response. Else, send the error.
623    ///
624    /// # Arguments
625    ///
626    /// * `aggregated_response_sender` - The aggregated response sender.
627    /// * `task_index` - The task index.
628    /// * `open_window` - Whether the window is open.
629    /// * `current_aggregated_response` - The current aggregated response.
630    #[instrument(skip_all)]
631    fn handle_task_expired(
632        aggregated_response_sender: &UnboundedSender<
633            Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
634        >,
635        task_index: TaskIndex,
636        open_window: bool,
637        current_aggregated_response: &Option<BlsAggregationServiceResponse>,
638    ) -> Result<(), BlsAggregationServiceError> {
639        if open_window {
640            debug!("task_expired_timer while in the waiting window for task index: {task_index}",);
641            aggregated_response_sender
642                .send(Ok(current_aggregated_response.clone().unwrap()))
643                .map_err(|_| BlsAggregationServiceError::SenderError)?;
644        } else {
645            debug!("task_expired_timer NOT in the waiting window for task index: {task_index}",);
646
647            let _ = aggregated_response_sender.send(Err(BlsAggregationServiceError::TaskExpired));
648        }
649        Ok(())
650    }
651
652    /// Handles when the window is finished in the [`loop_task_aggregator`] function.
653    ///
654    ///
655    /// # Arguments
656    ///
657    /// * `aggregated_response_sender` - The aggregated response sender.
658    /// * `task_index` - The task index.
659    /// * `current_aggregated_response` - The current aggregated response.
660    #[instrument(skip_all)]
661    fn handle_window_finished(
662        aggregated_response_sender: &UnboundedSender<
663            Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
664        >,
665        task_index: TaskIndex,
666        current_aggregated_response: &Option<BlsAggregationServiceResponse>,
667    ) -> Result<(), BlsAggregationServiceError> {
668        debug!("Window finished. Send aggregated response for task index: {task_index}");
669
670        aggregated_response_sender
671            .send(Ok(current_aggregated_response.clone().unwrap()))
672            .map_err(|_| BlsAggregationServiceError::SenderError)?;
673        Ok(())
674    }
675
676    /// Builds the aggregated response containing all the aggregation info.
677    ///
678    /// # Arguments
679    ///
680    /// * `task_index` - The index of the task.
681    /// * `task_created_block` - The block in which the task was created.
682    /// * `signed_task_digest` - The signed task.
683    /// * `operator_state_avs` - A hashmap with the operator state per operator id.
684    /// * `digest_aggregated_operators` - The aggregated operators.
685    /// * `avs_registry_service` - The avs registry service.
686    /// * `quorum_apks_g1` - The quorum aggregated public keys.
687    /// * `quorum_nums` - The quorum numbers.
688    ///
689    /// # Returns
690    ///
691    /// The BLS aggregation service response.
692    #[allow(clippy::too_many_arguments)]
693    #[instrument(skip_all)]
694    async fn build_aggregated_response(
695        task_index: TaskIndex,
696        task_created_block: u64,
697        task_response_digest: FixedBytes<32>,
698        operator_state_avs: &HashMap<FixedBytes<32>, OperatorAvsState>,
699        digest_aggregated_operators: AggregatedOperators,
700        avs_registry_service: &A,
701        quorum_apks_g1: &[BlsG1Point],
702        quorum_nums: &[u8],
703    ) -> Result<BlsAggregationServiceResponse, BlsAggregationServiceError> {
704        debug!("Build aggregated response for task index: {task_index}");
705
706        let mut non_signers_operators_ids: Vec<FixedBytes<32>> = operator_state_avs
707            .keys()
708            .filter(|operator_id| {
709                !digest_aggregated_operators
710                    .signers_operator_ids_set
711                    .contains_key(*operator_id)
712            })
713            .cloned()
714            .collect();
715
716        non_signers_operators_ids.sort();
717
718        let non_signers_pub_keys_g1: Vec<BlsG1Point> = non_signers_operators_ids
719            .iter()
720            .filter_map(|operator_id| operator_state_avs.get(operator_id))
721            .filter_map(|operator_avs_state| operator_avs_state.operator_info.pub_keys.clone())
722            .map(|pub_keys| pub_keys.g1_pub_key)
723            .collect();
724
725        let indices = avs_registry_service
726            .get_check_signatures_indices(
727                task_created_block,
728                quorum_nums.into(),
729                non_signers_operators_ids,
730            )
731            .await
732            .map_err(|_err| BlsAggregationServiceError::RegistryError)?;
733
734        Ok(BlsAggregationServiceResponse {
735            task_index,
736            task_response_digest,
737            non_signers_pub_keys_g1,
738            quorum_apks_g1: quorum_apks_g1.into(),
739            signers_apk_g2: digest_aggregated_operators.signers_apk_g2,
740            signers_agg_sig_g1: digest_aggregated_operators.signers_agg_sig_g1,
741            non_signer_quorum_bitmap_indices: indices.clone().nonSignerQuorumBitmapIndices,
742            quorum_apk_indices: indices.quorumApkIndices,
743            total_stake_indices: indices.totalStakeIndices,
744            non_signer_stake_indices: indices.nonSignerStakeIndices,
745        })
746    }
747
748    /// Checks if the stake thresholds are met for the given set of quorum members.
749    ///
750    /// # Arguments
751    ///
752    /// * `signed_stake_per_quorum` - The signed stake per quorum.
753    /// * `total_stake_per_quorum` - The total stake per quorum.
754    /// * `quorum_threshold_percentages_map` - The quorum threshold percentages map,
755    ///   containing the quorum id as a key and its corresponding quorum threshold percentage.
756    ///
757    /// # Returns
758    ///
759    /// Returns `true` if the stake thresholds are met for all the members, otherwise `false`.
760    fn check_if_stake_thresholds_met(
761        signed_stake_per_quorum: &HashMap<u8, U256>,
762        total_stake_per_quorum: &HashMap<u8, U256>,
763        quorum_threshold_percentages_map: &HashMap<u8, QuorumThresholdPercentage>,
764    ) -> bool {
765        for (quorum_num, quorum_threshold_percentage) in quorum_threshold_percentages_map {
766            let (Some(signed_stake_by_quorum), Some(total_stake_by_quorum)) = (
767                signed_stake_per_quorum.get(quorum_num),
768                total_stake_per_quorum.get(quorum_num),
769            ) else {
770                return false;
771            };
772
773            let signed_stake = signed_stake_by_quorum * U256::from(100);
774            let threshold_stake = *total_stake_by_quorum * U256::from(*quorum_threshold_percentage);
775
776            if signed_stake < threshold_stake {
777                return false;
778            }
779        }
780        true
781    }
782
783    /// Checks if the signature is a duplicate.
784    ///
785    /// # Arguments
786    ///
787    /// * `aggregated_operators` - The aggregated operators.
788    /// * `signed_digest` - The signed task response digest.
789    ///
790    /// # Returns
791    ///
792    /// Returns `true` if the signature is a duplicate, otherwise `false`.
793    fn is_duplicate_signature(
794        aggregated_operators: &HashMap<FixedBytes<32>, AggregatedOperators>,
795        signed_digest: &SignedTaskResponseDigest,
796    ) -> bool {
797        aggregated_operators
798            .get(&signed_digest.task_response_digest)
799            .map(|ops| {
800                ops.signers_operator_ids_set
801                    .contains_key(&signed_digest.operator_id)
802            })
803            .unwrap_or(false)
804    }
805
806    /// Starts the window to wait for new signatures.
807    ///
808    /// # Arguments
809    ///
810    /// * `window_tx` - The unbounded sender to send the window signal.
811    /// * `window_duration` - The duration of the window.
812    /// * `task_index` - The task index.
813    #[instrument(skip_all)]
814    fn start_window(
815        window_tx: &UnboundedSender<bool>,
816        window_duration: Duration,
817        task_index: TaskIndex,
818    ) {
819        let sender = window_tx.clone();
820        debug!("Create window to wait for new signatures for task index: {task_index}");
821        tokio::spawn(async move {
822            tokio::time::sleep(window_duration).await;
823            let _ = sender.send(true);
824        });
825    }
826}
827
828/// Verifies the signature of the task response given a `operator_avs_state`.
829/// If the signature is correct, it returns `Ok(())`, otherwise it returns an error.
830///
831/// # Arguments
832///
833/// * `task_index` - The index of the task
834/// * `signed_task_response_digest` - The signed task response digest
835/// * `operator_avs_state` - A hashmap containing the staked of all the operator indexed by operator_id.
836///   This is used to get the `operator_state` to obtain the operator public key.
837///
838/// # Error
839///
840/// Returns error:
841/// - `SignatureVerificationError::OperatorNotFound` if the operator is not found,
842/// - `SignatureVerificationError::OperatorPublicKeyNotFound` if the operator public key is not found,
843/// - `SignatureVerificationError::IncorrectSignature` if the signature is incorrect.
844#[instrument(skip_all)]
845async fn verify_signature(
846    task_index: TaskIndex,
847    signed_task_response_digest: &SignedTaskResponseDigest,
848    operator_avs_state: &HashMap<FixedBytes<32>, OperatorAvsState>,
849) -> Result<(), SignatureVerificationError> {
850    let Some(operator_state) = operator_avs_state.get(&signed_task_response_digest.operator_id)
851    else {
852        error!("Operator Not Found for task index: {task_index}");
853        return Err(SignatureVerificationError::OperatorNotFound);
854    };
855
856    let Some(pub_keys) = &operator_state.operator_info.pub_keys else {
857        error!("Operator Public Key Not Found for task index: {task_index}");
858        return Err(SignatureVerificationError::OperatorPublicKeyNotFound);
859    };
860
861    let message = signed_task_response_digest
862        .task_response_digest
863        .as_slice()
864        .try_into()
865        .map_err(|_| SignatureVerificationError::IncorrectSignature)?;
866
867    verify_message(
868        pub_keys.g2_pub_key.g2(),
869        message,
870        signed_task_response_digest.bls_signature.g1_point().g1(),
871    )
872    .then_some(())
873    .ok_or(SignatureVerificationError::IncorrectSignature)
874    .inspect(|_| {
875        debug!("Signature verification successful for task index: {task_index}");
876    })
877    .inspect_err(|_| {
878        error!("Signature verification failed for task index: {task_index}");
879    })
880}
881
882/// Updates the aggregated operators with the new operator info.
883///
884/// # Arguments
885///
886/// * `aggregated_operators` - The aggregated operators.
887/// * `operator_state` - The operator state.
888/// * `task_response_digest` - The task response digest.
889/// * `bls_signature` - The BLS signature.
890/// * `operator_id` - The operator id.
891///
892/// # Returns
893///
894/// The updated aggregated operators.
895fn update_aggregated_operators(
896    aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
897    operator_state: &OperatorAvsState,
898    task_response_digest: FixedBytes<32>,
899    bls_signature: Signature,
900    operator_id: FixedBytes<32>,
901) -> AggregatedOperators {
902    debug!("Update aggregated operators");
903
904    let bls_signature_g1_point = bls_signature.g1_point().g1();
905
906    if let Some(existing) = aggregated_operators.get_mut(&task_response_digest) {
907        // If the operator is already in the aggregated operators, aggregate the new operator
908        let updated = aggregate_new_operator(
909            existing,
910            operator_state.clone(),
911            operator_id,
912            bls_signature_g1_point,
913        );
914        updated.clone()
915    } else {
916        // If the operator is not in the aggregated operators, create a new aggregated operator
917        let operator_g2_pubkey = operator_state
918            .operator_info
919            .pub_keys
920            .clone()
921            .unwrap()
922            .g2_pub_key
923            .g2();
924        let mut signers_apk_g2 = BlsG2Point::new(G2Affine::zero());
925        let mut signers_agg_sig_g1 = Signature::new(G1Affine::zero());
926        for _ in 0..operator_state.stake_per_quorum.len() {
927            signers_apk_g2 = BlsG2Point::new((signers_apk_g2.g2() + operator_g2_pubkey).into());
928            signers_agg_sig_g1 = Signature::new(
929                (signers_agg_sig_g1.g1_point().g1() + bls_signature_g1_point).into(),
930            );
931        }
932        AggregatedOperators {
933            signers_apk_g2,
934            signers_agg_sig_g1,
935            signers_operator_ids_set: HashMap::from([(operator_state.operator_id, true)]),
936            signers_total_stake_per_quorum: operator_state.stake_per_quorum.clone(),
937        }
938    }
939}
940
941/// Adds a new operator to the aggregated operators by aggregating its public key, signature and stake.
942///
943/// # Arguments
944///
945/// - `aggregated_operators` - Contains the information of all the aggregated operators.
946/// - `operator_state` - The state of the operator, contains information about its stake.
947/// - `signed_task_digest` - Contains the id and signature of the new operator.
948///
949/// # Returns
950///
951/// The given aggregated operators, aggregated with the new operator info.
952#[instrument(skip_all)]
953fn aggregate_new_operator(
954    aggregated_operators: &mut AggregatedOperators,
955    operator_state: OperatorAvsState,
956    operator_id: FixedBytes<32>,
957    signature_g1_point: G1Affine,
958) -> &mut AggregatedOperators {
959    let operator_g2_pubkey = operator_state
960        .operator_info
961        .pub_keys
962        .clone()
963        .unwrap()
964        .g2_pub_key
965        .g2();
966    aggregated_operators
967        .signers_operator_ids_set
968        .insert(operator_id, true);
969
970    debug!("operator {operator_id} inserted in signers_operator_ids_set");
971
972    for (quorum_num, stake) in operator_state.stake_per_quorum.iter() {
973        // For each quorum the operator has stake in, we aggregate the signature and update the stake
974        aggregated_operators.signers_agg_sig_g1 = Signature::new(
975            (aggregated_operators.signers_agg_sig_g1.g1_point().g1() + signature_g1_point).into(),
976        );
977        aggregated_operators.signers_apk_g2 =
978            BlsG2Point::new((aggregated_operators.signers_apk_g2.g2() + operator_g2_pubkey).into());
979        aggregated_operators
980            .signers_total_stake_per_quorum
981            .entry(*quorum_num)
982            .and_modify(|v| *v += stake)
983            .or_insert(*stake);
984    }
985    aggregated_operators
986}
987
988#[cfg(test)]
989mod tests {
990    use super::{BlsAggregationServiceError, BlsAggregationServiceResponse, BlsAggregatorService};
991    use crate::bls_agg::{TaskMetadata, TaskSignature};
992    use alloy::primitives::{B256, U256};
993    use eigen_crypto_bls::{BlsG1Point, BlsG2Point, BlsKeyPair, Signature};
994
995    use eigen_services_avsregistry::fake_avs_registry_service::FakeAvsRegistryService;
996    use eigen_types::avs::SignatureVerificationError::{DuplicateSignature, IncorrectSignature};
997    use eigen_types::operator::{QuorumNum, QuorumThresholdPercentages};
998    use eigen_types::{avs::TaskIndex, test::TestOperator};
999    use sha2::{Digest, Sha256};
1000    use std::collections::HashMap;
1001    use std::time::Duration;
1002    use std::vec;
1003    use tokio::time::{sleep, Instant};
1004
    // Fixed private keys, written as decimal strings, that the tests pass to
    // `BlsKeyPair::new` to build each test operator's BLS key pair.
    const PRIVATE_KEY_1: &str =
        "13710126902690889134622698668747132666439281256983827313388062967626731803599";
    const PRIVATE_KEY_2: &str =
        "14610126902690889134622698668747132666439281256983827313388062967626731803500";
    const PRIVATE_KEY_3: &str =
        "15610126902690889134622698668747132666439281256983827313388062967626731803501";
1011
1012    fn hash(task_response: u64) -> B256 {
1013        let mut hasher = Sha256::new();
1014        hasher.update(task_response.to_be_bytes());
1015        B256::from_slice(hasher.finalize().as_ref())
1016    }
1017
1018    fn aggregate_g1_public_keys(operators: &[TestOperator]) -> BlsG1Point {
1019        operators
1020            .iter()
1021            .map(|op| op.bls_keypair.public_key().g1())
1022            .reduce(|a, b| (a + b).into())
1023            .map(BlsG1Point::new)
1024            .unwrap()
1025    }
1026
1027    fn aggregate_g2_public_keys(operators: &[TestOperator]) -> BlsG2Point {
1028        operators
1029            .iter()
1030            .map(|op| op.bls_keypair.public_key_g2().g2())
1031            .reduce(|a, b| (a + b).into())
1032            .map(BlsG2Point::new)
1033            .unwrap()
1034    }
1035
1036    fn aggregate_g1_signatures(signatures: &[Signature]) -> Signature {
1037        let agg = signatures
1038            .iter()
1039            .map(|s| s.g1_point().g1())
1040            .reduce(|a, b| (a + b).into())
1041            .unwrap();
1042        Signature::new(agg)
1043    }
1044
1045    #[tokio::test]
1046    async fn test_1_quorum_1_operator_1_correct_signature() {
1047        let test_operator_1 = TestOperator {
1048            operator_id: U256::from(1).into(),
1049            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1050            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1051        };
1052
1053        let block_number = 1;
1054        let task_index: TaskIndex = 0;
1055        let quorum_numbers = vec![0];
1056        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
1057        let time_to_expiry = Duration::from_secs(1);
1058        let task_response = 123; // Initialize with appropriate data
1059
1060        let task_response_digest = hash(task_response);
1061        let bls_signature = test_operator_1
1062            .bls_keypair
1063            .sign_message(task_response_digest.as_ref());
1064        let fake_avs_registry_service =
1065            FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
1066        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1067        let metadata = TaskMetadata::new(
1068            task_index,
1069            block_number,
1070            quorum_numbers,
1071            quorum_threshold_percentages,
1072            time_to_expiry,
1073        );
1074        let (handle, mut aggregator_response) = bls_agg_service.start();
1075        handle.initialize_task(metadata).await.unwrap();
1076
1077        handle
1078            .process_signature(TaskSignature::new(
1079                task_index,
1080                task_response_digest,
1081                bls_signature,
1082                test_operator_1.operator_id,
1083            ))
1084            .await
1085            .unwrap();
1086
1087        let expected_agg_service_response = BlsAggregationServiceResponse {
1088            task_index,
1089            task_response_digest,
1090            non_signers_pub_keys_g1: vec![],
1091            quorum_apks_g1: vec![test_operator_1.bls_keypair.public_key()],
1092            signers_apk_g2: test_operator_1.bls_keypair.public_key_g2(),
1093            signers_agg_sig_g1: test_operator_1
1094                .bls_keypair
1095                .sign_message(task_response_digest.as_ref()),
1096            non_signer_quorum_bitmap_indices: vec![],
1097            quorum_apk_indices: vec![],
1098            total_stake_indices: vec![],
1099            non_signer_stake_indices: vec![],
1100        };
1101
1102        let response = aggregator_response.receive_aggregated_response().await;
1103
1104        assert_eq!(expected_agg_service_response, response.clone().unwrap());
1105        assert_eq!(task_index, response.unwrap().task_index);
1106    }
1107
1108    #[tokio::test]
1109    async fn test_1_quorum_2_operator_2_duplicated_signatures() {
1110        let test_operator_1 = TestOperator {
1111            operator_id: U256::from(1).into(),
1112            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1113            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1114        };
1115        let test_operator_2 = TestOperator {
1116            operator_id: U256::from(2).into(),
1117            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1118            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1119        };
1120        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1121        let block_number = 1;
1122        let task_index: TaskIndex = 0;
1123        let quorum_numbers = vec![0];
1124        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
1125        let time_to_expiry = Duration::from_secs(1);
1126        let task_response = 123; // Initialize with appropriate data
1127        let task_response_digest = hash(task_response);
1128
1129        let fake_avs_registry_service =
1130            FakeAvsRegistryService::new(block_number, test_operators.clone());
1131        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1132        let metadata = TaskMetadata::new(
1133            task_index,
1134            block_number,
1135            quorum_numbers,
1136            quorum_threshold_percentages,
1137            time_to_expiry,
1138        );
1139        let (handle, mut aggregator_response) = bls_agg_service.start();
1140        handle.initialize_task(metadata).await.unwrap();
1141        let bls_signature_1 = test_operator_1
1142            .bls_keypair
1143            .sign_message(task_response_digest.as_ref());
1144        handle
1145            .process_signature(TaskSignature::new(
1146                task_index,
1147                task_response_digest,
1148                bls_signature_1.clone(),
1149                test_operator_1.operator_id,
1150            ))
1151            .await
1152            .unwrap();
1153
1154        let second_signature_processing_result = handle
1155            .process_signature(TaskSignature::new(
1156                task_index,
1157                task_response_digest,
1158                bls_signature_1.clone(),
1159                test_operator_1.operator_id,
1160            ))
1161            .await;
1162
1163        assert_eq!(
1164            second_signature_processing_result,
1165            Err(BlsAggregationServiceError::SignatureVerificationError(
1166                DuplicateSignature
1167            ))
1168        );
1169
1170        let bls_signature_2 = test_operator_2
1171            .bls_keypair
1172            .sign_message(task_response_digest.as_ref());
1173
1174        handle
1175            .process_signature(TaskSignature::new(
1176                task_index,
1177                task_response_digest,
1178                bls_signature_2.clone(),
1179                test_operator_2.operator_id,
1180            ))
1181            .await
1182            .unwrap();
1183
1184        let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1185        let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
1186        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_signature_1, bls_signature_2]);
1187        let expected_agg_service_response = BlsAggregationServiceResponse {
1188            task_index,
1189            task_response_digest,
1190            non_signers_pub_keys_g1: vec![],
1191            quorum_apks_g1: vec![quorum_apks_g1],
1192            signers_apk_g2,
1193            signers_agg_sig_g1,
1194            non_signer_quorum_bitmap_indices: vec![],
1195            quorum_apk_indices: vec![],
1196            total_stake_indices: vec![],
1197            non_signer_stake_indices: vec![],
1198        };
1199
1200        let response = aggregator_response.receive_aggregated_response().await;
1201
1202        assert_eq!(expected_agg_service_response, response.clone().unwrap());
1203        assert_eq!(task_index, response.unwrap().task_index);
1204    }
1205
1206    #[tokio::test]
1207    async fn test_1_quorum_3_operator_3_correct_signatures() {
1208        let test_operator_1 = TestOperator {
1209            operator_id: U256::from(1).into(),
1210            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1211            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1212        };
1213        let test_operator_2 = TestOperator {
1214            operator_id: U256::from(2).into(),
1215            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1216            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1217        };
1218        let test_operator_3 = TestOperator {
1219            operator_id: U256::from(3).into(),
1220            stake_per_quorum: HashMap::from([(0u8, U256::from(300)), (1u8, U256::from(100))]),
1221            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
1222        };
1223        let test_operators = vec![
1224            test_operator_1.clone(),
1225            test_operator_2.clone(),
1226            test_operator_3.clone(),
1227        ];
1228
1229        let block_number = 1;
1230        let task_index = 0;
1231        let quorum_numbers: Vec<QuorumNum> = vec![0];
1232        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8];
1233        let time_to_expiry = Duration::from_secs(1);
1234        let task_response = 123; // Initialize with appropriate data
1235        let task_response_digest = hash(task_response);
1236
1237        let fake_avs_registry_service =
1238            FakeAvsRegistryService::new(block_number, test_operators.clone());
1239        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1240
1241        let metadata = TaskMetadata::new(
1242            task_index,
1243            block_number,
1244            quorum_numbers,
1245            quorum_threshold_percentages,
1246            time_to_expiry,
1247        );
1248        let (handle, mut aggregator_response) = bls_agg_service.start();
1249        handle.initialize_task(metadata).await.unwrap();
1250
1251        let bls_sig_op_1 = test_operator_1
1252            .bls_keypair
1253            .sign_message(task_response_digest.as_ref());
1254        handle
1255            .process_signature(TaskSignature::new(
1256                task_index,
1257                task_response_digest,
1258                bls_sig_op_1.clone(),
1259                test_operator_1.operator_id,
1260            ))
1261            .await
1262            .unwrap();
1263
1264        let bls_sig_op_2 = test_operator_2
1265            .bls_keypair
1266            .sign_message(task_response_digest.as_ref());
1267        handle
1268            .process_signature(TaskSignature::new(
1269                task_index,
1270                task_response_digest,
1271                bls_sig_op_2.clone(),
1272                test_operator_2.operator_id,
1273            ))
1274            .await
1275            .unwrap();
1276
1277        let bls_sig_op_3 = test_operator_3
1278            .bls_keypair
1279            .sign_message(task_response_digest.as_ref());
1280        handle
1281            .process_signature(TaskSignature::new(
1282                task_index,
1283                task_response_digest,
1284                bls_sig_op_3.clone(),
1285                test_operator_3.operator_id,
1286            ))
1287            .await
1288            .unwrap();
1289
1290        let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1291        let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
1292        let signers_agg_sig_g1 =
1293            aggregate_g1_signatures(&vec![bls_sig_op_1, bls_sig_op_2, bls_sig_op_3]);
1294
1295        let expected_agg_service_response = BlsAggregationServiceResponse {
1296            task_index,
1297            task_response_digest,
1298            non_signers_pub_keys_g1: vec![],
1299            quorum_apks_g1: vec![quorum_apks_g1],
1300            signers_apk_g2,
1301            signers_agg_sig_g1,
1302            non_signer_quorum_bitmap_indices: vec![],
1303            quorum_apk_indices: vec![],
1304            total_stake_indices: vec![],
1305            non_signer_stake_indices: vec![],
1306        };
1307
1308        let response = aggregator_response.receive_aggregated_response().await;
1309        assert_eq!(expected_agg_service_response, response.clone().unwrap());
1310        assert_eq!(task_index, response.unwrap().task_index);
1311    }
1312
1313    #[tokio::test]
1314    async fn test_2_quorum_2_operator_2_correct_signatures() {
1315        let test_operator_1 = TestOperator {
1316            operator_id: U256::from(1).into(),
1317            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1318            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1319        };
1320        let test_operator_2 = TestOperator {
1321            operator_id: U256::from(2).into(),
1322            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1323            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1324        };
1325        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1326        let block_number = 1;
1327        let task_index = 0;
1328        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1329        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1330        let time_to_expiry = Duration::from_secs(1);
1331        let task_response = 123; // Initialize with appropriate data
1332        let task_response_digest = hash(task_response);
1333
1334        let fake_avs_registry_service =
1335            FakeAvsRegistryService::new(block_number, test_operators.clone());
1336        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1337
1338        let metadata = TaskMetadata::new(
1339            task_index,
1340            block_number,
1341            quorum_numbers,
1342            quorum_threshold_percentages,
1343            time_to_expiry,
1344        );
1345        let (handle, mut aggregator_response) = bls_agg_service.start();
1346        handle.initialize_task(metadata).await.unwrap();
1347
1348        let bls_sig_op_1 = test_operator_1
1349            .bls_keypair
1350            .sign_message(task_response_digest.as_ref());
1351        handle
1352            .process_signature(TaskSignature::new(
1353                task_index,
1354                task_response_digest,
1355                bls_sig_op_1.clone(),
1356                test_operator_1.operator_id,
1357            ))
1358            .await
1359            .unwrap();
1360
1361        let bls_sig_op_2 = test_operator_2
1362            .bls_keypair
1363            .sign_message(task_response_digest.as_ref());
1364        handle
1365            .process_signature(TaskSignature::new(
1366                task_index,
1367                task_response_digest,
1368                bls_sig_op_2.clone(),
1369                test_operator_2.operator_id,
1370            ))
1371            .await
1372            .unwrap();
1373
1374        let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1375        let signers_apk_g2 =
1376            aggregate_g2_public_keys(&[test_operators.clone(), test_operators].concat());
1377        let signers_agg_sig_g1 = aggregate_g1_signatures(&[
1378            bls_sig_op_1.clone(),
1379            bls_sig_op_1,
1380            bls_sig_op_2.clone(),
1381            bls_sig_op_2,
1382        ]);
1383
1384        let expected_agg_service_response = BlsAggregationServiceResponse {
1385            task_index,
1386            task_response_digest,
1387            non_signers_pub_keys_g1: vec![],
1388            quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1],
1389            signers_apk_g2,
1390            signers_agg_sig_g1,
1391            non_signer_quorum_bitmap_indices: vec![],
1392            quorum_apk_indices: vec![],
1393            total_stake_indices: vec![],
1394            non_signer_stake_indices: vec![],
1395        };
1396
1397        let response = aggregator_response.receive_aggregated_response().await;
1398
1399        assert_eq!(expected_agg_service_response, response.unwrap());
1400    }
1401
1402    #[tokio::test]
1403    async fn test_2_concurrent_tasks_2_quorum_2_operator_2_correct_signatures() {
1404        let test_operator_1 = TestOperator {
1405            operator_id: U256::from(1).into(),
1406            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1407            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1408        };
1409        let test_operator_2 = TestOperator {
1410            operator_id: U256::from(2).into(),
1411            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1412            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1413        };
1414        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1415        let block_number = 1;
1416        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1417        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1418        let time_to_expiry = Duration::from_secs(1);
1419
1420        let fake_avs_registry_service =
1421            FakeAvsRegistryService::new(block_number, test_operators.clone());
1422        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1423
1424        // initialize 2 concurrent tasks
1425        let task_1_index = 1;
1426        let task_1_response = 123; // Initialize with appropriate data
1427        let task_1_response_digest = hash(task_1_response);
1428        let metadata1 = TaskMetadata::new(
1429            task_1_index,
1430            block_number,
1431            quorum_numbers.clone(),
1432            quorum_threshold_percentages.clone(),
1433            time_to_expiry,
1434        );
1435        let (handle, mut aggregator_response) = bls_agg_service.start();
1436        handle.initialize_task(metadata1).await.unwrap();
1437
1438        let task_2_index = 2;
1439        let task_2_response = 234; // Initialize with appropriate data
1440        let task_2_response_digest = hash(task_2_response);
1441        let metadata2 = TaskMetadata::new(
1442            task_2_index,
1443            block_number,
1444            quorum_numbers,
1445            quorum_threshold_percentages,
1446            time_to_expiry,
1447        );
1448        handle.initialize_task(metadata2).await.unwrap();
1449
1450        let bls_sig_task_1_op_1 = test_operator_1
1451            .bls_keypair
1452            .sign_message(task_1_response_digest.as_ref());
1453        handle
1454            .process_signature(TaskSignature::new(
1455                task_1_index,
1456                task_1_response_digest,
1457                bls_sig_task_1_op_1.clone(),
1458                test_operator_1.operator_id,
1459            ))
1460            .await
1461            .unwrap();
1462
1463        let bls_sig_task_1_op_2 = test_operator_2
1464            .bls_keypair
1465            .sign_message(task_1_response_digest.as_ref());
1466        handle
1467            .process_signature(TaskSignature::new(
1468                task_1_index,
1469                task_1_response_digest,
1470                bls_sig_task_1_op_2.clone(),
1471                test_operator_2.operator_id,
1472            ))
1473            .await
1474            .unwrap();
1475
1476        let bls_sig_task_2_op_1 = test_operator_1
1477            .bls_keypair
1478            .sign_message(task_2_response_digest.as_ref());
1479        handle
1480            .process_signature(TaskSignature::new(
1481                task_2_index,
1482                task_2_response_digest,
1483                bls_sig_task_2_op_1.clone(),
1484                test_operator_1.operator_id,
1485            ))
1486            .await
1487            .unwrap();
1488
1489        let bls_sig_task_2_op_2 = test_operator_2
1490            .bls_keypair
1491            .sign_message(task_2_response_digest.as_ref());
1492        handle
1493            .process_signature(TaskSignature::new(
1494                task_2_index,
1495                task_2_response_digest,
1496                bls_sig_task_2_op_2.clone(),
1497                test_operator_2.operator_id,
1498            ))
1499            .await
1500            .unwrap();
1501
1502        let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1503        let signers_apk_g2 =
1504            aggregate_g2_public_keys(&[test_operators.clone(), test_operators].concat());
1505        let signers_agg_sig_g1_task_1 = aggregate_g1_signatures(&[
1506            bls_sig_task_1_op_1.clone(),
1507            bls_sig_task_1_op_1,
1508            bls_sig_task_1_op_2.clone(),
1509            bls_sig_task_1_op_2,
1510        ]);
1511
1512        let expected_response_task_1 = BlsAggregationServiceResponse {
1513            task_index: task_1_index,
1514            task_response_digest: task_1_response_digest,
1515            non_signers_pub_keys_g1: vec![],
1516            quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1.clone()],
1517            signers_apk_g2: signers_apk_g2.clone(),
1518            signers_agg_sig_g1: signers_agg_sig_g1_task_1,
1519            non_signer_quorum_bitmap_indices: vec![],
1520            quorum_apk_indices: vec![],
1521            total_stake_indices: vec![],
1522            non_signer_stake_indices: vec![],
1523        };
1524
1525        let signers_agg_sig_g1_task_2 = aggregate_g1_signatures(&[
1526            bls_sig_task_2_op_1.clone(),
1527            bls_sig_task_2_op_1,
1528            bls_sig_task_2_op_2.clone(),
1529            bls_sig_task_2_op_2,
1530        ]);
1531
1532        let expected_response_task_2 = BlsAggregationServiceResponse {
1533            task_index: task_2_index,
1534            task_response_digest: task_2_response_digest,
1535            non_signers_pub_keys_g1: vec![],
1536            quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1.clone()],
1537            signers_apk_g2,
1538            signers_agg_sig_g1: signers_agg_sig_g1_task_2,
1539            non_signer_quorum_bitmap_indices: vec![],
1540            quorum_apk_indices: vec![],
1541            total_stake_indices: vec![],
1542            non_signer_stake_indices: vec![],
1543        };
1544
1545        let first_response = aggregator_response.receive_aggregated_response().await;
1546        let second_response = aggregator_response.receive_aggregated_response().await;
1547
1548        let (task_1_response, task_2_response) = if first_response.clone().unwrap().task_index == 1
1549        {
1550            (first_response, second_response)
1551        } else {
1552            (second_response, first_response)
1553        };
1554
1555        assert_eq!(expected_response_task_1, task_1_response.unwrap());
1556        assert_eq!(expected_response_task_2, task_2_response.unwrap());
1557    }
1558
1559    #[tokio::test]
1560    async fn test_1_quorum_1_operator_0_signatures_task_expired() {
1561        let test_operator_1 = TestOperator {
1562            operator_id: U256::from(1).into(),
1563            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1564            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1565        };
1566
1567        let block_number = 1;
1568        let task_index: TaskIndex = 0;
1569        let quorum_numbers = vec![0];
1570        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
1571        let time_to_expiry = Duration::from_secs(1);
1572        let _task_response = 123; // Initialize with appropriate data
1573
1574        let fake_avs_registry_service =
1575            FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
1576        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1577        let metadata = TaskMetadata::new(
1578            task_index,
1579            block_number,
1580            quorum_numbers,
1581            quorum_threshold_percentages,
1582            time_to_expiry,
1583        );
1584        let (handle, mut aggregator_response) = bls_agg_service.start();
1585        handle.initialize_task(metadata).await.unwrap();
1586
1587        let response = aggregator_response.receive_aggregated_response().await;
1588
1589        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
1590    }
1591
1592    #[tokio::test]
1593    async fn test_1_quorum_2_operator_1_signatures_50_threshold() {
1594        let test_operator_1 = TestOperator {
1595            operator_id: U256::from(1).into(),
1596            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1597            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1598        };
1599        let test_operator_2 = TestOperator {
1600            operator_id: U256::from(2).into(),
1601            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1602            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1603        };
1604        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1605        let block_number = 1;
1606        let task_index = 0;
1607        let quorum_numbers: Vec<QuorumNum> = vec![0];
1608        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50u8];
1609        let time_to_expiry = Duration::from_secs(1);
1610        let task_response = 123; // Initialize with appropriate data
1611        let task_response_digest = hash(task_response);
1612        let bls_sig_op_1 = test_operator_1
1613            .bls_keypair
1614            .sign_message(task_response_digest.as_ref());
1615
1616        let fake_avs_registry_service =
1617            FakeAvsRegistryService::new(block_number, test_operators.clone());
1618        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1619
1620        let metadata = TaskMetadata::new(
1621            task_index,
1622            block_number,
1623            quorum_numbers,
1624            quorum_threshold_percentages,
1625            time_to_expiry,
1626        );
1627        let (handle, mut aggregator_response) = bls_agg_service.start();
1628        handle.initialize_task(metadata).await.unwrap();
1629
1630        handle
1631            .process_signature(TaskSignature::new(
1632                task_index,
1633                task_response_digest,
1634                bls_sig_op_1.clone(),
1635                test_operator_1.operator_id,
1636            ))
1637            .await
1638            .unwrap();
1639
1640        let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1641
1642        let signers_apk_g2: BlsG2Point = test_operator_1.bls_keypair.public_key_g2();
1643
1644        let expected_agg_service_response = BlsAggregationServiceResponse {
1645            task_index,
1646            task_response_digest,
1647            non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()], //
1648            quorum_apks_g1: vec![quorum_apks_g1],
1649            signers_apk_g2,
1650            signers_agg_sig_g1: bls_sig_op_1,
1651            non_signer_quorum_bitmap_indices: vec![],
1652            quorum_apk_indices: vec![],
1653            total_stake_indices: vec![],
1654            non_signer_stake_indices: vec![],
1655        };
1656
1657        let response = aggregator_response.receive_aggregated_response().await;
1658
1659        assert_eq!(expected_agg_service_response, response.clone().unwrap());
1660        assert_eq!(task_index, response.unwrap().task_index);
1661    }
1662
1663    #[tokio::test]
1664    async fn test_1_quorum_2_operator_1_signatures_60_threshold() {
1665        let test_operator_1 = TestOperator {
1666            operator_id: U256::from(1).into(),
1667            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1668            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1669        };
1670        let test_operator_2 = TestOperator {
1671            operator_id: U256::from(2).into(),
1672            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1673            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1674        };
1675        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1676        let block_number = 1;
1677        let task_index = 0;
1678        let quorum_numbers: Vec<QuorumNum> = vec![0];
1679        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![60u8];
1680        let time_to_expiry = Duration::from_secs(1);
1681        let task_response = 123; // Initialize with appropriate data
1682        let task_response_digest = hash(task_response);
1683        let bls_sig_op_1 = test_operator_1
1684            .bls_keypair
1685            .sign_message(task_response_digest.as_ref());
1686
1687        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
1688        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1689
1690        let metadata = TaskMetadata::new(
1691            task_index,
1692            block_number,
1693            quorum_numbers,
1694            quorum_threshold_percentages,
1695            time_to_expiry,
1696        );
1697        let (handle, mut aggregator_response) = bls_agg_service.start();
1698        handle.initialize_task(metadata).await.unwrap();
1699
1700        handle
1701            .process_signature(TaskSignature::new(
1702                task_index,
1703                task_response_digest,
1704                bls_sig_op_1,
1705                test_operator_1.operator_id,
1706            ))
1707            .await
1708            .unwrap();
1709
1710        let response = aggregator_response.receive_aggregated_response().await;
1711
1712        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
1713    }
1714
1715    #[tokio::test]
1716    async fn test_2_quorums_2_operators_which_just_take_1_quorum_2_correct_signatures() {
1717        let test_operator_1 = TestOperator {
1718            operator_id: U256::from(1).into(),
1719            // Note the quorums is [0, 1], but operator id 1 just stake 0.
1720            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1721            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1722        };
1723        let test_operator_2 = TestOperator {
1724            operator_id: U256::from(2).into(),
1725            // Note the quorums is [0, 1], but operator id 2 just stake 1.
1726            stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
1727            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1728        };
1729
1730        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1731        let block_number = 1;
1732        let task_index = 0;
1733        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1734        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1735        let time_to_expiry = Duration::from_secs(1);
1736        let task_response = 123; // Initialize with appropriate data
1737        let task_response_digest = hash(task_response);
1738
1739        let fake_avs_registry_service =
1740            FakeAvsRegistryService::new(block_number, test_operators.clone());
1741        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1742
1743        let metadata = TaskMetadata::new(
1744            task_index,
1745            block_number,
1746            quorum_numbers,
1747            quorum_threshold_percentages,
1748            time_to_expiry,
1749        );
1750        let (handle, mut aggregator_response) = bls_agg_service.start();
1751        handle.initialize_task(metadata).await.unwrap();
1752
1753        let bls_sig_op_1 = test_operator_1
1754            .bls_keypair
1755            .sign_message(task_response_digest.as_ref());
1756        handle
1757            .process_signature(TaskSignature::new(
1758                task_index,
1759                task_response_digest,
1760                bls_sig_op_1.clone(),
1761                test_operator_1.operator_id,
1762            ))
1763            .await
1764            .unwrap();
1765
1766        let bls_sig_op_2 = test_operator_2
1767            .bls_keypair
1768            .sign_message(task_response_digest.as_ref());
1769        handle
1770            .process_signature(TaskSignature::new(
1771                task_index,
1772                task_response_digest,
1773                bls_sig_op_2.clone(),
1774                test_operator_2.operator_id,
1775            ))
1776            .await
1777            .unwrap();
1778
1779        let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
1780        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
1781
1782        let expected_agg_service_response = BlsAggregationServiceResponse {
1783            task_index,
1784            task_response_digest,
1785            non_signers_pub_keys_g1: vec![],
1786            quorum_apks_g1: vec![
1787                test_operator_1.bls_keypair.public_key(),
1788                test_operator_2.bls_keypair.public_key(),
1789            ],
1790            signers_apk_g2,
1791            signers_agg_sig_g1,
1792            non_signer_quorum_bitmap_indices: vec![],
1793            quorum_apk_indices: vec![],
1794            total_stake_indices: vec![],
1795            non_signer_stake_indices: vec![],
1796        };
1797
1798        let response = aggregator_response.receive_aggregated_response().await;
1799
1800        assert_eq!(expected_agg_service_response, response.clone().unwrap());
1801        assert_eq!(task_index, response.unwrap().task_index);
1802    }
1803
1804    #[tokio::test]
1805    async fn test_2_quorums_3_operators_which_just_stake_1_quorum_50_threshold() {
1806        let test_operator_1 = TestOperator {
1807            operator_id: U256::from(1).into(),
1808            // Note the quorums is [0, 1], but operator id 1 just stake 0.
1809            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1810            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1811        };
1812        let test_operator_2 = TestOperator {
1813            operator_id: U256::from(2).into(),
1814            // Note the quorums is [0, 1], but operator id 2 just stake 1.
1815            stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
1816            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1817        };
1818
1819        let test_operator_3 = TestOperator {
1820            operator_id: U256::from(3).into(),
1821            // Note the quorums is [0, 1], but operator id 3 just stake 0.
1822            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1823            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
1824        };
1825
1826        let test_operators = vec![
1827            test_operator_1.clone(),
1828            test_operator_2.clone(),
1829            test_operator_3.clone(),
1830        ];
1831        let block_number = 1;
1832        let task_index = 0;
1833        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1834        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50u8, 50u8];
1835        let time_to_expiry = Duration::from_secs(1);
1836        let task_response = 123; // Initialize with appropriate data
1837        let task_response_digest = hash(task_response);
1838
1839        let fake_avs_registry_service =
1840            FakeAvsRegistryService::new(block_number, test_operators.clone());
1841        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1842
1843        let metadata = TaskMetadata::new(
1844            task_index,
1845            block_number,
1846            quorum_numbers,
1847            quorum_threshold_percentages,
1848            time_to_expiry,
1849        );
1850        let (handle, mut aggregator_response) = bls_agg_service.start();
1851        handle.initialize_task(metadata).await.unwrap();
1852
1853        let bls_sig_op_1 = test_operator_1
1854            .bls_keypair
1855            .sign_message(task_response_digest.as_ref());
1856        handle
1857            .process_signature(TaskSignature::new(
1858                task_index,
1859                task_response_digest,
1860                bls_sig_op_1.clone(),
1861                test_operator_1.operator_id,
1862            ))
1863            .await
1864            .unwrap();
1865
1866        let bls_sig_op_2 = test_operator_2
1867            .bls_keypair
1868            .sign_message(task_response_digest.as_ref());
1869        handle
1870            .process_signature(TaskSignature::new(
1871                task_index,
1872                task_response_digest,
1873                bls_sig_op_2.clone(),
1874                test_operator_2.operator_id,
1875            ))
1876            .await
1877            .unwrap();
1878        let signers_apk_g2 =
1879            aggregate_g2_public_keys(&vec![test_operator_1.clone(), test_operator_2.clone()]);
1880        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
1881        let quorum_apks_g1 = vec![
1882            aggregate_g1_public_keys(&vec![test_operator_1, test_operator_3.clone()]),
1883            aggregate_g1_public_keys(&vec![test_operator_2, test_operator_3.clone()]),
1884        ];
1885
1886        let expected_agg_service_response = BlsAggregationServiceResponse {
1887            task_index,
1888            task_response_digest,
1889            non_signers_pub_keys_g1: vec![test_operator_3.bls_keypair.public_key()],
1890            quorum_apks_g1,
1891            signers_apk_g2,
1892            signers_agg_sig_g1,
1893            non_signer_quorum_bitmap_indices: vec![],
1894            quorum_apk_indices: vec![],
1895            total_stake_indices: vec![],
1896            non_signer_stake_indices: vec![],
1897        };
1898
1899        let response = aggregator_response.receive_aggregated_response().await;
1900
1901        assert_eq!(expected_agg_service_response, response.clone().unwrap());
1902        assert_eq!(task_index, response.unwrap().task_index);
1903    }
1904
1905    #[tokio::test]
1906    async fn test_2_quorums_3_operators_which_just_stake_1_quorum_60_threshold() {
1907        // results in `task expired`
1908        let test_operator_1 = TestOperator {
1909            operator_id: U256::from(1).into(),
1910            // Note the quorums is [0, 1], but operator id 1 just stake 0.
1911            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1912            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1913        };
1914        let test_operator_2 = TestOperator {
1915            operator_id: U256::from(2).into(),
1916            // Note the quorums is [0, 1], but operator id 2 just stake 1.
1917            stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
1918            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1919        };
1920
1921        let test_operator_3 = TestOperator {
1922            operator_id: U256::from(3).into(),
1923            // Note the quorums is [0, 1], but operator id 3 just stake 0.
1924            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1925            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
1926        };
1927
1928        let test_operators = vec![
1929            test_operator_1.clone(),
1930            test_operator_2.clone(),
1931            test_operator_3.clone(),
1932        ];
1933        let block_number = 1;
1934        let task_index = 0;
1935        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1936        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![60u8, 60u8];
1937        let time_to_expiry = Duration::from_secs(1);
1938        let task_response = 123; // Initialize with appropriate data
1939        let task_response_digest = hash(task_response);
1940
1941        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
1942        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1943
1944        let metadata = TaskMetadata::new(
1945            task_index,
1946            block_number,
1947            quorum_numbers,
1948            quorum_threshold_percentages,
1949            time_to_expiry,
1950        );
1951        let (handle, mut aggregator_response) = bls_agg_service.start();
1952        handle.initialize_task(metadata).await.unwrap();
1953
1954        let bls_sig_op_1 = test_operator_1
1955            .bls_keypair
1956            .sign_message(task_response_digest.as_ref());
1957        handle
1958            .process_signature(TaskSignature::new(
1959                task_index,
1960                task_response_digest,
1961                bls_sig_op_1,
1962                test_operator_1.operator_id,
1963            ))
1964            .await
1965            .unwrap();
1966
1967        let bls_sig_op_2 = test_operator_2
1968            .bls_keypair
1969            .sign_message(task_response_digest.as_ref());
1970
1971        handle
1972            .process_signature(TaskSignature::new(
1973                task_index,
1974                task_response_digest,
1975                bls_sig_op_2,
1976                test_operator_2.operator_id,
1977            ))
1978            .await
1979            .unwrap();
1980
1981        let response = aggregator_response.receive_aggregated_response().await;
1982
1983        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
1984    }
1985
1986    #[tokio::test]
1987    async fn test_2_quorums_1_operator_which_just_take_1_quorum_1_signature_task_expired() {
1988        let test_operator_1 = TestOperator {
1989            operator_id: U256::from(1).into(),
1990            // Note the quorums is [0, 1], but operator id 1 just stake 0.
1991            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1992            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1993        };
1994
1995        let block_number = 1;
1996        let task_index = 0;
1997        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1998        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100, 100];
1999        let time_to_expiry = Duration::from_secs(1);
2000        let task_response = 123; // Initialize with appropriate data
2001        let task_response_digest = hash(task_response);
2002
2003        let fake_avs_registry_service =
2004            FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2005        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2006
2007        let metadata = TaskMetadata::new(
2008            task_index,
2009            block_number,
2010            quorum_numbers,
2011            quorum_threshold_percentages,
2012            time_to_expiry,
2013        );
2014        let (handle, mut aggregator_response) = bls_agg_service.start();
2015        handle.initialize_task(metadata).await.unwrap();
2016
2017        let bls_sig_op_1 = test_operator_1
2018            .bls_keypair
2019            .sign_message(task_response_digest.as_ref());
2020
2021        handle
2022            .process_signature(TaskSignature::new(
2023                task_index,
2024                task_response_digest,
2025                bls_sig_op_1,
2026                test_operator_1.operator_id,
2027            ))
2028            .await
2029            .unwrap();
2030
2031        let response = aggregator_response.receive_aggregated_response().await;
2032
2033        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2034    }
2035
2036    #[tokio::test]
2037    async fn test_2_quorums_2_operators_where_1_operator_just_take_1_quorum_1_signature_task_expired(
2038    ) {
2039        let test_operator_1 = TestOperator {
2040            operator_id: U256::from(1).into(),
2041            // Note the quorums is [0, 1], but operator id 1 just stake 0.
2042            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2043            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2044        };
2045        let test_operator_2 = TestOperator {
2046            operator_id: U256::from(2).into(),
2047            // Note the quorums is [0, 1], but operator id 2 just stake 1.
2048            stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
2049            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2050        };
2051
2052        let block_number = 1;
2053        let task_index = 0;
2054        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
2055        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100, 100];
2056        let time_to_expiry = Duration::from_secs(1);
2057        let task_response = 123; // Initialize with appropriate data
2058        let task_response_digest = hash(task_response);
2059        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2060
2061        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2062        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2063
2064        let metadata = TaskMetadata::new(
2065            task_index,
2066            block_number,
2067            quorum_numbers,
2068            quorum_threshold_percentages,
2069            time_to_expiry,
2070        );
2071        let (handle, mut aggregator_response) = bls_agg_service.start();
2072        handle.initialize_task(metadata).await.unwrap();
2073
2074        let bls_sig_op_1 = test_operator_1
2075            .bls_keypair
2076            .sign_message(task_response_digest.as_ref());
2077        handle
2078            .process_signature(TaskSignature::new(
2079                task_index,
2080                task_response_digest,
2081                bls_sig_op_1,
2082                test_operator_1.operator_id,
2083            ))
2084            .await
2085            .unwrap();
2086
2087        let response = aggregator_response.receive_aggregated_response().await;
2088
2089        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2090    }
2091
2092    #[tokio::test]
2093    async fn send_signature_of_task_not_initialized() {
2094        let test_operator_1 = TestOperator {
2095            operator_id: U256::from(1).into(),
2096            // Note the quorums is [0, 1], but operator id 1 just stake 0.
2097            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2098            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2099        };
2100
2101        let block_number = 1;
2102        let task_index = 0;
2103        let task_response = 123; // Initialize with appropriate data
2104        let task_response_digest = hash(task_response);
2105
2106        let fake_avs_registry_service =
2107            FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2108        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2109
2110        let bls_sig_op_1 = test_operator_1
2111            .bls_keypair
2112            .sign_message(task_response_digest.as_ref());
2113        let (handle, _) = bls_agg_service.start();
2114        let result = handle
2115            .process_signature(TaskSignature::new(
2116                task_index,
2117                task_response_digest,
2118                bls_sig_op_1,
2119                test_operator_1.operator_id,
2120            ))
2121            .await;
2122
2123        assert_eq!(Err(BlsAggregationServiceError::TaskNotFound), result);
2124    }
2125
2126    #[tokio::test]
2127    async fn test_1_quorum_2_operator_2_signatures_on_2_different_msgs() {
2128        let test_operator_1 = TestOperator {
2129            operator_id: U256::from(1).into(),
2130            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2131            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2132        };
2133        let test_operator_2 = TestOperator {
2134            operator_id: U256::from(2).into(),
2135            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2136            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2137        };
2138        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2139        let block_number = 1;
2140        let task_index = 0;
2141        let quorum_numbers: Vec<QuorumNum> = vec![0];
2142        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8];
2143        let time_to_expiry = Duration::from_secs(1);
2144        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2145        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2146
2147        let metadata = TaskMetadata::new(
2148            task_index,
2149            block_number,
2150            quorum_numbers,
2151            quorum_threshold_percentages,
2152            time_to_expiry,
2153        );
2154        let (handle, mut aggregator_response) = bls_agg_service.start();
2155        handle.initialize_task(metadata).await.unwrap();
2156
2157        let task_response_1 = 123; // Initialize with appropriate data
2158        let task_response_1_digest = hash(task_response_1);
2159        let bls_sig_op_1 = test_operator_1
2160            .bls_keypair
2161            .sign_message(task_response_1_digest.as_ref());
2162        handle
2163            .process_signature(TaskSignature::new(
2164                task_index,
2165                task_response_1_digest,
2166                bls_sig_op_1,
2167                test_operator_1.operator_id,
2168            ))
2169            .await
2170            .unwrap();
2171
2172        let task_response_2 = 456; // Initialize with appropriate data
2173        let task_response_2_digest = hash(task_response_2);
2174        let bls_sig_op_2 = test_operator_1
2175            .bls_keypair
2176            .sign_message(task_response_2_digest.as_ref());
2177        handle
2178            .process_signature(TaskSignature::new(
2179                task_index,
2180                task_response_2_digest,
2181                bls_sig_op_2,
2182                test_operator_1.operator_id,
2183            ))
2184            .await
2185            .unwrap();
2186
2187        let response = aggregator_response.receive_aggregated_response().await;
2188
2189        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2190    }
2191
2192    #[tokio::test]
2193    async fn test_1_quorum_1_operator_1_invalid_signature() {
2194        let test_operator_1 = TestOperator {
2195            operator_id: U256::from(1).into(),
2196            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2197            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2198        };
2199
2200        let block_number = 1;
2201        let task_index = 0;
2202        let quorum_numbers = vec![0];
2203        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
2204        let time_to_expiry = Duration::from_secs(1);
2205        let task_response = 123; // Initialize with appropriate data
2206
2207        let wrong_task_response_digest = hash(task_response + 1);
2208        let bls_signature = test_operator_1
2209            .bls_keypair
2210            .sign_message(hash(task_response).as_ref());
2211        let fake_avs_registry_service =
2212            FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2213        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2214        let metadata = TaskMetadata::new(
2215            task_index,
2216            block_number,
2217            quorum_numbers,
2218            quorum_threshold_percentages,
2219            time_to_expiry,
2220        );
2221        let (handle, mut aggregator_response) = bls_agg_service.start();
2222        handle.initialize_task(metadata).await.unwrap();
2223
2224        let result = handle
2225            .process_signature(TaskSignature::new(
2226                task_index,
2227                wrong_task_response_digest,
2228                bls_signature.clone(),
2229                test_operator_1.operator_id,
2230            ))
2231            .await;
2232
2233        assert_eq!(
2234            Err(BlsAggregationServiceError::SignatureVerificationError(
2235                IncorrectSignature
2236            )),
2237            result
2238        );
2239
2240        // Also test that the aggregator service is not affected by the invalid signature, so the task should expire
2241        let response = aggregator_response.receive_aggregated_response().await;
2242
2243        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2244    }
2245
2246    #[tokio::test]
        /// Checks that a signature submitted after quorum is reached, but while the
        /// aggregation window is still open, ends up in the aggregated response.
        ///
        /// Setup: three operators with 100 stake each in quorum 0, a threshold of 50,
        /// a 5 s `time_to_expiry` and a 1 s `window_duration`. The first two signatures
        /// are sent back to back and the third one 500 ms later. The expected response
        /// lists all three operators as signers.
2247    async fn test_signatures_are_processed_during_window_after_quorum() {
2248        let test_operator_1 = TestOperator {
2249            operator_id: U256::from(1).into(),
2250            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2251            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2252        };
2253        let test_operator_2 = TestOperator {
2254            operator_id: U256::from(2).into(),
2255            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2256            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2257        };
2258        let test_operator_3 = TestOperator {
2259            operator_id: U256::from(3).into(),
2260            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2261            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
2262        };
2263        let test_operators = vec![
2264            test_operator_1.clone(),
2265            test_operator_2.clone(),
2266            test_operator_3.clone(),
2267        ];
2268        let block_number = 1;
2269        let task_index = 0;
2270        let task_response = 123;
2271        let quorum_numbers: Vec<QuorumNum> = vec![0];
2272        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50_u8];
2273        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2274        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2275
2276        let time_to_expiry = Duration::from_secs(5);
2277        let window_duration = Duration::from_secs(1);
2278
            // `start` is taken before the task is initialized, so `elapsed` below also
            // covers initialization and all three signature submissions.
2279        let start = Instant::now();
2280        let metadata = TaskMetadata::new(
2281            task_index,
2282            block_number,
2283            quorum_numbers,
2284            quorum_threshold_percentages,
2285            time_to_expiry,
2286        )
2287        .with_window_duration(window_duration);
2288
2289        let (handle, mut aggregator_response) = bls_agg_service.start();
2290        handle.initialize_task(metadata).await.unwrap();
2291
            // Every operator signs the digest of the same `task_response` value.
2292        let task_response_1_digest = hash(task_response);
2293        let bls_sig_op_1 = test_operator_1
2294            .bls_keypair
2295            .sign_message(task_response_1_digest.as_ref());
2296        handle
2297            .process_signature(TaskSignature::new(
2298                task_index,
2299                task_response_1_digest,
2300                bls_sig_op_1.clone(),
2301                test_operator_1.operator_id,
2302            ))
2303            .await
2304            .unwrap();
2305
2306        let task_response_2_digest = hash(task_response);
2307        let bls_sig_op_2 = test_operator_2
2308            .bls_keypair
2309            .sign_message(task_response_2_digest.as_ref());
2310        handle
2311            .process_signature(TaskSignature::new(
2312                task_index,
2313                task_response_2_digest,
2314                bls_sig_op_2.clone(),
2315                test_operator_2.operator_id,
2316            ))
2317            .await
2318            .unwrap();
2319
            // NOTE(review): presumably 200 of 300 total stake satisfies the 50 threshold;
            // the threshold arithmetic lives in the service and is not visible here.
2320        // quorum reached here, window should be open receiving signatures for 1 second
2321        sleep(Duration::from_millis(500)).await;
2322        let task_response_3_digest = hash(task_response);
2323        let bls_sig_op_3 = test_operator_3
2324            .bls_keypair
2325            .sign_message(task_response_3_digest.as_ref());
2326        handle
2327            .process_signature(TaskSignature::new(
2328                task_index,
2329                task_response_3_digest,
2330                bls_sig_op_3.clone(),
2331                test_operator_3.operator_id,
2332            ))
2333            .await
2334            .unwrap();
2335
            // Build the expected aggregates from all three operators and all three signatures.
2336        let signers_apk_g2 = aggregate_g2_public_keys(&vec![
2337            test_operator_1.clone(),
2338            test_operator_2.clone(),
2339            test_operator_3.clone(),
2340        ]);
2341        let signers_agg_sig_g1 =
2342            aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2, bls_sig_op_3]);
2343        let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2344            test_operator_1,
2345            test_operator_2,
2346            test_operator_3,
2347        ])];
2348
            // Everyone signed, so the non-signer list is empty. The index vectors are
            // expected to be empty as well.
2349        let expected_agg_service_response = BlsAggregationServiceResponse {
2350            task_index,
2351            task_response_digest: task_response_3_digest,
2352            non_signers_pub_keys_g1: vec![],
2353            quorum_apks_g1,
2354            signers_apk_g2,
2355            signers_agg_sig_g1,
2356            non_signer_quorum_bitmap_indices: vec![],
2357            quorum_apk_indices: vec![],
2358            total_stake_indices: vec![],
2359            non_signer_stake_indices: vec![],
2360        };
2361
2362        let response = aggregator_response.receive_aggregated_response().await;
2363
2364        let elapsed = start.elapsed();
2365        assert_eq!(expected_agg_service_response, response.clone().unwrap());
2366        assert_eq!(task_index, response.unwrap().task_index);
            // At least `window_duration` but less than `time_to_expiry` must have passed
            // since `start` when the response is received.
2367        assert!(elapsed < time_to_expiry);
2368        assert!(elapsed >= window_duration);
2369    }
2370
2371    #[tokio::test]
        /// Checks that an aggregated response (not an error) is delivered when the task
        /// expiry fires while the post-quorum window is still open.
        ///
        /// Setup: two operators with 100 stake each in quorum 0, a threshold of 40,
        /// a 2 s `time_to_expiry` and a 10 s `window_duration`, so the expiry is
        /// shorter than the window. Both signatures are sent without any delay.
2372    async fn test_if_quorum_has_been_reached_and_the_task_expires_during_window_the_response_is_sent(
2373    ) {
2374        let test_operator_1 = TestOperator {
2375            operator_id: U256::from(1).into(),
2376            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2377            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2378        };
2379        let test_operator_2 = TestOperator {
2380            operator_id: U256::from(2).into(),
2381            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2382            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2383        };
2384        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2385        let block_number = 1;
2386        let task_index = 0;
2387        let task_response = 123;
2388        let quorum_numbers: Vec<QuorumNum> = vec![0];
2389        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2390        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2391        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2392
            // The window (10 s) deliberately outlasts the task expiry (2 s).
2393        let time_to_expiry = Duration::from_secs(2);
2394        let window_duration = Duration::from_secs(10);
2395
            // `start` is taken before the task is initialized, so `elapsed` below also
            // covers initialization and both signature submissions.
2396        let start = Instant::now();
2397        let metadata = TaskMetadata::new(
2398            task_index,
2399            block_number,
2400            quorum_numbers,
2401            quorum_threshold_percentages,
2402            time_to_expiry,
2403        )
2404        .with_window_duration(window_duration);
2405        let (handle, mut aggregator_response) = bls_agg_service.start();
2406        handle.initialize_task(metadata).await.unwrap();
2407
2408        let task_response_1_digest = hash(task_response);
2409        let bls_sig_op_1 = test_operator_1
2410            .bls_keypair
2411            .sign_message(task_response_1_digest.as_ref());
2412        handle
2413            .process_signature(TaskSignature::new(
2414                task_index,
2415                task_response_1_digest,
2416                bls_sig_op_1.clone(),
2417                test_operator_1.operator_id,
2418            ))
2419            .await
2420            .unwrap();
2421
            // NOTE(review): presumably 100 of 200 total stake satisfies the 40 threshold;
            // the threshold arithmetic lives in the service and is not visible here.
2422        // quorum reached here, window should be open receiving signatures
2423
2424        let task_response_2_digest = hash(task_response);
2425        let bls_sig_op_2 = test_operator_2
2426            .bls_keypair
2427            .sign_message(task_response_2_digest.as_ref());
2428        handle
2429            .process_signature(TaskSignature::new(
2430                task_index,
2431                task_response_2_digest,
2432                bls_sig_op_2.clone(),
2433                test_operator_2.operator_id,
2434            ))
2435            .await
2436            .unwrap();
2437
            // Build the expected aggregates from both operators and both signatures.
2438        let signers_apk_g2 =
2439            aggregate_g2_public_keys(&vec![test_operator_1.clone(), test_operator_2.clone()]);
2440        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
2441        let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2442            test_operator_1,
2443            test_operator_2,
2444        ])];
2445
            // Both operators signed, so the non-signer list is empty.
2446        let expected_agg_service_response = BlsAggregationServiceResponse {
2447            task_index,
2448            task_response_digest: task_response_2_digest,
2449            non_signers_pub_keys_g1: vec![],
2450            quorum_apks_g1,
2451            signers_apk_g2,
2452            signers_agg_sig_g1,
2453            non_signer_quorum_bitmap_indices: vec![],
2454            quorum_apk_indices: vec![],
2455            total_stake_indices: vec![],
2456            non_signer_stake_indices: vec![],
2457        };
2458
2459        let response = aggregator_response.receive_aggregated_response().await;
2460
2461        let elapsed = start.elapsed();
2462        assert_eq!(expected_agg_service_response, response.clone().unwrap());
2463        assert_eq!(task_index, response.unwrap().task_index);
            // The response must arrive no earlier than the expiry and well before the
            // full window would have elapsed.
2464        assert!(elapsed >= time_to_expiry);
2465        assert!(elapsed < window_duration);
2466    }
2467
2468    #[tokio::test]
        /// Checks that with a zero-length window nothing is aggregated after the
        /// signature that reaches quorum.
        ///
        /// Setup: two operators with 100 stake each in quorum 0, a threshold of 40,
        /// a 2 s `time_to_expiry` and `Duration::ZERO` as `window_duration`. The second
        /// signature is sent 1 ms after the first and is expected to be rejected with
        /// `SenderError`; the expected response lists operator 2 as a non-signer.
2469    async fn test_if_window_duration_is_zero_no_signatures_are_aggregated_after_reaching_quorum() {
2470        let test_operator_1 = TestOperator {
2471            operator_id: U256::from(1).into(),
2472            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2473            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2474        };
2475        let test_operator_2 = TestOperator {
2476            operator_id: U256::from(2).into(),
2477            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2478            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2479        };
2480        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2481        let block_number = 1;
2482        let task_index = 0;
2483        let task_response = 123;
2484        let quorum_numbers: Vec<QuorumNum> = vec![0];
2485        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2486        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2487        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2488
2489        let time_to_expiry = Duration::from_secs(2);
2490        let window_duration = Duration::ZERO;
2491
            // `start` is taken before the task is initialized, so `elapsed` below also
            // covers initialization, the 1 ms sleep and both submission attempts.
2492        let start = Instant::now();
2493        let metadata = TaskMetadata::new(
2494            task_index,
2495            block_number,
2496            quorum_numbers,
2497            quorum_threshold_percentages,
2498            time_to_expiry,
2499        )
2500        .with_window_duration(window_duration);
2501        let (handle, mut aggregator_response) = bls_agg_service.start();
2502        handle.initialize_task(metadata).await.unwrap();
2503
2504        let task_response_1_digest = hash(task_response);
2505        let bls_sig_op_1 = test_operator_1
2506            .bls_keypair
2507            .sign_message(task_response_1_digest.as_ref());
2508        handle
2509            .process_signature(TaskSignature::new(
2510                task_index,
2511                task_response_1_digest,
2512                bls_sig_op_1.clone(),
2513                test_operator_1.operator_id,
2514            ))
2515            .await
2516            .unwrap();
2517
            // NOTE(review): the 1 ms pause presumably gives the task time to finish
            // before the next submission; this may be tight on a loaded machine - confirm.
2518        // quorum reached here but window duration is zero, so no more signatures should be aggregated
2519        sleep(Duration::from_millis(1)).await;
2520
2521        let task_response_2_digest = hash(task_response);
2522        let bls_sig_op_2 = test_operator_2
2523            .bls_keypair
2524            .sign_message(task_response_2_digest.as_ref());
            // The late signature is moved, not cloned: it is not part of the expected aggregate.
2525        let process_signature_result = handle
2526            .process_signature(TaskSignature::new(
2527                task_index,
2528                task_response_2_digest,
2529                bls_sig_op_2,
2530                test_operator_2.operator_id,
2531            ))
2532            .await;
            // NOTE(review): `SenderError` presumably means the task no longer accepts
            // signatures (its channel is closed) - confirm against the service code.
2533        assert_eq!(
2534            Err(BlsAggregationServiceError::SenderError),
2535            process_signature_result
2536        );
2537
            // Only operator 1 counts as a signer, while the quorum APK still aggregates
            // the public keys of both registered operators.
2538        let signers_apk_g2 = aggregate_g2_public_keys(&[test_operator_1.clone()]);
2539        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]);
2540        let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2541            test_operator_1,
2542            test_operator_2.clone(),
2543        ])];
2544
2545        let expected_agg_service_response = BlsAggregationServiceResponse {
2546            task_index,
2547            task_response_digest: task_response_1_digest,
2548            non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()],
2549            quorum_apks_g1,
2550            signers_apk_g2,
2551            signers_agg_sig_g1,
2552            non_signer_quorum_bitmap_indices: vec![],
2553            quorum_apk_indices: vec![],
2554            total_stake_indices: vec![],
2555            non_signer_stake_indices: vec![],
2556        };
2557
2558        let response = aggregator_response.receive_aggregated_response().await;
2559
2560        let elapsed = start.elapsed();
2561        assert_eq!(expected_agg_service_response, response.clone().unwrap());
2562        assert_eq!(task_index, response.unwrap().task_index);
            // The response must be available before the task would have expired.
2563        assert!(elapsed < time_to_expiry);
2564    }
2565
2566    #[tokio::test]
        /// Checks that a signature arriving after the post-quorum window has closed is
        /// rejected and left out of the aggregated response.
        ///
        /// Setup: two operators with 100 stake each in quorum 0, a threshold of 40,
        /// a 5 s `time_to_expiry` and a 1 s `window_duration`. The second signature is
        /// sent 2 s after the first and is expected to fail with `SenderError`; the
        /// expected response lists operator 2 as a non-signer.
2567    async fn test_no_signatures_are_aggregated_after_window() {
2568        let test_operator_1 = TestOperator {
2569            operator_id: U256::from(1).into(),
2570            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2571            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2572        };
2573        let test_operator_2 = TestOperator {
2574            operator_id: U256::from(2).into(),
2575            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2576            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2577        };
2578        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2579        let block_number = 1;
2580        let task_index = 0;
2581        let task_response = 123;
2582        let quorum_numbers: Vec<QuorumNum> = vec![0];
2583        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2584        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2585        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2586
2587        let time_to_expiry = Duration::from_secs(5);
2588        let window_duration = Duration::from_secs(1);
2589
            // `start` is taken before the task is initialized, so `elapsed` below also
            // covers initialization, the 2 s sleep and both submission attempts.
2590        let start = Instant::now();
2591        let metadata = TaskMetadata::new(
2592            task_index,
2593            block_number,
2594            quorum_numbers,
2595            quorum_threshold_percentages,
2596            time_to_expiry,
2597        )
2598        .with_window_duration(window_duration);
2599        let (handle, mut aggregator_response) = bls_agg_service.start();
2600        handle.initialize_task(metadata).await.unwrap();
2601
2602        let task_response_1_digest = hash(task_response);
2603        let bls_sig_op_1 = test_operator_1
2604            .bls_keypair
2605            .sign_message(task_response_1_digest.as_ref());
2606        handle
2607            .process_signature(TaskSignature::new(
2608                task_index,
2609                task_response_1_digest,
2610                bls_sig_op_1.clone(),
2611                test_operator_1.operator_id,
2612            ))
2613            .await
2614            .unwrap();
2615
            // Sleep for twice the window length so the next signature arrives after it closed.
2616        // quorum reached here, window should be open for 1 second
2617        sleep(Duration::from_secs(2)).await;
2618
2619        let task_response_2_digest = hash(task_response);
2620        let bls_sig_op_2 = test_operator_2
2621            .bls_keypair
2622            .sign_message(task_response_2_digest.as_ref());
            // The late signature is moved, not cloned: it is not part of the expected aggregate.
2623        let process_signature_result = handle
2624            .process_signature(TaskSignature::new(
2625                task_index,
2626                task_response_2_digest,
2627                bls_sig_op_2,
2628                test_operator_2.operator_id,
2629            ))
2630            .await;
            // NOTE(review): `SenderError` presumably means the task no longer accepts
            // signatures (its channel is closed) - confirm against the service code.
2631        assert_eq!(
2632            Err(BlsAggregationServiceError::SenderError),
2633            process_signature_result
2634        );
2635
            // Only operator 1 counts as a signer, while the quorum APK still aggregates
            // the public keys of both registered operators.
2636        let signers_apk_g2 = aggregate_g2_public_keys(&[test_operator_1.clone()]);
2637        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]);
2638        let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2639            test_operator_1,
2640            test_operator_2.clone(),
2641        ])];
2642
2643        let expected_agg_service_response = BlsAggregationServiceResponse {
2644            task_index,
2645            task_response_digest: task_response_1_digest,
2646            non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()],
2647            quorum_apks_g1,
2648            signers_apk_g2,
2649            signers_agg_sig_g1,
2650            non_signer_quorum_bitmap_indices: vec![],
2651            quorum_apk_indices: vec![],
2652            total_stake_indices: vec![],
2653            non_signer_stake_indices: vec![],
2654        };
2655
2656        let response = aggregator_response.receive_aggregated_response().await;
2657
2658        let elapsed = start.elapsed();
2659        assert_eq!(expected_agg_service_response, response.clone().unwrap());
2660        assert_eq!(task_index, response.unwrap().task_index);
            // The response must be available before the task would have expired.
2661        assert!(elapsed < time_to_expiry);
2662    }
2663}