eigen_services_blsaggregation/
bls_agg.rs

1use super::bls_aggregation_service_error::BlsAggregationServiceError;
2use super::bls_aggregation_service_response::BlsAggregationServiceResponse;
3use alloy::primitives::{FixedBytes, Uint, U256};
4use ark_bn254::{G1Affine, G2Affine};
5use ark_ec::AffineRepr;
6use eigen_crypto_bls::{BlsG1Point, BlsG2Point, Signature};
7use eigen_crypto_bn254::utils::verify_message;
8use eigen_logging::logger::SharedLogger;
9use eigen_services_avsregistry::AvsRegistryService;
10use eigen_types::avs_state::OperatorAvsState;
11use eigen_types::{
12    avs::{SignatureVerificationError, TaskIndex, TaskResponseDigest},
13    operator::{QuorumThresholdPercentage, QuorumThresholdPercentages},
14};
15use std::collections::HashMap;
16use tokio::{
17    sync::{
18        mpsc::{self, UnboundedReceiver, UnboundedSender},
19        oneshot,
20    },
21    time::Duration,
22};
23
24/// Contains the aggregated operators signers information
25#[derive(Debug, Clone)]
26pub struct AggregatedOperators {
27    signers_apk_g2: BlsG2Point,
28    signers_agg_sig_g1: Signature,
29    signers_total_stake_per_quorum: HashMap<u8, U256>,
30    pub signers_operator_ids_set: HashMap<FixedBytes<32>, bool>,
31}
32
33/// Contains the metadata required to initialize a new task.
34#[derive(Clone)]
35pub struct TaskMetadata {
36    /// Index of the task
37    task_index: TaskIndex,
38    /// Block the task was created
39    task_created_block: u64,
40    /// Quorum numbers which should respond to the task
41    quorum_numbers: Vec<u8>,
42    /// Thresholds for each quorum
43    quorum_threshold_percentages: QuorumThresholdPercentages,
44    /// Time before expiry of the task response aggregation
45    time_to_expiry: Duration,
46    // Duration of the window to wait for signatures after quorum is reached
47    window_duration: Duration,
48}
49
50impl TaskMetadata {
51    /// Creates a new instance of [`TaskMetadata`]
52    ///
53    /// # Arguments
54    ///
55    /// * `task_index` - index of the task
56    /// * `task_created_block` - block number at which the task was created
57    /// * `quorum_numbers` - quorum numbers which should respond to the task
58    /// * `quorum_threshold_percentages` - threshold percentages for each quorum
59    /// * `time_to_expiry` - time until the task expires
60    ///
61    /// Use [`with_window_duration`](Self::with_window_duration) to set the window duration.
62    /// If the window duration is not set, it will default to [`Duration::ZERO`].
63    ///
64    /// # Returns
65    ///
66    /// A new instance of [`TaskMetadata`]
67    pub fn new(
68        task_index: TaskIndex,
69        task_created_block: u64,
70        quorum_numbers: Vec<u8>,
71        quorum_threshold_percentages: QuorumThresholdPercentages,
72        time_to_expiry: Duration,
73    ) -> Self {
74        Self {
75            task_index,
76            task_created_block,
77            quorum_numbers,
78            quorum_threshold_percentages,
79            time_to_expiry,
80            window_duration: Duration::ZERO,
81        }
82    }
83
84    /// Sets the window duration for the task
85    ///
86    /// # Arguments
87    /// * `window_duration` - The duration of the window to wait for signatures after quorum is reached
88    ///
89    /// # Returns
90    ///
91    /// An instance of [`TaskMetadata`] with the window duration set
92    pub fn with_window_duration(mut self, window_duration: Duration) -> Self {
93        self.window_duration = window_duration;
94        self
95    }
96}
97
98/// Contains the information of a signed task response
99#[derive(Clone)]
100pub struct TaskSignature {
101    // Index of the task
102    task_index: TaskIndex,
103    // Digest of the task response
104    task_response_digest: TaskResponseDigest,
105    // BLS signature of the task response
106    bls_signature: Signature,
107    // Operator ID of the operator that signed the task response
108    operator_id: FixedBytes<32>,
109}
110
111impl TaskSignature {
112    /// Creates a new instance of [`TaskSignature``]
113    ///
114    /// # Arguments
115    /// * `task_index` - index of the task
116    /// * `task_response_digest` - digest of the task response
117    /// * `bls_signature` - bls signature of the task response
118    /// * `operator_id` - operator ID of the operator that signed the task response
119    ///
120    /// # Returns
121    ///
122    /// [`TaskSignature`] instance
123    pub fn new(
124        task_index: TaskIndex,
125        task_response_digest: TaskResponseDigest,
126        bls_signature: Signature,
127        operator_id: FixedBytes<32>,
128    ) -> Self {
129        Self {
130            task_index,
131            task_response_digest,
132            bls_signature,
133            operator_id,
134        }
135    }
136}
137
138/// Valid messages to interact with the BLS Aggregator Service
139pub enum AggregationMessage {
140    InitializeTask(
141        TaskMetadata,
142        oneshot::Sender<Result<(), BlsAggregationServiceError>>,
143    ),
144    ProcessSignature(
145        TaskSignature,
146        oneshot::Sender<Result<(), BlsAggregationServiceError>>,
147    ),
148}
149
150/// Handler to interact with the BLS Aggregator Service
151#[derive(Debug, Clone)]
152pub struct ServiceHandle {
153    /// Channel to send messages to the BLS Aggregator Service
154    msg_sender: UnboundedSender<AggregationMessage>,
155}
156
157impl ServiceHandle {
158    /// Sends a message to the BLS Aggregator Service to initialize a new task.
159    ///
160    /// # Arguments
161    ///
162    /// * `metadata` - The metadata of the task to initialize
163    ///
164    /// # Returns
165    ///
166    /// Returns error if the task index already exists
167    pub async fn initialize_task(
168        &self,
169        metadata: TaskMetadata,
170    ) -> Result<(), BlsAggregationServiceError> {
171        let (tx, rx) = oneshot::channel();
172        self.msg_sender
173            .send(AggregationMessage::InitializeTask(metadata, tx))
174            .map_err(|_| BlsAggregationServiceError::SenderError)?;
175
176        rx.await
177            .map_err(|_| BlsAggregationServiceError::ReceiverError)?
178    }
179
180    /// Sends a message to the BLS Aggregator Service to process a signature.
181    ///
182    /// # Arguments
183    ///
184    /// * `task_signature` - The signed task response
185    ///
186    /// # Returns error:
187    ///
188    /// * `TaskNotFound` - If the task is not found
189    /// * `ChannelError` - If there is an error while sending the task through the channel
190    /// * `SignatureVerificationError` - If the signature verification fails
191    pub async fn process_signature(
192        &self,
193        task_signature: TaskSignature,
194    ) -> Result<(), BlsAggregationServiceError> {
195        let (tx, rx) = oneshot::channel();
196        self.msg_sender
197            .send(AggregationMessage::ProcessSignature(task_signature, tx))
198            .map_err(|_| BlsAggregationServiceError::SenderError)?;
199
200        rx.await
201            .map_err(|_| BlsAggregationServiceError::ReceiverError)?
202    }
203}
204
205/// Receiver to receive the aggregated responses from the BLS Aggregator Service.
206#[derive(Debug)]
207pub struct AggregateReceiver {
208    /// Channel to receive the aggregated responses from the BLS Aggregator Service
209    aggregate_receiver:
210        UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>,
211}
212
213impl AggregateReceiver {
214    /// Receives the aggregated response from the BLS Aggregator Service.
215    ///
216    /// # Returns
217    ///
218    /// Returns the aggregated response or an error if the channel is closed.
219    pub async fn receive_aggregated_response(
220        &mut self,
221    ) -> Result<BlsAggregationServiceResponse, BlsAggregationServiceError> {
222        self.aggregate_receiver
223            .recv()
224            .await
225            .ok_or(BlsAggregationServiceError::ReceiverError)?
226    }
227}
228
229/// The BLS Aggregator Service main struct
230#[derive(Debug)]
231pub struct BlsAggregatorService<A: AvsRegistryService>
232where
233    A: Clone,
234{
235    logger: SharedLogger,
236    avs_registry_service: A,
237}
238
239/// Represents a signed task response digest
240#[derive(Debug)]
241struct SignedTaskResponseDigest {
242    task_response_digest: TaskResponseDigest,
243
244    bls_signature: Signature,
245
246    operator_id: FixedBytes<32>,
247
248    result_channel: oneshot::Sender<Result<(), BlsAggregationServiceError>>,
249}
250
251impl<A: AvsRegistryService + Send + Sync + Clone + 'static> BlsAggregatorService<A> {
252    /// Creates a new instance of the BlsAggregatorService with the given AVS registry service
253    ///
254    /// Creates a tokio unbounded_channel to send and received aggregated responses.
255    ///
256    /// # Arguments
257    ///
258    /// * `avs_registry_service` - The AVS registry service
259    /// * `logger` - Logger to log messages
260    pub fn new(avs_registry_service: A, logger: SharedLogger) -> Self {
261        Self {
262            logger,
263            avs_registry_service,
264        }
265    }
266
267    /// Starts the BLS Aggregator Service running the main loop in background.
268    ///
269    /// # Returns
270    ///
271    /// Returns a tuple with the [`ServiceHandle`] and [`AggregateReceiver`] to interact with the service
272    pub fn start(self) -> (ServiceHandle, AggregateReceiver) {
273        let (msg_tx, msg_rx) = mpsc::unbounded_channel();
274        let (agg_tx, agg_rx) = mpsc::unbounded_channel();
275
276        tokio::spawn(async move {
277            self.run(msg_rx, agg_tx).await;
278        });
279
280        // Create service handler and aggregate receiver to user can interact with the service
281        let service_handler = ServiceHandle { msg_sender: msg_tx };
282        let aggregate_receiver = AggregateReceiver {
283            aggregate_receiver: agg_rx,
284        };
285
286        (service_handler, aggregate_receiver)
287    }
288
289    /// Runs the main loop of the BLS Aggregator Service.
290    ///
291    /// This function continuously processes messages from `msg_receiver` and handles:
292    /// * [`InitializeTask`]: Initializes a new aggregation task
293    /// * [`ProcessSignature`]: Forwards a signature to the appropriate task aggregator and relays the verification result.
294    ///
295    /// The final aggregated response is sent through the `aggregate_sender` channel. In addition, each
296    /// message (both [`InitializeTask`] and [`ProcessSignature`]) uses its own channel to return specific errors or results.
297    ///
298    /// # Arguments
299    ///
300    /// * `msg_receiver` - The receiver channel to receive the valid messages
301    /// * `aggregate_sender` - The sender channel to send the aggregated responses
302    async fn run(
303        self,
304        mut msg_receiver: UnboundedReceiver<AggregationMessage>,
305        aggregate_sender: UnboundedSender<
306            Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
307        >,
308    ) {
309        let mut task_channels: HashMap<TaskIndex, UnboundedSender<SignedTaskResponseDigest>> =
310            HashMap::new();
311
312        while let Some(message) = msg_receiver.recv().await {
313            match message {
314                AggregationMessage::InitializeTask(metadata, result_sender) => {
315                    let task_index = metadata.task_index;
316                    if task_channels.contains_key(&task_index) {
317                        // Task already exists - return error
318                        result_sender
319                            .send(Err(BlsAggregationServiceError::DuplicateTaskIndex))
320                            .ok();
321                        continue;
322                    }
323
324                    // Create a new channel to receive the signed task responses
325                    let (signature_tx, signature_rx) =
326                        mpsc::unbounded_channel::<SignedTaskResponseDigest>();
327                    task_channels.insert(task_index, signature_tx);
328
329                    let avs_registry_service = self.avs_registry_service.clone();
330                    let aggregated_response_sender = aggregate_sender.clone();
331                    let logger = self.logger.clone();
332
333                    tokio::spawn(async move {
334                        let _ = Self::single_task_aggregator(
335                            avs_registry_service,
336                            metadata,
337                            aggregated_response_sender,
338                            signature_rx,
339                            logger,
340                        )
341                        .await
342                        .inspect_err(|err| {
343                            println!("Error with single_task_aggregator: {:?}", err);
344                        });
345                    });
346
347                    let _ = result_sender.send(Ok(()));
348                }
349                AggregationMessage::ProcessSignature(task_signature, result_sender) => {
350                    if let Some(sig_sender) = task_channels.get_mut(&task_signature.task_index) {
351                        // Send the signed task response to the task aggregator
352                        let signed_digest = SignedTaskResponseDigest {
353                            task_response_digest: task_signature.task_response_digest,
354                            bls_signature: task_signature.bls_signature,
355                            operator_id: task_signature.operator_id,
356                            result_channel: result_sender,
357                        };
358
359                        if let Err(send_error) = sig_sender.send(signed_digest) {
360                            let _ = send_error
361                                .0
362                                .result_channel
363                                .send(Err(BlsAggregationServiceError::SenderError));
364                        }
365                    } else {
366                        result_sender
367                            .send(Err(BlsAggregationServiceError::TaskNotFound))
368                            .ok();
369                    }
370                }
371            }
372        }
373    }
374
375    /// Processes each signed task responses given a task_index for a single task.
376    ///
377    /// It reads the signed task responses from the receiver channel and aggregates them.
378    /// * If the quorum threshold is met, it sends the aggregated response to the aggregated response sender.
379    /// * If the time to expiry is reached, it sends a task expired error to the aggregated response sender.
380    /// * If the signature is incorrect, it sends an incorrect signature error to error channel.
381    ///
382    /// # Arguments
383    ///
384    /// * `metadata` - task metadata
385    /// * `aggregated_response_sender` - The sender channel for the aggregated responses
386    /// * `signatures_rx` - The receiver channel for the signed task responses
387    /// * `logger` - The logger to log messages.
388    async fn single_task_aggregator(
389        avs_registry_service: A,
390        metadata: TaskMetadata,
391        aggregated_response_sender: UnboundedSender<
392            Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
393        >,
394        signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
395        logger: SharedLogger,
396    ) -> Result<(), BlsAggregationServiceError> {
397        let quorum_threshold_percentage_map: HashMap<u8, u8> = metadata
398            .quorum_numbers
399            .iter()
400            .enumerate()
401            .map(|(i, quorum_number)| (*quorum_number, metadata.quorum_threshold_percentages[i]))
402            .collect();
403        let operator_state_avs = avs_registry_service
404            .get_operators_avs_state_at_block(metadata.task_created_block, &metadata.quorum_numbers)
405            .await
406            .map_err(|_| BlsAggregationServiceError::RegistryError)?;
407        let quorums_avs_state = avs_registry_service
408            .get_quorums_avs_state_at_block(&metadata.quorum_numbers, metadata.task_created_block)
409            .await
410            .map_err(|_| BlsAggregationServiceError::RegistryError)?;
411        let total_stake_per_quorum: HashMap<_, _> = quorums_avs_state
412            .iter()
413            .map(|(k, v)| (*k, v.total_stake))
414            .collect();
415
416        let quorum_apks_g1: Vec<BlsG1Point> = metadata
417            .quorum_numbers
418            .iter()
419            .filter_map(|quorum_num| quorums_avs_state.get(quorum_num))
420            .map(|avs_state| avs_state.agg_pub_key_g1.clone())
421            .collect();
422
423        Self::loop_task_aggregator(
424            avs_registry_service,
425            metadata.task_index,
426            metadata.task_created_block,
427            metadata.time_to_expiry,
428            aggregated_response_sender,
429            signatures_rx,
430            operator_state_avs,
431            total_stake_per_quorum,
432            quorum_threshold_percentage_map,
433            quorum_apks_g1,
434            metadata.quorum_numbers,
435            metadata.window_duration,
436            logger,
437        )
438        .await
439    }
440
441    #[allow(clippy::too_many_arguments)]
442    async fn loop_task_aggregator(
443        avs_registry_service: A,
444        task_index: TaskIndex,
445        task_created_block: u64,
446        time_to_expiry: Duration,
447        aggregated_response_sender: UnboundedSender<
448            Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
449        >,
450        mut signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
451        operator_state_avs: HashMap<FixedBytes<32>, OperatorAvsState>,
452        total_stake_per_quorum: HashMap<u8, Uint<256, 4>>,
453        quorum_threshold_percentage_map: HashMap<u8, u8>,
454        quorum_apks_g1: Vec<BlsG1Point>,
455        quorum_nums: Vec<u8>,
456        window_duration: Duration,
457        logger: SharedLogger,
458    ) -> Result<(), BlsAggregationServiceError> {
459        let mut aggregated_operators: HashMap<FixedBytes<32>, AggregatedOperators> = HashMap::new();
460        let mut open_window = false;
461        let mut current_aggregated_response: Option<BlsAggregationServiceResponse> = None;
462        let (window_tx, mut window_rx) = tokio::sync::mpsc::unbounded_channel::<bool>();
463        let task_expired_timer = tokio::time::sleep(time_to_expiry);
464        tokio::pin!(task_expired_timer);
465
466        loop {
467            tokio::select! {
468                _ = &mut task_expired_timer => {
469                    // If the task is expired, send the aggregated response
470                    Self::handle_task_expired(
471                        &logger,
472                        &aggregated_response_sender,
473                        task_index,
474                        open_window,
475                        &current_aggregated_response,
476                    )?;
477                    return Ok(());
478                },
479                _ = window_rx.recv() => {
480                    // If the window is finished, send the aggregated response
481                    Self::handle_window_finished(
482                        &logger,
483                        &aggregated_response_sender,
484                        task_index,
485                        &current_aggregated_response,
486                    )?;
487                    return Ok(());
488                },
489                signed_task_digest = signatures_rx.recv() => {
490                    // If a new signature is received, handle it
491                    Self::handle_new_signature(
492                        &logger,
493                        &avs_registry_service,
494                        &mut aggregated_operators,
495                        &mut open_window,
496                        &mut current_aggregated_response,
497                        &window_tx,
498                        task_index,
499                        task_created_block,
500                        &operator_state_avs,
501                        &total_stake_per_quorum,
502                        &quorum_threshold_percentage_map,
503                        &quorum_apks_g1,
504                        &quorum_nums,
505                        window_duration,
506                        signed_task_digest,
507                    ).await?;
508                }
509            }
510        }
511    }
512
513    /// Handles a new signature in the [`loop_task_aggregator`] function.
514    ///
515    /// # Arguments
516    ///
517    /// * `logger` - The logger to log messages.
518    /// * `avs_registry_service` - The avs registry service.
519    /// * `aggregated_operators` - The aggregated operators.
520    /// * `open_window` - Whether the window is open.
521    /// * `current_aggregated_response` - The current aggregated response.
522    /// * `window_tx` - The window tx.
523    /// * `task_index` - The task index.
524    /// * `task_created_block` - The task created block.
525    /// * `operator_state_avs` - The operator state avs.
526    /// * `total_stake_per_quorum` - The total stake per quorum.
527    /// * `quorum_threshold_percentage_map` - The quorum threshold percentage map.
528    /// * `quorum_apks_g1` - The quorum apks g1.
529    /// * `quorum_nums` - The quorum numbers.
530    /// * `window_duration` - The window duration.
531    /// * `signed_task_digest` - The signed task digest.
532    #[allow(clippy::too_many_arguments)]
533    async fn handle_new_signature(
534        logger: &SharedLogger,
535        avs_registry_service: &A,
536        aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
537        open_window: &mut bool,
538        current_aggregated_response: &mut Option<BlsAggregationServiceResponse>,
539        window_tx: &UnboundedSender<bool>,
540        task_index: TaskIndex,
541        task_created_block: u64,
542        operator_state_avs: &HashMap<FixedBytes<32>, OperatorAvsState>,
543        total_stake_per_quorum: &HashMap<u8, Uint<256, 4>>,
544        quorum_threshold_percentage_map: &HashMap<u8, u8>,
545        quorum_apks_g1: &[BlsG1Point],
546        quorum_nums: &[u8],
547        window_duration: Duration,
548        signed_task_digest: Option<SignedTaskResponseDigest>,
549    ) -> Result<(), BlsAggregationServiceError> {
550        logger.debug(
551            &format!("New signature received for task index: {}", task_index),
552            "eigen-services-blsaggregation.bls_agg.handle_new_signature",
553        );
554
555        let signed_digest =
556            signed_task_digest.ok_or(BlsAggregationServiceError::SignaturesChannelClosed)?;
557
558        // Verify if the operator has already signed for this digest
559        if Self::is_duplicate_signature(aggregated_operators, &signed_digest) {
560            signed_digest
561                .result_channel
562                .send(Err(BlsAggregationServiceError::SignatureVerificationError(
563                    SignatureVerificationError::DuplicateSignature,
564                )))
565                .map_err(|_| BlsAggregationServiceError::SenderError)?;
566            return Ok(());
567        }
568
569        // Verify the signature
570        let verification_result = verify_signature(
571            task_index,
572            &signed_digest,
573            operator_state_avs,
574            logger.clone(),
575        )
576        .await
577        .map_err(BlsAggregationServiceError::SignatureVerificationError);
578
579        let verification_has_error = verification_result.is_err();
580
581        // Send the verification result to the result channel
582        signed_digest
583            .result_channel
584            .send(verification_result)
585            .map_err(|_| BlsAggregationServiceError::SenderError)?;
586
587        // If the signature is incorrect, return
588        if verification_has_error {
589            return Ok(());
590        }
591
592        let operator_state = operator_state_avs.get(&signed_digest.operator_id).unwrap();
593
594        // Update the aggregated operators with the new operator info
595        let updated_aggregated = update_aggregated_operators(
596            aggregated_operators,
597            operator_state,
598            signed_digest.task_response_digest,
599            signed_digest.bls_signature,
600            signed_digest.operator_id,
601            logger.clone(),
602        );
603        aggregated_operators.insert(
604            signed_digest.task_response_digest,
605            updated_aggregated.clone(),
606        );
607
608        // Check if the stake thresholds are met. If not, return
609        if !Self::check_if_stake_thresholds_met(
610            &updated_aggregated.signers_total_stake_per_quorum,
611            total_stake_per_quorum,
612            quorum_threshold_percentage_map,
613        ) {
614            return Ok(());
615        }
616
617        logger.debug(
618            &format!("Signature threshold is met for task index: {}", task_index),
619            "eigen-services-blsaggregation.bls_agg.handle_new_signature",
620        );
621
622        // If the window is not open, open it
623        if !*open_window {
624            *open_window = true;
625            Self::start_window(window_tx, window_duration, task_index, logger.clone());
626        }
627
628        *current_aggregated_response = Some(
629            Self::build_aggregated_response(
630                task_index,
631                task_created_block,
632                signed_digest.task_response_digest,
633                operator_state_avs,
634                updated_aggregated,
635                avs_registry_service,
636                quorum_apks_g1,
637                quorum_nums,
638                logger.clone(),
639            )
640            .await?,
641        );
642
643        Ok(())
644    }
645
646    /// Handles when the task expired in the [`loop_task_aggregator`] function.
647    /// If the window is open, send the aggregated response. Else, send the error.
648    ///
649    /// # Arguments
650    ///
651    /// * `logger` - The logger to log messages.
652    /// * `aggregated_response_sender` - The aggregated response sender.
653    /// * `task_index` - The task index.
654    /// * `open_window` - Whether the window is open.
655    /// * `current_aggregated_response` - The current aggregated response.
656    fn handle_task_expired(
657        logger: &SharedLogger,
658        aggregated_response_sender: &UnboundedSender<
659            Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
660        >,
661        task_index: TaskIndex,
662        open_window: bool,
663        current_aggregated_response: &Option<BlsAggregationServiceResponse>,
664    ) -> Result<(), BlsAggregationServiceError> {
665        if open_window {
666            logger.debug(
667                &format!(
668                    "task_expired_timer while in the waiting window for task index: {}",
669                    task_index
670                ),
671                "eigen-services-blsaggregation.bls_agg.handle_task_expired",
672            );
673            aggregated_response_sender
674                .send(Ok(current_aggregated_response.clone().unwrap()))
675                .map_err(|_| BlsAggregationServiceError::SenderError)?;
676        } else {
677            logger.debug(
678                &format!(
679                    "task_expired_timer NOT in the waiting window for task index: {}",
680                    task_index
681                ),
682                "eigen-services-blsaggregation.bls_agg.handle_task_expired",
683            );
684
685            let _ = aggregated_response_sender.send(Err(BlsAggregationServiceError::TaskExpired));
686        }
687        Ok(())
688    }
689
690    /// Handles when the window is finished in the [`loop_task_aggregator`] function.
691    ///
692    ///
693    /// # Arguments
694    ///
695    /// * `logger` - The logger to log messages.
696    /// * `aggregated_response_sender` - The aggregated response sender.
697    /// * `task_index` - The task index.
698    /// * `current_aggregated_response` - The current aggregated response.
699    fn handle_window_finished(
700        logger: &SharedLogger,
701        aggregated_response_sender: &UnboundedSender<
702            Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
703        >,
704        task_index: TaskIndex,
705        current_aggregated_response: &Option<BlsAggregationServiceResponse>,
706    ) -> Result<(), BlsAggregationServiceError> {
707        logger.debug(
708            &format!(
709                "Window finished. Send aggregated response for task index: {}",
710                task_index
711            ),
712            "eigen-services-blsaggregation.bls_agg.handle_window_finished",
713        );
714
715        aggregated_response_sender
716            .send(Ok(current_aggregated_response.clone().unwrap()))
717            .map_err(|_| BlsAggregationServiceError::SenderError)?;
718        Ok(())
719    }
720
721    /// Builds the aggregated response containing all the aggregation info.
722    ///
723    /// # Arguments
724    ///
725    /// * `task_index` - The index of the task.
726    /// * `task_created_block` - The block in which the task was created.
727    /// * `signed_task_digest` - The signed task.
728    /// * `operator_state_avs` - A hashmap with the operator state per operator id.
729    /// * `digest_aggregated_operators` - The aggregated operators.
730    /// * `avs_registry_service` - The avs registry service.
731    /// * `quorum_apks_g1` - The quorum aggregated public keys.
732    /// * `quorum_nums` - The quorum numbers.
733    /// * `logger` - The logger to log messages.
734    ///
735    /// # Returns
736    ///
737    /// The BLS aggregation service response.
738    #[allow(clippy::too_many_arguments)]
739    async fn build_aggregated_response(
740        task_index: TaskIndex,
741        task_created_block: u64,
742        task_response_digest: FixedBytes<32>,
743        operator_state_avs: &HashMap<FixedBytes<32>, OperatorAvsState>,
744        digest_aggregated_operators: AggregatedOperators,
745        avs_registry_service: &A,
746        quorum_apks_g1: &[BlsG1Point],
747        quorum_nums: &[u8],
748        logger: SharedLogger,
749    ) -> Result<BlsAggregationServiceResponse, BlsAggregationServiceError> {
750        logger.debug(
751            &format!("Build aggregated response for task index: {}", task_index),
752            "eigen-services-blsaggregation.bls_agg.build_aggregated_response",
753        );
754
755        let mut non_signers_operators_ids: Vec<FixedBytes<32>> = operator_state_avs
756            .keys()
757            .filter(|operator_id| {
758                !digest_aggregated_operators
759                    .signers_operator_ids_set
760                    .contains_key(*operator_id)
761            })
762            .cloned()
763            .collect();
764
765        non_signers_operators_ids.sort();
766
767        let non_signers_pub_keys_g1: Vec<BlsG1Point> = non_signers_operators_ids
768            .iter()
769            .filter_map(|operator_id| operator_state_avs.get(operator_id))
770            .filter_map(|operator_avs_state| operator_avs_state.operator_info.pub_keys.clone())
771            .map(|pub_keys| pub_keys.g1_pub_key)
772            .collect();
773
774        let indices = avs_registry_service
775            .get_check_signatures_indices(
776                task_created_block,
777                quorum_nums.into(),
778                non_signers_operators_ids,
779            )
780            .await
781            .map_err(|_err| BlsAggregationServiceError::RegistryError)?;
782
783        Ok(BlsAggregationServiceResponse {
784            task_index,
785            task_response_digest,
786            non_signers_pub_keys_g1,
787            quorum_apks_g1: quorum_apks_g1.into(),
788            signers_apk_g2: digest_aggregated_operators.signers_apk_g2,
789            signers_agg_sig_g1: digest_aggregated_operators.signers_agg_sig_g1,
790            non_signer_quorum_bitmap_indices: indices.clone().nonSignerQuorumBitmapIndices,
791            quorum_apk_indices: indices.quorumApkIndices,
792            total_stake_indices: indices.totalStakeIndices,
793            non_signer_stake_indices: indices.nonSignerStakeIndices,
794        })
795    }
796
797    /// Checks if the stake thresholds are met for the given set of quorum members.
798    ///
799    /// # Arguments
800    ///
801    /// * `signed_stake_per_quorum` - The signed stake per quorum.
802    /// * `total_stake_per_quorum` - The total stake per quorum.
803    /// * `quorum_threshold_percentages_map` - The quorum threshold percentages map,
804    ///   containing the quorum id as a key and its corresponding quorum threshold percentage.
805    ///
806    /// # Returns
807    ///
808    /// Returns `true` if the stake thresholds are met for all the members, otherwise `false`.
809    fn check_if_stake_thresholds_met(
810        signed_stake_per_quorum: &HashMap<u8, U256>,
811        total_stake_per_quorum: &HashMap<u8, U256>,
812        quorum_threshold_percentages_map: &HashMap<u8, QuorumThresholdPercentage>,
813    ) -> bool {
814        for (quorum_num, quorum_threshold_percentage) in quorum_threshold_percentages_map {
815            let (Some(signed_stake_by_quorum), Some(total_stake_by_quorum)) = (
816                signed_stake_per_quorum.get(quorum_num),
817                total_stake_per_quorum.get(quorum_num),
818            ) else {
819                return false;
820            };
821
822            let signed_stake = signed_stake_by_quorum * U256::from(100);
823            let threshold_stake = *total_stake_by_quorum * U256::from(*quorum_threshold_percentage);
824
825            if signed_stake < threshold_stake {
826                return false;
827            }
828        }
829        true
830    }
831
832    /// Checks if the signature is a duplicate.
833    ///
834    /// # Arguments
835    ///
836    /// * `aggregated_operators` - The aggregated operators.
837    /// * `signed_digest` - The signed task response digest.
838    ///
839    /// # Returns
840    ///
841    /// Returns `true` if the signature is a duplicate, otherwise `false`.
842    fn is_duplicate_signature(
843        aggregated_operators: &HashMap<FixedBytes<32>, AggregatedOperators>,
844        signed_digest: &SignedTaskResponseDigest,
845    ) -> bool {
846        aggregated_operators
847            .get(&signed_digest.task_response_digest)
848            .map(|ops| {
849                ops.signers_operator_ids_set
850                    .contains_key(&signed_digest.operator_id)
851            })
852            .unwrap_or(false)
853    }
854
855    /// Starts the window to wait for new signatures.
856    ///
857    /// # Arguments
858    ///
859    /// * `window_tx` - The unbounded sender to send the window signal.
860    /// * `window_duration` - The duration of the window.
861    /// * `task_index` - The task index.
862    /// * `logger` - The logger to log messages.
863    fn start_window(
864        window_tx: &UnboundedSender<bool>,
865        window_duration: Duration,
866        task_index: TaskIndex,
867        logger: SharedLogger,
868    ) {
869        let sender = window_tx.clone();
870        logger.debug(
871            &format!(
872                "Create window to wait for new signatures for task index: {}",
873                task_index
874            ),
875            "eigen-services-blsaggregation.bls_agg.start_window",
876        );
877        tokio::spawn(async move {
878            tokio::time::sleep(window_duration).await;
879            let _ = sender.send(true);
880        });
881    }
882}
883
884/// Verifies the signature of the task response given a `operator_avs_state`.
885/// If the signature is correct, it returns `Ok(())`, otherwise it returns an error.
886///
887/// # Arguments
888///
889/// * `task_index` - The index of the task
890/// * `signed_task_response_digest` - The signed task response digest
891/// * `operator_avs_state` - A hashmap containing the staked of all the operator indexed by operator_id.
892///   This is used to get the `operator_state` to obtain the operator public key.
893/// * `logger` - The logger to log messages.
894///
895/// # Error
896///
897/// Returns error:
898/// - `SignatureVerificationError::OperatorNotFound` if the operator is not found,
899/// - `SignatureVerificationError::OperatorPublicKeyNotFound` if the operator public key is not found,
900/// - `SignatureVerificationError::IncorrectSignature` if the signature is incorrect.
901async fn verify_signature(
902    task_index: TaskIndex,
903    signed_task_response_digest: &SignedTaskResponseDigest,
904    operator_avs_state: &HashMap<FixedBytes<32>, OperatorAvsState>,
905    logger: SharedLogger,
906) -> Result<(), SignatureVerificationError> {
907    let Some(operator_state) = operator_avs_state.get(&signed_task_response_digest.operator_id)
908    else {
909        logger.error(
910            &format!("Operator Not Found for task index: {}", task_index),
911            "eigen-services-blsaggregation.bls_agg.verify_signature",
912        );
913        return Err(SignatureVerificationError::OperatorNotFound);
914    };
915
916    let Some(pub_keys) = &operator_state.operator_info.pub_keys else {
917        logger.error(
918            &format!(
919                "Operator Public Key Not Found for task index: {}",
920                task_index
921            ),
922            "eigen-services-blsaggregation.bls_agg.verify_signature",
923        );
924        return Err(SignatureVerificationError::OperatorPublicKeyNotFound);
925    };
926
927    let message = signed_task_response_digest
928        .task_response_digest
929        .as_slice()
930        .try_into()
931        .map_err(|_| SignatureVerificationError::IncorrectSignature)?;
932
933    verify_message(
934        pub_keys.g2_pub_key.g2(),
935        message,
936        signed_task_response_digest.bls_signature.g1_point().g1(),
937    )
938    .then_some(())
939    .ok_or(SignatureVerificationError::IncorrectSignature)
940    .inspect(|_| {
941        logger.debug(
942            &format!(
943                "Signature verification successful for task index: {}",
944                task_index
945            ),
946            "eigen-services-blsaggregation.bls_agg.verify_signature",
947        );
948    })
949    .inspect_err(|_| {
950        logger.error(
951            &format!(
952                "Signature verification failed for task index: {}",
953                task_index
954            ),
955            "eigen-services-blsaggregation.bls_agg.verify_signature",
956        );
957    })
958}
959
960/// Updates the aggregated operators with the new operator info.
961///
962/// # Arguments
963///
964/// * `aggregated_operators` - The aggregated operators.
965/// * `operator_state` - The operator state.
966/// * `task_response_digest` - The task response digest.
967/// * `bls_signature` - The BLS signature.
968/// * `operator_id` - The operator id.
969/// * `logger` - The logger to log messages.
970///
971/// # Returns
972///
973/// The updated aggregated operators.
974fn update_aggregated_operators(
975    aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
976    operator_state: &OperatorAvsState,
977    task_response_digest: FixedBytes<32>,
978    bls_signature: Signature,
979    operator_id: FixedBytes<32>,
980    logger: SharedLogger,
981) -> AggregatedOperators {
982    logger.debug(
983        "Update aggregated operators",
984        "eigen-services-blsaggregation.bls_agg.update_aggregated_operators",
985    );
986
987    let bls_signature_g1_point = bls_signature.g1_point().g1();
988
989    if let Some(existing) = aggregated_operators.get_mut(&task_response_digest) {
990        // If the operator is already in the aggregated operators, aggregate the new operator
991        let updated = aggregate_new_operator(
992            existing,
993            operator_state.clone(),
994            operator_id,
995            bls_signature_g1_point,
996            logger,
997        );
998        updated.clone()
999    } else {
1000        // If the operator is not in the aggregated operators, create a new aggregated operator
1001        let operator_g2_pubkey = operator_state
1002            .operator_info
1003            .pub_keys
1004            .clone()
1005            .unwrap()
1006            .g2_pub_key
1007            .g2();
1008        let mut signers_apk_g2 = BlsG2Point::new(G2Affine::zero());
1009        let mut signers_agg_sig_g1 = Signature::new(G1Affine::zero());
1010        for _ in 0..operator_state.stake_per_quorum.len() {
1011            signers_apk_g2 = BlsG2Point::new((signers_apk_g2.g2() + operator_g2_pubkey).into());
1012            signers_agg_sig_g1 = Signature::new(
1013                (signers_agg_sig_g1.g1_point().g1() + bls_signature_g1_point).into(),
1014            );
1015        }
1016        AggregatedOperators {
1017            signers_apk_g2,
1018            signers_agg_sig_g1,
1019            signers_operator_ids_set: HashMap::from([(operator_state.operator_id, true)]),
1020            signers_total_stake_per_quorum: operator_state.stake_per_quorum.clone(),
1021        }
1022    }
1023}
1024
1025/// Adds a new operator to the aggregated operators by aggregating its public key, signature and stake.
1026///
1027/// # Arguments
1028///
1029/// - `aggregated_operators` - Contains the information of all the aggregated operators.
1030/// - `operator_state` - The state of the operator, contains information about its stake.
1031/// - `signed_task_digest` - Contains the id and signature of the new operator.
1032/// - `logger` - The logger to log messages.
1033///
1034/// # Returns
1035///
1036/// The given aggregated operators, aggregated with the new operator info.
1037fn aggregate_new_operator(
1038    aggregated_operators: &mut AggregatedOperators,
1039    operator_state: OperatorAvsState,
1040    operator_id: FixedBytes<32>,
1041    signature_g1_point: G1Affine,
1042    logger: SharedLogger,
1043) -> &mut AggregatedOperators {
1044    let operator_g2_pubkey = operator_state
1045        .operator_info
1046        .pub_keys
1047        .clone()
1048        .unwrap()
1049        .g2_pub_key
1050        .g2();
1051    aggregated_operators
1052        .signers_operator_ids_set
1053        .insert(operator_id, true);
1054
1055    logger.debug(
1056        &format!(
1057            "operator {} inserted in signers_operator_ids_set",
1058            operator_id
1059        ),
1060        "eigen-services-blsaggregation.bls_agg.aggregate_new_operator",
1061    );
1062
1063    for (quorum_num, stake) in operator_state.stake_per_quorum.iter() {
1064        // For each quorum the operator has stake in, we aggregate the signature and update the stake
1065        aggregated_operators.signers_agg_sig_g1 = Signature::new(
1066            (aggregated_operators.signers_agg_sig_g1.g1_point().g1() + signature_g1_point).into(),
1067        );
1068        aggregated_operators.signers_apk_g2 =
1069            BlsG2Point::new((aggregated_operators.signers_apk_g2.g2() + operator_g2_pubkey).into());
1070        aggregated_operators
1071            .signers_total_stake_per_quorum
1072            .entry(*quorum_num)
1073            .and_modify(|v| *v += stake)
1074            .or_insert(*stake);
1075    }
1076    aggregated_operators
1077}
1078
1079#[cfg(test)]
1080mod tests {
1081    use super::{BlsAggregationServiceError, BlsAggregationServiceResponse, BlsAggregatorService};
1082    use crate::bls_agg::{TaskMetadata, TaskSignature};
1083    use alloy::primitives::{B256, U256};
1084    use eigen_crypto_bls::{BlsG1Point, BlsG2Point, BlsKeyPair, Signature};
1085    use eigen_logging::get_test_logger;
1086    use eigen_services_avsregistry::fake_avs_registry_service::FakeAvsRegistryService;
1087    use eigen_types::avs::SignatureVerificationError::{DuplicateSignature, IncorrectSignature};
1088    use eigen_types::operator::{QuorumNum, QuorumThresholdPercentages};
1089    use eigen_types::{avs::TaskIndex, test::TestOperator};
1090    use sha2::{Digest, Sha256};
1091    use std::collections::HashMap;
1092    use std::time::Duration;
1093    use std::vec;
1094    use tokio::time::{sleep, Instant};
1095
1096    const PRIVATE_KEY_1: &str =
1097        "13710126902690889134622698668747132666439281256983827313388062967626731803599";
1098    const PRIVATE_KEY_2: &str =
1099        "14610126902690889134622698668747132666439281256983827313388062967626731803500";
1100    const PRIVATE_KEY_3: &str =
1101        "15610126902690889134622698668747132666439281256983827313388062967626731803501";
1102
1103    fn hash(task_response: u64) -> B256 {
1104        let mut hasher = Sha256::new();
1105        hasher.update(task_response.to_be_bytes());
1106        B256::from_slice(hasher.finalize().as_ref())
1107    }
1108
1109    fn aggregate_g1_public_keys(operators: &[TestOperator]) -> BlsG1Point {
1110        operators
1111            .iter()
1112            .map(|op| op.bls_keypair.public_key().g1())
1113            .reduce(|a, b| (a + b).into())
1114            .map(BlsG1Point::new)
1115            .unwrap()
1116    }
1117
1118    fn aggregate_g2_public_keys(operators: &[TestOperator]) -> BlsG2Point {
1119        operators
1120            .iter()
1121            .map(|op| op.bls_keypair.public_key_g2().g2())
1122            .reduce(|a, b| (a + b).into())
1123            .map(BlsG2Point::new)
1124            .unwrap()
1125    }
1126
1127    fn aggregate_g1_signatures(signatures: &[Signature]) -> Signature {
1128        let agg = signatures
1129            .iter()
1130            .map(|s| s.g1_point().g1())
1131            .reduce(|a, b| (a + b).into())
1132            .unwrap();
1133        Signature::new(agg)
1134    }
1135
1136    #[tokio::test]
1137    async fn test_1_quorum_1_operator_1_correct_signature() {
1138        let test_operator_1 = TestOperator {
1139            operator_id: U256::from(1).into(),
1140            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1141            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1142        };
1143
1144        let block_number = 1;
1145        let task_index: TaskIndex = 0;
1146        let quorum_numbers = vec![0];
1147        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
1148        let time_to_expiry = Duration::from_secs(1);
1149        let task_response = 123; // Initialize with appropriate data
1150
1151        let task_response_digest = hash(task_response);
1152        let bls_signature = test_operator_1
1153            .bls_keypair
1154            .sign_message(task_response_digest.as_ref());
1155        let fake_avs_registry_service =
1156            FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
1157        let bls_agg_service =
1158            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1159        let metadata = TaskMetadata::new(
1160            task_index,
1161            block_number,
1162            quorum_numbers,
1163            quorum_threshold_percentages,
1164            time_to_expiry,
1165        );
1166        let (handle, mut aggregator_response) = bls_agg_service.start();
1167        handle.initialize_task(metadata).await.unwrap();
1168
1169        handle
1170            .process_signature(TaskSignature::new(
1171                task_index,
1172                task_response_digest,
1173                bls_signature,
1174                test_operator_1.operator_id,
1175            ))
1176            .await
1177            .unwrap();
1178
1179        let expected_agg_service_response = BlsAggregationServiceResponse {
1180            task_index,
1181            task_response_digest,
1182            non_signers_pub_keys_g1: vec![],
1183            quorum_apks_g1: vec![test_operator_1.bls_keypair.public_key()],
1184            signers_apk_g2: test_operator_1.bls_keypair.public_key_g2(),
1185            signers_agg_sig_g1: test_operator_1
1186                .bls_keypair
1187                .sign_message(task_response_digest.as_ref()),
1188            non_signer_quorum_bitmap_indices: vec![],
1189            quorum_apk_indices: vec![],
1190            total_stake_indices: vec![],
1191            non_signer_stake_indices: vec![],
1192        };
1193
1194        let response = aggregator_response.receive_aggregated_response().await;
1195
1196        assert_eq!(expected_agg_service_response, response.clone().unwrap());
1197        assert_eq!(task_index, response.unwrap().task_index);
1198    }
1199
1200    #[tokio::test]
1201    async fn test_1_quorum_2_operator_2_duplicated_signatures() {
1202        let test_operator_1 = TestOperator {
1203            operator_id: U256::from(1).into(),
1204            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1205            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1206        };
1207        let test_operator_2 = TestOperator {
1208            operator_id: U256::from(2).into(),
1209            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1210            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1211        };
1212        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1213        let block_number = 1;
1214        let task_index: TaskIndex = 0;
1215        let quorum_numbers = vec![0];
1216        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
1217        let time_to_expiry = Duration::from_secs(1);
1218        let task_response = 123; // Initialize with appropriate data
1219        let task_response_digest = hash(task_response);
1220
1221        let fake_avs_registry_service =
1222            FakeAvsRegistryService::new(block_number, test_operators.clone());
1223        let bls_agg_service =
1224            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1225        let metadata = TaskMetadata::new(
1226            task_index,
1227            block_number,
1228            quorum_numbers,
1229            quorum_threshold_percentages,
1230            time_to_expiry,
1231        );
1232        let (handle, mut aggregator_response) = bls_agg_service.start();
1233        handle.initialize_task(metadata).await.unwrap();
1234        let bls_signature_1 = test_operator_1
1235            .bls_keypair
1236            .sign_message(task_response_digest.as_ref());
1237        handle
1238            .process_signature(TaskSignature::new(
1239                task_index,
1240                task_response_digest,
1241                bls_signature_1.clone(),
1242                test_operator_1.operator_id,
1243            ))
1244            .await
1245            .unwrap();
1246
1247        let second_signature_processing_result = handle
1248            .process_signature(TaskSignature::new(
1249                task_index,
1250                task_response_digest,
1251                bls_signature_1.clone(),
1252                test_operator_1.operator_id,
1253            ))
1254            .await;
1255
1256        assert_eq!(
1257            second_signature_processing_result,
1258            Err(BlsAggregationServiceError::SignatureVerificationError(
1259                DuplicateSignature
1260            ))
1261        );
1262
1263        let bls_signature_2 = test_operator_2
1264            .bls_keypair
1265            .sign_message(task_response_digest.as_ref());
1266
1267        handle
1268            .process_signature(TaskSignature::new(
1269                task_index,
1270                task_response_digest,
1271                bls_signature_2.clone(),
1272                test_operator_2.operator_id,
1273            ))
1274            .await
1275            .unwrap();
1276
1277        let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1278        let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
1279        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_signature_1, bls_signature_2]);
1280        let expected_agg_service_response = BlsAggregationServiceResponse {
1281            task_index,
1282            task_response_digest,
1283            non_signers_pub_keys_g1: vec![],
1284            quorum_apks_g1: vec![quorum_apks_g1],
1285            signers_apk_g2,
1286            signers_agg_sig_g1,
1287            non_signer_quorum_bitmap_indices: vec![],
1288            quorum_apk_indices: vec![],
1289            total_stake_indices: vec![],
1290            non_signer_stake_indices: vec![],
1291        };
1292
1293        let response = aggregator_response.receive_aggregated_response().await;
1294
1295        assert_eq!(expected_agg_service_response, response.clone().unwrap());
1296        assert_eq!(task_index, response.unwrap().task_index);
1297    }
1298
1299    #[tokio::test]
1300    async fn test_1_quorum_3_operator_3_correct_signatures() {
1301        let test_operator_1 = TestOperator {
1302            operator_id: U256::from(1).into(),
1303            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1304            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1305        };
1306        let test_operator_2 = TestOperator {
1307            operator_id: U256::from(2).into(),
1308            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1309            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1310        };
1311        let test_operator_3 = TestOperator {
1312            operator_id: U256::from(3).into(),
1313            stake_per_quorum: HashMap::from([(0u8, U256::from(300)), (1u8, U256::from(100))]),
1314            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
1315        };
1316        let test_operators = vec![
1317            test_operator_1.clone(),
1318            test_operator_2.clone(),
1319            test_operator_3.clone(),
1320        ];
1321
1322        let block_number = 1;
1323        let task_index = 0;
1324        let quorum_numbers: Vec<QuorumNum> = vec![0];
1325        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8];
1326        let time_to_expiry = Duration::from_secs(1);
1327        let task_response = 123; // Initialize with appropriate data
1328        let task_response_digest = hash(task_response);
1329
1330        let fake_avs_registry_service =
1331            FakeAvsRegistryService::new(block_number, test_operators.clone());
1332        let bls_agg_service =
1333            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1334
1335        let metadata = TaskMetadata::new(
1336            task_index,
1337            block_number,
1338            quorum_numbers,
1339            quorum_threshold_percentages,
1340            time_to_expiry,
1341        );
1342        let (handle, mut aggregator_response) = bls_agg_service.start();
1343        handle.initialize_task(metadata).await.unwrap();
1344
1345        let bls_sig_op_1 = test_operator_1
1346            .bls_keypair
1347            .sign_message(task_response_digest.as_ref());
1348        handle
1349            .process_signature(TaskSignature::new(
1350                task_index,
1351                task_response_digest,
1352                bls_sig_op_1.clone(),
1353                test_operator_1.operator_id,
1354            ))
1355            .await
1356            .unwrap();
1357
1358        let bls_sig_op_2 = test_operator_2
1359            .bls_keypair
1360            .sign_message(task_response_digest.as_ref());
1361        handle
1362            .process_signature(TaskSignature::new(
1363                task_index,
1364                task_response_digest,
1365                bls_sig_op_2.clone(),
1366                test_operator_2.operator_id,
1367            ))
1368            .await
1369            .unwrap();
1370
1371        let bls_sig_op_3 = test_operator_3
1372            .bls_keypair
1373            .sign_message(task_response_digest.as_ref());
1374        handle
1375            .process_signature(TaskSignature::new(
1376                task_index,
1377                task_response_digest,
1378                bls_sig_op_3.clone(),
1379                test_operator_3.operator_id,
1380            ))
1381            .await
1382            .unwrap();
1383
1384        let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1385        let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
1386        let signers_agg_sig_g1 =
1387            aggregate_g1_signatures(&vec![bls_sig_op_1, bls_sig_op_2, bls_sig_op_3]);
1388
1389        let expected_agg_service_response = BlsAggregationServiceResponse {
1390            task_index,
1391            task_response_digest,
1392            non_signers_pub_keys_g1: vec![],
1393            quorum_apks_g1: vec![quorum_apks_g1],
1394            signers_apk_g2,
1395            signers_agg_sig_g1,
1396            non_signer_quorum_bitmap_indices: vec![],
1397            quorum_apk_indices: vec![],
1398            total_stake_indices: vec![],
1399            non_signer_stake_indices: vec![],
1400        };
1401
1402        let response = aggregator_response.receive_aggregated_response().await;
1403        assert_eq!(expected_agg_service_response, response.clone().unwrap());
1404        assert_eq!(task_index, response.unwrap().task_index);
1405    }
1406
1407    #[tokio::test]
1408    async fn test_2_quorum_2_operator_2_correct_signatures() {
1409        let test_operator_1 = TestOperator {
1410            operator_id: U256::from(1).into(),
1411            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1412            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1413        };
1414        let test_operator_2 = TestOperator {
1415            operator_id: U256::from(2).into(),
1416            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1417            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1418        };
1419        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1420        let block_number = 1;
1421        let task_index = 0;
1422        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1423        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1424        let time_to_expiry = Duration::from_secs(1);
1425        let task_response = 123; // Initialize with appropriate data
1426        let task_response_digest = hash(task_response);
1427
1428        let fake_avs_registry_service =
1429            FakeAvsRegistryService::new(block_number, test_operators.clone());
1430        let bls_agg_service =
1431            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1432
1433        let metadata = TaskMetadata::new(
1434            task_index,
1435            block_number,
1436            quorum_numbers,
1437            quorum_threshold_percentages,
1438            time_to_expiry,
1439        );
1440        let (handle, mut aggregator_response) = bls_agg_service.start();
1441        handle.initialize_task(metadata).await.unwrap();
1442
1443        let bls_sig_op_1 = test_operator_1
1444            .bls_keypair
1445            .sign_message(task_response_digest.as_ref());
1446        handle
1447            .process_signature(TaskSignature::new(
1448                task_index,
1449                task_response_digest,
1450                bls_sig_op_1.clone(),
1451                test_operator_1.operator_id,
1452            ))
1453            .await
1454            .unwrap();
1455
1456        let bls_sig_op_2 = test_operator_2
1457            .bls_keypair
1458            .sign_message(task_response_digest.as_ref());
1459        handle
1460            .process_signature(TaskSignature::new(
1461                task_index,
1462                task_response_digest,
1463                bls_sig_op_2.clone(),
1464                test_operator_2.operator_id,
1465            ))
1466            .await
1467            .unwrap();
1468
1469        let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1470        let signers_apk_g2 =
1471            aggregate_g2_public_keys(&[test_operators.clone(), test_operators].concat());
1472        let signers_agg_sig_g1 = aggregate_g1_signatures(&[
1473            bls_sig_op_1.clone(),
1474            bls_sig_op_1,
1475            bls_sig_op_2.clone(),
1476            bls_sig_op_2,
1477        ]);
1478
1479        let expected_agg_service_response = BlsAggregationServiceResponse {
1480            task_index,
1481            task_response_digest,
1482            non_signers_pub_keys_g1: vec![],
1483            quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1],
1484            signers_apk_g2,
1485            signers_agg_sig_g1,
1486            non_signer_quorum_bitmap_indices: vec![],
1487            quorum_apk_indices: vec![],
1488            total_stake_indices: vec![],
1489            non_signer_stake_indices: vec![],
1490        };
1491
1492        let response = aggregator_response.receive_aggregated_response().await;
1493
1494        assert_eq!(expected_agg_service_response, response.unwrap());
1495    }
1496
1497    #[tokio::test]
1498    async fn test_2_concurrent_tasks_2_quorum_2_operator_2_correct_signatures() {
1499        let test_operator_1 = TestOperator {
1500            operator_id: U256::from(1).into(),
1501            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1502            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1503        };
1504        let test_operator_2 = TestOperator {
1505            operator_id: U256::from(2).into(),
1506            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1507            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1508        };
1509        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1510        let block_number = 1;
1511        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1512        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1513        let time_to_expiry = Duration::from_secs(1);
1514
1515        let fake_avs_registry_service =
1516            FakeAvsRegistryService::new(block_number, test_operators.clone());
1517        let bls_agg_service =
1518            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1519
1520        // initialize 2 concurrent tasks
1521        let task_1_index = 1;
1522        let task_1_response = 123; // Initialize with appropriate data
1523        let task_1_response_digest = hash(task_1_response);
1524        let metadata1 = TaskMetadata::new(
1525            task_1_index,
1526            block_number,
1527            quorum_numbers.clone(),
1528            quorum_threshold_percentages.clone(),
1529            time_to_expiry,
1530        );
1531        let (handle, mut aggregator_response) = bls_agg_service.start();
1532        handle.initialize_task(metadata1).await.unwrap();
1533
1534        let task_2_index = 2;
1535        let task_2_response = 234; // Initialize with appropriate data
1536        let task_2_response_digest = hash(task_2_response);
1537        let metadata2 = TaskMetadata::new(
1538            task_2_index,
1539            block_number,
1540            quorum_numbers,
1541            quorum_threshold_percentages,
1542            time_to_expiry,
1543        );
1544        handle.initialize_task(metadata2).await.unwrap();
1545
1546        let bls_sig_task_1_op_1 = test_operator_1
1547            .bls_keypair
1548            .sign_message(task_1_response_digest.as_ref());
1549        handle
1550            .process_signature(TaskSignature::new(
1551                task_1_index,
1552                task_1_response_digest,
1553                bls_sig_task_1_op_1.clone(),
1554                test_operator_1.operator_id,
1555            ))
1556            .await
1557            .unwrap();
1558
1559        let bls_sig_task_1_op_2 = test_operator_2
1560            .bls_keypair
1561            .sign_message(task_1_response_digest.as_ref());
1562        handle
1563            .process_signature(TaskSignature::new(
1564                task_1_index,
1565                task_1_response_digest,
1566                bls_sig_task_1_op_2.clone(),
1567                test_operator_2.operator_id,
1568            ))
1569            .await
1570            .unwrap();
1571
1572        let bls_sig_task_2_op_1 = test_operator_1
1573            .bls_keypair
1574            .sign_message(task_2_response_digest.as_ref());
1575        handle
1576            .process_signature(TaskSignature::new(
1577                task_2_index,
1578                task_2_response_digest,
1579                bls_sig_task_2_op_1.clone(),
1580                test_operator_1.operator_id,
1581            ))
1582            .await
1583            .unwrap();
1584
1585        let bls_sig_task_2_op_2 = test_operator_2
1586            .bls_keypair
1587            .sign_message(task_2_response_digest.as_ref());
1588        handle
1589            .process_signature(TaskSignature::new(
1590                task_2_index,
1591                task_2_response_digest,
1592                bls_sig_task_2_op_2.clone(),
1593                test_operator_2.operator_id,
1594            ))
1595            .await
1596            .unwrap();
1597
1598        let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1599        let signers_apk_g2 =
1600            aggregate_g2_public_keys(&[test_operators.clone(), test_operators].concat());
1601        let signers_agg_sig_g1_task_1 = aggregate_g1_signatures(&[
1602            bls_sig_task_1_op_1.clone(),
1603            bls_sig_task_1_op_1,
1604            bls_sig_task_1_op_2.clone(),
1605            bls_sig_task_1_op_2,
1606        ]);
1607
1608        let expected_response_task_1 = BlsAggregationServiceResponse {
1609            task_index: task_1_index,
1610            task_response_digest: task_1_response_digest,
1611            non_signers_pub_keys_g1: vec![],
1612            quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1.clone()],
1613            signers_apk_g2: signers_apk_g2.clone(),
1614            signers_agg_sig_g1: signers_agg_sig_g1_task_1,
1615            non_signer_quorum_bitmap_indices: vec![],
1616            quorum_apk_indices: vec![],
1617            total_stake_indices: vec![],
1618            non_signer_stake_indices: vec![],
1619        };
1620
1621        let signers_agg_sig_g1_task_2 = aggregate_g1_signatures(&[
1622            bls_sig_task_2_op_1.clone(),
1623            bls_sig_task_2_op_1,
1624            bls_sig_task_2_op_2.clone(),
1625            bls_sig_task_2_op_2,
1626        ]);
1627
1628        let expected_response_task_2 = BlsAggregationServiceResponse {
1629            task_index: task_2_index,
1630            task_response_digest: task_2_response_digest,
1631            non_signers_pub_keys_g1: vec![],
1632            quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1.clone()],
1633            signers_apk_g2,
1634            signers_agg_sig_g1: signers_agg_sig_g1_task_2,
1635            non_signer_quorum_bitmap_indices: vec![],
1636            quorum_apk_indices: vec![],
1637            total_stake_indices: vec![],
1638            non_signer_stake_indices: vec![],
1639        };
1640
1641        let first_response = aggregator_response.receive_aggregated_response().await;
1642        let second_response = aggregator_response.receive_aggregated_response().await;
1643
1644        let (task_1_response, task_2_response) = if first_response.clone().unwrap().task_index == 1
1645        {
1646            (first_response, second_response)
1647        } else {
1648            (second_response, first_response)
1649        };
1650
1651        assert_eq!(expected_response_task_1, task_1_response.unwrap());
1652        assert_eq!(expected_response_task_2, task_2_response.unwrap());
1653    }
1654
1655    #[tokio::test]
1656    async fn test_1_quorum_1_operator_0_signatures_task_expired() {
1657        let test_operator_1 = TestOperator {
1658            operator_id: U256::from(1).into(),
1659            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1660            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1661        };
1662
1663        let block_number = 1;
1664        let task_index: TaskIndex = 0;
1665        let quorum_numbers = vec![0];
1666        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
1667        let time_to_expiry = Duration::from_secs(1);
1668        let _task_response = 123; // Initialize with appropriate data
1669
1670        let fake_avs_registry_service =
1671            FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
1672        let bls_agg_service =
1673            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1674        let metadata = TaskMetadata::new(
1675            task_index,
1676            block_number,
1677            quorum_numbers,
1678            quorum_threshold_percentages,
1679            time_to_expiry,
1680        );
1681        let (handle, mut aggregator_response) = bls_agg_service.start();
1682        handle.initialize_task(metadata).await.unwrap();
1683
1684        let response = aggregator_response.receive_aggregated_response().await;
1685
1686        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
1687    }
1688
1689    #[tokio::test]
1690    async fn test_1_quorum_2_operator_1_signatures_50_threshold() {
1691        let test_operator_1 = TestOperator {
1692            operator_id: U256::from(1).into(),
1693            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1694            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1695        };
1696        let test_operator_2 = TestOperator {
1697            operator_id: U256::from(2).into(),
1698            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1699            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1700        };
1701        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1702        let block_number = 1;
1703        let task_index = 0;
1704        let quorum_numbers: Vec<QuorumNum> = vec![0];
1705        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50u8];
1706        let time_to_expiry = Duration::from_secs(1);
1707        let task_response = 123; // Initialize with appropriate data
1708        let task_response_digest = hash(task_response);
1709        let bls_sig_op_1 = test_operator_1
1710            .bls_keypair
1711            .sign_message(task_response_digest.as_ref());
1712
1713        let fake_avs_registry_service =
1714            FakeAvsRegistryService::new(block_number, test_operators.clone());
1715        let bls_agg_service =
1716            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1717
1718        let metadata = TaskMetadata::new(
1719            task_index,
1720            block_number,
1721            quorum_numbers,
1722            quorum_threshold_percentages,
1723            time_to_expiry,
1724        );
1725        let (handle, mut aggregator_response) = bls_agg_service.start();
1726        handle.initialize_task(metadata).await.unwrap();
1727
1728        handle
1729            .process_signature(TaskSignature::new(
1730                task_index,
1731                task_response_digest,
1732                bls_sig_op_1.clone(),
1733                test_operator_1.operator_id,
1734            ))
1735            .await
1736            .unwrap();
1737
1738        let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1739
1740        let signers_apk_g2: BlsG2Point = test_operator_1.bls_keypair.public_key_g2();
1741
1742        let expected_agg_service_response = BlsAggregationServiceResponse {
1743            task_index,
1744            task_response_digest,
1745            non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()], //
1746            quorum_apks_g1: vec![quorum_apks_g1],
1747            signers_apk_g2,
1748            signers_agg_sig_g1: bls_sig_op_1,
1749            non_signer_quorum_bitmap_indices: vec![],
1750            quorum_apk_indices: vec![],
1751            total_stake_indices: vec![],
1752            non_signer_stake_indices: vec![],
1753        };
1754
1755        let response = aggregator_response.receive_aggregated_response().await;
1756
1757        assert_eq!(expected_agg_service_response, response.clone().unwrap());
1758        assert_eq!(task_index, response.unwrap().task_index);
1759    }
1760
1761    #[tokio::test]
1762    async fn test_1_quorum_2_operator_1_signatures_60_threshold() {
1763        let test_operator_1 = TestOperator {
1764            operator_id: U256::from(1).into(),
1765            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1766            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1767        };
1768        let test_operator_2 = TestOperator {
1769            operator_id: U256::from(2).into(),
1770            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1771            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1772        };
1773        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1774        let block_number = 1;
1775        let task_index = 0;
1776        let quorum_numbers: Vec<QuorumNum> = vec![0];
1777        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![60u8];
1778        let time_to_expiry = Duration::from_secs(1);
1779        let task_response = 123; // Initialize with appropriate data
1780        let task_response_digest = hash(task_response);
1781        let bls_sig_op_1 = test_operator_1
1782            .bls_keypair
1783            .sign_message(task_response_digest.as_ref());
1784
1785        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
1786        let bls_agg_service =
1787            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1788
1789        let metadata = TaskMetadata::new(
1790            task_index,
1791            block_number,
1792            quorum_numbers,
1793            quorum_threshold_percentages,
1794            time_to_expiry,
1795        );
1796        let (handle, mut aggregator_response) = bls_agg_service.start();
1797        handle.initialize_task(metadata).await.unwrap();
1798
1799        handle
1800            .process_signature(TaskSignature::new(
1801                task_index,
1802                task_response_digest,
1803                bls_sig_op_1,
1804                test_operator_1.operator_id,
1805            ))
1806            .await
1807            .unwrap();
1808
1809        let response = aggregator_response.receive_aggregated_response().await;
1810
1811        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
1812    }
1813
1814    #[tokio::test]
1815    async fn test_2_quorums_2_operators_which_just_take_1_quorum_2_correct_signatures() {
1816        let test_operator_1 = TestOperator {
1817            operator_id: U256::from(1).into(),
1818            // Note the quorums is [0, 1], but operator id 1 just stake 0.
1819            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1820            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1821        };
1822        let test_operator_2 = TestOperator {
1823            operator_id: U256::from(2).into(),
1824            // Note the quorums is [0, 1], but operator id 2 just stake 1.
1825            stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
1826            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1827        };
1828
1829        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1830        let block_number = 1;
1831        let task_index = 0;
1832        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1833        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1834        let time_to_expiry = Duration::from_secs(1);
1835        let task_response = 123; // Initialize with appropriate data
1836        let task_response_digest = hash(task_response);
1837
1838        let fake_avs_registry_service =
1839            FakeAvsRegistryService::new(block_number, test_operators.clone());
1840        let bls_agg_service =
1841            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1842
1843        let metadata = TaskMetadata::new(
1844            task_index,
1845            block_number,
1846            quorum_numbers,
1847            quorum_threshold_percentages,
1848            time_to_expiry,
1849        );
1850        let (handle, mut aggregator_response) = bls_agg_service.start();
1851        handle.initialize_task(metadata).await.unwrap();
1852
1853        let bls_sig_op_1 = test_operator_1
1854            .bls_keypair
1855            .sign_message(task_response_digest.as_ref());
1856        handle
1857            .process_signature(TaskSignature::new(
1858                task_index,
1859                task_response_digest,
1860                bls_sig_op_1.clone(),
1861                test_operator_1.operator_id,
1862            ))
1863            .await
1864            .unwrap();
1865
1866        let bls_sig_op_2 = test_operator_2
1867            .bls_keypair
1868            .sign_message(task_response_digest.as_ref());
1869        handle
1870            .process_signature(TaskSignature::new(
1871                task_index,
1872                task_response_digest,
1873                bls_sig_op_2.clone(),
1874                test_operator_2.operator_id,
1875            ))
1876            .await
1877            .unwrap();
1878
1879        let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
1880        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
1881
1882        let expected_agg_service_response = BlsAggregationServiceResponse {
1883            task_index,
1884            task_response_digest,
1885            non_signers_pub_keys_g1: vec![],
1886            quorum_apks_g1: vec![
1887                test_operator_1.bls_keypair.public_key(),
1888                test_operator_2.bls_keypair.public_key(),
1889            ],
1890            signers_apk_g2,
1891            signers_agg_sig_g1,
1892            non_signer_quorum_bitmap_indices: vec![],
1893            quorum_apk_indices: vec![],
1894            total_stake_indices: vec![],
1895            non_signer_stake_indices: vec![],
1896        };
1897
1898        let response = aggregator_response.receive_aggregated_response().await;
1899
1900        assert_eq!(expected_agg_service_response, response.clone().unwrap());
1901        assert_eq!(task_index, response.unwrap().task_index);
1902    }
1903
1904    #[tokio::test]
1905    async fn test_2_quorums_3_operators_which_just_stake_1_quorum_50_threshold() {
1906        let test_operator_1 = TestOperator {
1907            operator_id: U256::from(1).into(),
1908            // Note the quorums is [0, 1], but operator id 1 just stake 0.
1909            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1910            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1911        };
1912        let test_operator_2 = TestOperator {
1913            operator_id: U256::from(2).into(),
1914            // Note the quorums is [0, 1], but operator id 2 just stake 1.
1915            stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
1916            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1917        };
1918
1919        let test_operator_3 = TestOperator {
1920            operator_id: U256::from(3).into(),
1921            // Note the quorums is [0, 1], but operator id 3 just stake 0.
1922            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1923            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
1924        };
1925
1926        let test_operators = vec![
1927            test_operator_1.clone(),
1928            test_operator_2.clone(),
1929            test_operator_3.clone(),
1930        ];
1931        let block_number = 1;
1932        let task_index = 0;
1933        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1934        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50u8, 50u8];
1935        let time_to_expiry = Duration::from_secs(1);
1936        let task_response = 123; // Initialize with appropriate data
1937        let task_response_digest = hash(task_response);
1938
1939        let fake_avs_registry_service =
1940            FakeAvsRegistryService::new(block_number, test_operators.clone());
1941        let bls_agg_service =
1942            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1943
1944        let metadata = TaskMetadata::new(
1945            task_index,
1946            block_number,
1947            quorum_numbers,
1948            quorum_threshold_percentages,
1949            time_to_expiry,
1950        );
1951        let (handle, mut aggregator_response) = bls_agg_service.start();
1952        handle.initialize_task(metadata).await.unwrap();
1953
1954        let bls_sig_op_1 = test_operator_1
1955            .bls_keypair
1956            .sign_message(task_response_digest.as_ref());
1957        handle
1958            .process_signature(TaskSignature::new(
1959                task_index,
1960                task_response_digest,
1961                bls_sig_op_1.clone(),
1962                test_operator_1.operator_id,
1963            ))
1964            .await
1965            .unwrap();
1966
1967        let bls_sig_op_2 = test_operator_2
1968            .bls_keypair
1969            .sign_message(task_response_digest.as_ref());
1970        handle
1971            .process_signature(TaskSignature::new(
1972                task_index,
1973                task_response_digest,
1974                bls_sig_op_2.clone(),
1975                test_operator_2.operator_id,
1976            ))
1977            .await
1978            .unwrap();
1979        let signers_apk_g2 =
1980            aggregate_g2_public_keys(&vec![test_operator_1.clone(), test_operator_2.clone()]);
1981        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
1982        let quorum_apks_g1 = vec![
1983            aggregate_g1_public_keys(&vec![test_operator_1, test_operator_3.clone()]),
1984            aggregate_g1_public_keys(&vec![test_operator_2, test_operator_3.clone()]),
1985        ];
1986
1987        let expected_agg_service_response = BlsAggregationServiceResponse {
1988            task_index,
1989            task_response_digest,
1990            non_signers_pub_keys_g1: vec![test_operator_3.bls_keypair.public_key()],
1991            quorum_apks_g1,
1992            signers_apk_g2,
1993            signers_agg_sig_g1,
1994            non_signer_quorum_bitmap_indices: vec![],
1995            quorum_apk_indices: vec![],
1996            total_stake_indices: vec![],
1997            non_signer_stake_indices: vec![],
1998        };
1999
2000        let response = aggregator_response.receive_aggregated_response().await;
2001
2002        assert_eq!(expected_agg_service_response, response.clone().unwrap());
2003        assert_eq!(task_index, response.unwrap().task_index);
2004    }
2005
2006    #[tokio::test]
2007    async fn test_2_quorums_3_operators_which_just_stake_1_quorum_60_threshold() {
2008        // results in `task expired`
2009        let test_operator_1 = TestOperator {
2010            operator_id: U256::from(1).into(),
2011            // Note the quorums is [0, 1], but operator id 1 just stake 0.
2012            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2013            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2014        };
2015        let test_operator_2 = TestOperator {
2016            operator_id: U256::from(2).into(),
2017            // Note the quorums is [0, 1], but operator id 2 just stake 1.
2018            stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
2019            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2020        };
2021
2022        let test_operator_3 = TestOperator {
2023            operator_id: U256::from(3).into(),
2024            // Note the quorums is [0, 1], but operator id 3 just stake 0.
2025            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2026            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
2027        };
2028
2029        let test_operators = vec![
2030            test_operator_1.clone(),
2031            test_operator_2.clone(),
2032            test_operator_3.clone(),
2033        ];
2034        let block_number = 1;
2035        let task_index = 0;
2036        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
2037        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![60u8, 60u8];
2038        let time_to_expiry = Duration::from_secs(1);
2039        let task_response = 123; // Initialize with appropriate data
2040        let task_response_digest = hash(task_response);
2041
2042        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2043        let bls_agg_service =
2044            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2045
2046        let metadata = TaskMetadata::new(
2047            task_index,
2048            block_number,
2049            quorum_numbers,
2050            quorum_threshold_percentages,
2051            time_to_expiry,
2052        );
2053        let (handle, mut aggregator_response) = bls_agg_service.start();
2054        handle.initialize_task(metadata).await.unwrap();
2055
2056        let bls_sig_op_1 = test_operator_1
2057            .bls_keypair
2058            .sign_message(task_response_digest.as_ref());
2059        handle
2060            .process_signature(TaskSignature::new(
2061                task_index,
2062                task_response_digest,
2063                bls_sig_op_1,
2064                test_operator_1.operator_id,
2065            ))
2066            .await
2067            .unwrap();
2068
2069        let bls_sig_op_2 = test_operator_2
2070            .bls_keypair
2071            .sign_message(task_response_digest.as_ref());
2072
2073        handle
2074            .process_signature(TaskSignature::new(
2075                task_index,
2076                task_response_digest,
2077                bls_sig_op_2,
2078                test_operator_2.operator_id,
2079            ))
2080            .await
2081            .unwrap();
2082
2083        let response = aggregator_response.receive_aggregated_response().await;
2084
2085        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2086    }
2087
2088    #[tokio::test]
2089    async fn test_2_quorums_1_operator_which_just_take_1_quorum_1_signature_task_expired() {
2090        let test_operator_1 = TestOperator {
2091            operator_id: U256::from(1).into(),
2092            // Note the quorums is [0, 1], but operator id 1 just stake 0.
2093            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2094            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2095        };
2096
2097        let block_number = 1;
2098        let task_index = 0;
2099        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
2100        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100, 100];
2101        let time_to_expiry = Duration::from_secs(1);
2102        let task_response = 123; // Initialize with appropriate data
2103        let task_response_digest = hash(task_response);
2104
2105        let fake_avs_registry_service =
2106            FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2107        let bls_agg_service =
2108            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2109
2110        let metadata = TaskMetadata::new(
2111            task_index,
2112            block_number,
2113            quorum_numbers,
2114            quorum_threshold_percentages,
2115            time_to_expiry,
2116        );
2117        let (handle, mut aggregator_response) = bls_agg_service.start();
2118        handle.initialize_task(metadata).await.unwrap();
2119
2120        let bls_sig_op_1 = test_operator_1
2121            .bls_keypair
2122            .sign_message(task_response_digest.as_ref());
2123
2124        handle
2125            .process_signature(TaskSignature::new(
2126                task_index,
2127                task_response_digest,
2128                bls_sig_op_1,
2129                test_operator_1.operator_id,
2130            ))
2131            .await
2132            .unwrap();
2133
2134        let response = aggregator_response.receive_aggregated_response().await;
2135
2136        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2137    }
2138
2139    #[tokio::test]
2140    async fn test_2_quorums_2_operators_where_1_operator_just_take_1_quorum_1_signature_task_expired(
2141    ) {
2142        let test_operator_1 = TestOperator {
2143            operator_id: U256::from(1).into(),
2144            // Note the quorums is [0, 1], but operator id 1 just stake 0.
2145            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2146            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2147        };
2148        let test_operator_2 = TestOperator {
2149            operator_id: U256::from(2).into(),
2150            // Note the quorums is [0, 1], but operator id 2 just stake 1.
2151            stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
2152            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2153        };
2154
2155        let block_number = 1;
2156        let task_index = 0;
2157        let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
2158        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100, 100];
2159        let time_to_expiry = Duration::from_secs(1);
2160        let task_response = 123; // Initialize with appropriate data
2161        let task_response_digest = hash(task_response);
2162        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2163
2164        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2165        let bls_agg_service =
2166            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2167
2168        let metadata = TaskMetadata::new(
2169            task_index,
2170            block_number,
2171            quorum_numbers,
2172            quorum_threshold_percentages,
2173            time_to_expiry,
2174        );
2175        let (handle, mut aggregator_response) = bls_agg_service.start();
2176        handle.initialize_task(metadata).await.unwrap();
2177
2178        let bls_sig_op_1 = test_operator_1
2179            .bls_keypair
2180            .sign_message(task_response_digest.as_ref());
2181        handle
2182            .process_signature(TaskSignature::new(
2183                task_index,
2184                task_response_digest,
2185                bls_sig_op_1,
2186                test_operator_1.operator_id,
2187            ))
2188            .await
2189            .unwrap();
2190
2191        let response = aggregator_response.receive_aggregated_response().await;
2192
2193        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2194    }
2195
2196    #[tokio::test]
2197    async fn send_signature_of_task_not_initialized() {
2198        let test_operator_1 = TestOperator {
2199            operator_id: U256::from(1).into(),
2200            // Note the quorums is [0, 1], but operator id 1 just stake 0.
2201            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2202            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2203        };
2204
2205        let block_number = 1;
2206        let task_index = 0;
2207        let task_response = 123; // Initialize with appropriate data
2208        let task_response_digest = hash(task_response);
2209
2210        let fake_avs_registry_service =
2211            FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2212        let bls_agg_service =
2213            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2214
2215        let bls_sig_op_1 = test_operator_1
2216            .bls_keypair
2217            .sign_message(task_response_digest.as_ref());
2218        let (handle, _) = bls_agg_service.start();
2219        let result = handle
2220            .process_signature(TaskSignature::new(
2221                task_index,
2222                task_response_digest,
2223                bls_sig_op_1,
2224                test_operator_1.operator_id,
2225            ))
2226            .await;
2227
2228        assert_eq!(Err(BlsAggregationServiceError::TaskNotFound), result);
2229    }
2230
2231    #[tokio::test]
2232    async fn test_1_quorum_2_operator_2_signatures_on_2_different_msgs() {
2233        let test_operator_1 = TestOperator {
2234            operator_id: U256::from(1).into(),
2235            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2236            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2237        };
2238        let test_operator_2 = TestOperator {
2239            operator_id: U256::from(2).into(),
2240            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2241            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2242        };
2243        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2244        let block_number = 1;
2245        let task_index = 0;
2246        let quorum_numbers: Vec<QuorumNum> = vec![0];
2247        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8];
2248        let time_to_expiry = Duration::from_secs(1);
2249        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2250        let bls_agg_service =
2251            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2252
2253        let metadata = TaskMetadata::new(
2254            task_index,
2255            block_number,
2256            quorum_numbers,
2257            quorum_threshold_percentages,
2258            time_to_expiry,
2259        );
2260        let (handle, mut aggregator_response) = bls_agg_service.start();
2261        handle.initialize_task(metadata).await.unwrap();
2262
2263        let task_response_1 = 123; // Initialize with appropriate data
2264        let task_response_1_digest = hash(task_response_1);
2265        let bls_sig_op_1 = test_operator_1
2266            .bls_keypair
2267            .sign_message(task_response_1_digest.as_ref());
2268        handle
2269            .process_signature(TaskSignature::new(
2270                task_index,
2271                task_response_1_digest,
2272                bls_sig_op_1,
2273                test_operator_1.operator_id,
2274            ))
2275            .await
2276            .unwrap();
2277
2278        let task_response_2 = 456; // Initialize with appropriate data
2279        let task_response_2_digest = hash(task_response_2);
2280        let bls_sig_op_2 = test_operator_1
2281            .bls_keypair
2282            .sign_message(task_response_2_digest.as_ref());
2283        handle
2284            .process_signature(TaskSignature::new(
2285                task_index,
2286                task_response_2_digest,
2287                bls_sig_op_2,
2288                test_operator_1.operator_id,
2289            ))
2290            .await
2291            .unwrap();
2292
2293        let response = aggregator_response.receive_aggregated_response().await;
2294
2295        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2296    }
2297
2298    #[tokio::test]
2299    async fn test_1_quorum_1_operator_1_invalid_signature() {
2300        let test_operator_1 = TestOperator {
2301            operator_id: U256::from(1).into(),
2302            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2303            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2304        };
2305
2306        let block_number = 1;
2307        let task_index = 0;
2308        let quorum_numbers = vec![0];
2309        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
2310        let time_to_expiry = Duration::from_secs(1);
2311        let task_response = 123; // Initialize with appropriate data
2312
2313        let wrong_task_response_digest = hash(task_response + 1);
2314        let bls_signature = test_operator_1
2315            .bls_keypair
2316            .sign_message(hash(task_response).as_ref());
2317        let fake_avs_registry_service =
2318            FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2319        let bls_agg_service =
2320            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2321        let metadata = TaskMetadata::new(
2322            task_index,
2323            block_number,
2324            quorum_numbers,
2325            quorum_threshold_percentages,
2326            time_to_expiry,
2327        );
2328        let (handle, mut aggregator_response) = bls_agg_service.start();
2329        handle.initialize_task(metadata).await.unwrap();
2330
2331        let result = handle
2332            .process_signature(TaskSignature::new(
2333                task_index,
2334                wrong_task_response_digest,
2335                bls_signature.clone(),
2336                test_operator_1.operator_id,
2337            ))
2338            .await;
2339
2340        assert_eq!(
2341            Err(BlsAggregationServiceError::SignatureVerificationError(
2342                IncorrectSignature
2343            )),
2344            result
2345        );
2346
2347        // Also test that the aggregator service is not affected by the invalid signature, so the task should expire
2348        let response = aggregator_response.receive_aggregated_response().await;
2349
2350        assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2351    }
2352
2353    #[tokio::test]
2354    async fn test_signatures_are_processed_during_window_after_quorum() {
2355        let test_operator_1 = TestOperator {
2356            operator_id: U256::from(1).into(),
2357            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2358            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2359        };
2360        let test_operator_2 = TestOperator {
2361            operator_id: U256::from(2).into(),
2362            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2363            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2364        };
2365        let test_operator_3 = TestOperator {
2366            operator_id: U256::from(3).into(),
2367            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2368            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
2369        };
2370        let test_operators = vec![
2371            test_operator_1.clone(),
2372            test_operator_2.clone(),
2373            test_operator_3.clone(),
2374        ];
2375        let block_number = 1;
2376        let task_index = 0;
2377        let task_response = 123;
2378        let quorum_numbers: Vec<QuorumNum> = vec![0];
2379        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50_u8];
2380        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2381        let bls_agg_service =
2382            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2383
2384        let time_to_expiry = Duration::from_secs(5);
2385        let window_duration = Duration::from_secs(1);
2386
2387        let start = Instant::now();
2388        let metadata = TaskMetadata::new(
2389            task_index,
2390            block_number,
2391            quorum_numbers,
2392            quorum_threshold_percentages,
2393            time_to_expiry,
2394        )
2395        .with_window_duration(window_duration);
2396
2397        let (handle, mut aggregator_response) = bls_agg_service.start();
2398        handle.initialize_task(metadata).await.unwrap();
2399
2400        let task_response_1_digest = hash(task_response);
2401        let bls_sig_op_1 = test_operator_1
2402            .bls_keypair
2403            .sign_message(task_response_1_digest.as_ref());
2404        handle
2405            .process_signature(TaskSignature::new(
2406                task_index,
2407                task_response_1_digest,
2408                bls_sig_op_1.clone(),
2409                test_operator_1.operator_id,
2410            ))
2411            .await
2412            .unwrap();
2413
2414        let task_response_2_digest = hash(task_response);
2415        let bls_sig_op_2 = test_operator_2
2416            .bls_keypair
2417            .sign_message(task_response_2_digest.as_ref());
2418        handle
2419            .process_signature(TaskSignature::new(
2420                task_index,
2421                task_response_2_digest,
2422                bls_sig_op_2.clone(),
2423                test_operator_2.operator_id,
2424            ))
2425            .await
2426            .unwrap();
2427
2428        // quorum reached here, window should be open receiving signatures for 1 second
2429        sleep(Duration::from_millis(500)).await;
2430        let task_response_3_digest = hash(task_response);
2431        let bls_sig_op_3 = test_operator_3
2432            .bls_keypair
2433            .sign_message(task_response_3_digest.as_ref());
2434        handle
2435            .process_signature(TaskSignature::new(
2436                task_index,
2437                task_response_3_digest,
2438                bls_sig_op_3.clone(),
2439                test_operator_3.operator_id,
2440            ))
2441            .await
2442            .unwrap();
2443
2444        let signers_apk_g2 = aggregate_g2_public_keys(&vec![
2445            test_operator_1.clone(),
2446            test_operator_2.clone(),
2447            test_operator_3.clone(),
2448        ]);
2449        let signers_agg_sig_g1 =
2450            aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2, bls_sig_op_3]);
2451        let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2452            test_operator_1,
2453            test_operator_2,
2454            test_operator_3,
2455        ])];
2456
2457        let expected_agg_service_response = BlsAggregationServiceResponse {
2458            task_index,
2459            task_response_digest: task_response_3_digest,
2460            non_signers_pub_keys_g1: vec![],
2461            quorum_apks_g1,
2462            signers_apk_g2,
2463            signers_agg_sig_g1,
2464            non_signer_quorum_bitmap_indices: vec![],
2465            quorum_apk_indices: vec![],
2466            total_stake_indices: vec![],
2467            non_signer_stake_indices: vec![],
2468        };
2469
2470        let response = aggregator_response.receive_aggregated_response().await;
2471
2472        let elapsed = start.elapsed();
2473        assert_eq!(expected_agg_service_response, response.clone().unwrap());
2474        assert_eq!(task_index, response.unwrap().task_index);
2475        assert!(elapsed < time_to_expiry);
2476        assert!(elapsed >= window_duration);
2477    }
2478
2479    #[tokio::test]
2480    async fn test_if_quorum_has_been_reached_and_the_task_expires_during_window_the_response_is_sent(
2481    ) {
2482        let test_operator_1 = TestOperator {
2483            operator_id: U256::from(1).into(),
2484            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2485            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2486        };
2487        let test_operator_2 = TestOperator {
2488            operator_id: U256::from(2).into(),
2489            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2490            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2491        };
2492        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2493        let block_number = 1;
2494        let task_index = 0;
2495        let task_response = 123;
2496        let quorum_numbers: Vec<QuorumNum> = vec![0];
2497        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2498        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2499        let bls_agg_service =
2500            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2501
2502        let time_to_expiry = Duration::from_secs(2);
2503        let window_duration = Duration::from_secs(10);
2504
2505        let start = Instant::now();
2506        let metadata = TaskMetadata::new(
2507            task_index,
2508            block_number,
2509            quorum_numbers,
2510            quorum_threshold_percentages,
2511            time_to_expiry,
2512        )
2513        .with_window_duration(window_duration);
2514        let (handle, mut aggregator_response) = bls_agg_service.start();
2515        handle.initialize_task(metadata).await.unwrap();
2516
2517        let task_response_1_digest = hash(task_response);
2518        let bls_sig_op_1 = test_operator_1
2519            .bls_keypair
2520            .sign_message(task_response_1_digest.as_ref());
2521        handle
2522            .process_signature(TaskSignature::new(
2523                task_index,
2524                task_response_1_digest,
2525                bls_sig_op_1.clone(),
2526                test_operator_1.operator_id,
2527            ))
2528            .await
2529            .unwrap();
2530
2531        // quorum reached here, window should be open receiving signatures
2532
2533        let task_response_2_digest = hash(task_response);
2534        let bls_sig_op_2 = test_operator_2
2535            .bls_keypair
2536            .sign_message(task_response_2_digest.as_ref());
2537        handle
2538            .process_signature(TaskSignature::new(
2539                task_index,
2540                task_response_2_digest,
2541                bls_sig_op_2.clone(),
2542                test_operator_2.operator_id,
2543            ))
2544            .await
2545            .unwrap();
2546
2547        let signers_apk_g2 =
2548            aggregate_g2_public_keys(&vec![test_operator_1.clone(), test_operator_2.clone()]);
2549        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
2550        let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2551            test_operator_1,
2552            test_operator_2,
2553        ])];
2554
2555        let expected_agg_service_response = BlsAggregationServiceResponse {
2556            task_index,
2557            task_response_digest: task_response_2_digest,
2558            non_signers_pub_keys_g1: vec![],
2559            quorum_apks_g1,
2560            signers_apk_g2,
2561            signers_agg_sig_g1,
2562            non_signer_quorum_bitmap_indices: vec![],
2563            quorum_apk_indices: vec![],
2564            total_stake_indices: vec![],
2565            non_signer_stake_indices: vec![],
2566        };
2567
2568        let response = aggregator_response.receive_aggregated_response().await;
2569
2570        let elapsed = start.elapsed();
2571        assert_eq!(expected_agg_service_response, response.clone().unwrap());
2572        assert_eq!(task_index, response.unwrap().task_index);
2573        assert!(elapsed >= time_to_expiry);
2574        assert!(elapsed < window_duration);
2575    }
2576
2577    #[tokio::test]
2578    async fn test_if_window_duration_is_zero_no_signatures_are_aggregated_after_reaching_quorum() {
2579        let test_operator_1 = TestOperator {
2580            operator_id: U256::from(1).into(),
2581            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2582            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2583        };
2584        let test_operator_2 = TestOperator {
2585            operator_id: U256::from(2).into(),
2586            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2587            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2588        };
2589        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2590        let block_number = 1;
2591        let task_index = 0;
2592        let task_response = 123;
2593        let quorum_numbers: Vec<QuorumNum> = vec![0];
2594        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2595        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2596        let bls_agg_service =
2597            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2598
2599        let time_to_expiry = Duration::from_secs(2);
2600        let window_duration = Duration::ZERO;
2601
2602        let start = Instant::now();
2603        let metadata = TaskMetadata::new(
2604            task_index,
2605            block_number,
2606            quorum_numbers,
2607            quorum_threshold_percentages,
2608            time_to_expiry,
2609        )
2610        .with_window_duration(window_duration);
2611        let (handle, mut aggregator_response) = bls_agg_service.start();
2612        handle.initialize_task(metadata).await.unwrap();
2613
2614        let task_response_1_digest = hash(task_response);
2615        let bls_sig_op_1 = test_operator_1
2616            .bls_keypair
2617            .sign_message(task_response_1_digest.as_ref());
2618        handle
2619            .process_signature(TaskSignature::new(
2620                task_index,
2621                task_response_1_digest,
2622                bls_sig_op_1.clone(),
2623                test_operator_1.operator_id,
2624            ))
2625            .await
2626            .unwrap();
2627
2628        // quorum reached here but window duration is zero, so no more signatures should be aggregated
2629        sleep(Duration::from_millis(1)).await;
2630
2631        let task_response_2_digest = hash(task_response);
2632        let bls_sig_op_2 = test_operator_2
2633            .bls_keypair
2634            .sign_message(task_response_2_digest.as_ref());
2635        let process_signature_result = handle
2636            .process_signature(TaskSignature::new(
2637                task_index,
2638                task_response_2_digest,
2639                bls_sig_op_2,
2640                test_operator_2.operator_id,
2641            ))
2642            .await;
2643        assert_eq!(
2644            Err(BlsAggregationServiceError::SenderError),
2645            process_signature_result
2646        );
2647
2648        let signers_apk_g2 = aggregate_g2_public_keys(&[test_operator_1.clone()]);
2649        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]);
2650        let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2651            test_operator_1,
2652            test_operator_2.clone(),
2653        ])];
2654
2655        let expected_agg_service_response = BlsAggregationServiceResponse {
2656            task_index,
2657            task_response_digest: task_response_1_digest,
2658            non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()],
2659            quorum_apks_g1,
2660            signers_apk_g2,
2661            signers_agg_sig_g1,
2662            non_signer_quorum_bitmap_indices: vec![],
2663            quorum_apk_indices: vec![],
2664            total_stake_indices: vec![],
2665            non_signer_stake_indices: vec![],
2666        };
2667
2668        let response = aggregator_response.receive_aggregated_response().await;
2669
2670        let elapsed = start.elapsed();
2671        assert_eq!(expected_agg_service_response, response.clone().unwrap());
2672        assert_eq!(task_index, response.unwrap().task_index);
2673        assert!(elapsed < time_to_expiry);
2674    }
2675
2676    #[tokio::test]
2677    async fn test_no_signatures_are_aggregated_after_window() {
2678        let test_operator_1 = TestOperator {
2679            operator_id: U256::from(1).into(),
2680            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2681            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2682        };
2683        let test_operator_2 = TestOperator {
2684            operator_id: U256::from(2).into(),
2685            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2686            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2687        };
2688        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2689        let block_number = 1;
2690        let task_index = 0;
2691        let task_response = 123;
2692        let quorum_numbers: Vec<QuorumNum> = vec![0];
2693        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2694        let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2695        let bls_agg_service =
2696            BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2697
2698        let time_to_expiry = Duration::from_secs(5);
2699        let window_duration = Duration::from_secs(1);
2700
2701        let start = Instant::now();
2702        let metadata = TaskMetadata::new(
2703            task_index,
2704            block_number,
2705            quorum_numbers,
2706            quorum_threshold_percentages,
2707            time_to_expiry,
2708        )
2709        .with_window_duration(window_duration);
2710        let (handle, mut aggregator_response) = bls_agg_service.start();
2711        handle.initialize_task(metadata).await.unwrap();
2712
2713        let task_response_1_digest = hash(task_response);
2714        let bls_sig_op_1 = test_operator_1
2715            .bls_keypair
2716            .sign_message(task_response_1_digest.as_ref());
2717        handle
2718            .process_signature(TaskSignature::new(
2719                task_index,
2720                task_response_1_digest,
2721                bls_sig_op_1.clone(),
2722                test_operator_1.operator_id,
2723            ))
2724            .await
2725            .unwrap();
2726
2727        // quorum reached here, window should be open for 1 second
2728        sleep(Duration::from_secs(2)).await;
2729
2730        let task_response_2_digest = hash(task_response);
2731        let bls_sig_op_2 = test_operator_2
2732            .bls_keypair
2733            .sign_message(task_response_2_digest.as_ref());
2734        let process_signature_result = handle
2735            .process_signature(TaskSignature::new(
2736                task_index,
2737                task_response_2_digest,
2738                bls_sig_op_2,
2739                test_operator_2.operator_id,
2740            ))
2741            .await;
2742        assert_eq!(
2743            Err(BlsAggregationServiceError::SenderError),
2744            process_signature_result
2745        );
2746
2747        let signers_apk_g2 = aggregate_g2_public_keys(&[test_operator_1.clone()]);
2748        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]);
2749        let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2750            test_operator_1,
2751            test_operator_2.clone(),
2752        ])];
2753
2754        let expected_agg_service_response = BlsAggregationServiceResponse {
2755            task_index,
2756            task_response_digest: task_response_1_digest,
2757            non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()],
2758            quorum_apks_g1,
2759            signers_apk_g2,
2760            signers_agg_sig_g1,
2761            non_signer_quorum_bitmap_indices: vec![],
2762            quorum_apk_indices: vec![],
2763            total_stake_indices: vec![],
2764            non_signer_stake_indices: vec![],
2765        };
2766
2767        let response = aggregator_response.receive_aggregated_response().await;
2768
2769        let elapsed = start.elapsed();
2770        assert_eq!(expected_agg_service_response, response.clone().unwrap());
2771        assert_eq!(task_index, response.unwrap().task_index);
2772        assert!(elapsed < time_to_expiry);
2773    }
2774}