1use super::bls_aggregation_service_error::BlsAggregationServiceError;
2use super::bls_aggregation_service_response::BlsAggregationServiceResponse;
3use alloy::primitives::{FixedBytes, Uint, U256};
4use ark_bn254::{G1Affine, G2Affine};
5use ark_ec::AffineRepr;
6use eigen_crypto_bls::{BlsG1Point, BlsG2Point, Signature};
7use eigen_crypto_bn254::utils::verify_message;
8
9use eigen_services_avsregistry::AvsRegistryService;
10use eigen_types::avs_state::OperatorAvsState;
11use eigen_types::{
12 avs::{SignatureVerificationError, TaskIndex, TaskResponseDigest},
13 operator::{QuorumThresholdPercentage, QuorumThresholdPercentages},
14};
15use std::collections::HashMap;
16use tokio::{
17 sync::{
18 mpsc::{self, UnboundedReceiver, UnboundedSender},
19 oneshot,
20 },
21 time::Duration,
22};
23use tracing::{debug, error, instrument};
24
/// Running aggregate of the operators that have signed one particular task response digest.
#[derive(Debug, Clone)]
pub struct AggregatedOperators {
    /// Accumulated G2 public key of the signers. Each signer's key is added once per entry
    /// of its `stake_per_quorum` map.
    signers_apk_g2: BlsG2Point,
    /// Accumulated G1 signature of the signers, added with the same multiplicity as the key.
    signers_agg_sig_g1: Signature,
    /// Stake signed so far, keyed by quorum number.
    signers_total_stake_per_quorum: HashMap<u8, U256>,
    /// Ids of the operators already included in this aggregate. The map is used as a set:
    /// only key presence is ever checked, the value is always `true`.
    pub signers_operator_ids_set: HashMap<FixedBytes<32>, bool>,
}
33
34#[derive(Clone)]
36pub struct TaskMetadata {
37 task_index: TaskIndex,
39 task_created_block: u64,
41 quorum_numbers: Vec<u8>,
43 quorum_threshold_percentages: QuorumThresholdPercentages,
45 time_to_expiry: Duration,
47 window_duration: Duration,
49}
50
51impl TaskMetadata {
52 pub fn new(
69 task_index: TaskIndex,
70 task_created_block: u64,
71 quorum_numbers: Vec<u8>,
72 quorum_threshold_percentages: QuorumThresholdPercentages,
73 time_to_expiry: Duration,
74 ) -> Self {
75 Self {
76 task_index,
77 task_created_block,
78 quorum_numbers,
79 quorum_threshold_percentages,
80 time_to_expiry,
81 window_duration: Duration::ZERO,
82 }
83 }
84
85 pub fn with_window_duration(mut self, window_duration: Duration) -> Self {
94 self.window_duration = window_duration;
95 self
96 }
97}
98
99#[derive(Clone)]
101pub struct TaskSignature {
102 task_index: TaskIndex,
104 task_response_digest: TaskResponseDigest,
106 bls_signature: Signature,
108 operator_id: FixedBytes<32>,
110}
111
impl TaskSignature {
    /// Creates a new task signature.
    ///
    /// # Arguments
    ///
    /// * `task_index` - index of the task the signature belongs to
    /// * `task_response_digest` - digest of the task response that was signed
    /// * `bls_signature` - the operator's BLS signature
    /// * `operator_id` - id of the signing operator
    pub fn new(
        task_index: TaskIndex,
        task_response_digest: TaskResponseDigest,
        bls_signature: Signature,
        operator_id: FixedBytes<32>,
    ) -> Self {
        Self {
            task_index,
            task_response_digest,
            bls_signature,
            operator_id,
        }
    }
}
138
/// Requests sent from a [`ServiceHandle`] to the running aggregation service.
///
/// Every variant carries a oneshot sender on which the service reports the outcome.
pub enum AggregationMessage {
    /// Register a new task and spawn its aggregator.
    InitializeTask(
        TaskMetadata,
        oneshot::Sender<Result<(), BlsAggregationServiceError>>,
    ),
    /// Forward a signature to the aggregator of an already initialized task.
    ProcessSignature(
        TaskSignature,
        oneshot::Sender<Result<(), BlsAggregationServiceError>>,
    ),
}
150
/// Cloneable handle used to send requests to the running aggregation service.
#[derive(Debug, Clone)]
pub struct ServiceHandle {
    /// Sending half of the channel consumed by the service's main loop.
    msg_sender: UnboundedSender<AggregationMessage>,
}
157
158impl ServiceHandle {
159 pub async fn initialize_task(
169 &self,
170 metadata: TaskMetadata,
171 ) -> Result<(), BlsAggregationServiceError> {
172 let (tx, rx) = oneshot::channel();
173 self.msg_sender
174 .send(AggregationMessage::InitializeTask(metadata, tx))
175 .map_err(|_| BlsAggregationServiceError::SenderError)?;
176
177 rx.await
178 .map_err(|_| BlsAggregationServiceError::ReceiverError)?
179 }
180
181 pub async fn process_signature(
193 &self,
194 task_signature: TaskSignature,
195 ) -> Result<(), BlsAggregationServiceError> {
196 let (tx, rx) = oneshot::channel();
197 self.msg_sender
198 .send(AggregationMessage::ProcessSignature(task_signature, tx))
199 .map_err(|_| BlsAggregationServiceError::SenderError)?;
200
201 rx.await
202 .map_err(|_| BlsAggregationServiceError::ReceiverError)?
203 }
204}
205
/// Receiving end on which the service publishes aggregated responses (or task errors).
#[derive(Debug)]
pub struct AggregateReceiver {
    /// Channel fed by the per-task aggregators.
    aggregate_receiver:
        UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>,
}
213
214impl AggregateReceiver {
215 pub async fn receive_aggregated_response(
221 &mut self,
222 ) -> Result<BlsAggregationServiceResponse, BlsAggregationServiceError> {
223 self.aggregate_receiver
224 .recv()
225 .await
226 .ok_or(BlsAggregationServiceError::ReceiverError)?
227 }
228}
229
/// BLS signature aggregation service backed by an AVS registry service.
#[derive(Debug)]
pub struct BlsAggregatorService<A: AvsRegistryService>
where
    A: Clone,
{
    /// Registry used to fetch operator and quorum state; cloned into each per-task aggregator.
    avs_registry_service: A,
}
238
/// A signature as handed to a per-task aggregator, together with the channel on which the
/// verification outcome is reported back to the submitter.
#[derive(Debug)]
struct SignedTaskResponseDigest {
    /// Digest of the task response that was signed.
    task_response_digest: TaskResponseDigest,

    /// BLS signature produced by the operator.
    bls_signature: Signature,

    /// Id of the signing operator.
    operator_id: FixedBytes<32>,

    /// Reply channel for the result of processing this signature.
    result_channel: oneshot::Sender<Result<(), BlsAggregationServiceError>>,
}
250
251impl<A: AvsRegistryService + Send + Sync + Clone + 'static> BlsAggregatorService<A> {
    /// Creates a new aggregation service that uses `avs_registry_service` to look up
    /// operator and quorum state.
    pub fn new(avs_registry_service: A) -> Self {
        Self {
            avs_registry_service,
        }
    }
264
265 pub fn start(self) -> (ServiceHandle, AggregateReceiver) {
271 let (msg_tx, msg_rx) = mpsc::unbounded_channel();
272 let (agg_tx, agg_rx) = mpsc::unbounded_channel();
273
274 tokio::spawn(async move {
275 self.run(msg_rx, agg_tx).await;
276 });
277
278 let service_handler = ServiceHandle { msg_sender: msg_tx };
280 let aggregate_receiver = AggregateReceiver {
281 aggregate_receiver: agg_rx,
282 };
283
284 (service_handler, aggregate_receiver)
285 }
286
287 async fn run(
301 self,
302 mut msg_receiver: UnboundedReceiver<AggregationMessage>,
303 aggregate_sender: UnboundedSender<
304 Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
305 >,
306 ) {
307 let mut task_channels: HashMap<TaskIndex, UnboundedSender<SignedTaskResponseDigest>> =
308 HashMap::new();
309
310 while let Some(message) = msg_receiver.recv().await {
311 match message {
312 AggregationMessage::InitializeTask(metadata, result_sender) => {
313 let task_index = metadata.task_index;
314 if task_channels.contains_key(&task_index) {
315 result_sender
317 .send(Err(BlsAggregationServiceError::DuplicateTaskIndex))
318 .ok();
319 continue;
320 }
321
322 let (signature_tx, signature_rx) =
324 mpsc::unbounded_channel::<SignedTaskResponseDigest>();
325 task_channels.insert(task_index, signature_tx);
326
327 let avs_registry_service = self.avs_registry_service.clone();
328 let aggregated_response_sender = aggregate_sender.clone();
329
330 tokio::spawn(async move {
331 let _ = Self::single_task_aggregator(
332 avs_registry_service,
333 metadata,
334 aggregated_response_sender,
335 signature_rx,
336 )
337 .await
338 .inspect_err(|err| {
339 println!("Error with single_task_aggregator: {:?}", err);
340 });
341 });
342
343 let _ = result_sender.send(Ok(()));
344 }
345 AggregationMessage::ProcessSignature(task_signature, result_sender) => {
346 if let Some(sig_sender) = task_channels.get_mut(&task_signature.task_index) {
347 let signed_digest = SignedTaskResponseDigest {
349 task_response_digest: task_signature.task_response_digest,
350 bls_signature: task_signature.bls_signature,
351 operator_id: task_signature.operator_id,
352 result_channel: result_sender,
353 };
354
355 if let Err(send_error) = sig_sender.send(signed_digest) {
356 let _ = send_error
357 .0
358 .result_channel
359 .send(Err(BlsAggregationServiceError::SenderError));
360 }
361 } else {
362 result_sender
363 .send(Err(BlsAggregationServiceError::TaskNotFound))
364 .ok();
365 }
366 }
367 }
368 }
369 }
370
371 async fn single_task_aggregator(
384 avs_registry_service: A,
385 metadata: TaskMetadata,
386 aggregated_response_sender: UnboundedSender<
387 Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
388 >,
389 signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
390 ) -> Result<(), BlsAggregationServiceError> {
391 let quorum_threshold_percentage_map: HashMap<u8, u8> = metadata
392 .quorum_numbers
393 .iter()
394 .enumerate()
395 .map(|(i, quorum_number)| (*quorum_number, metadata.quorum_threshold_percentages[i]))
396 .collect();
397 let operator_state_avs = avs_registry_service
398 .get_operators_avs_state_at_block(metadata.task_created_block, &metadata.quorum_numbers)
399 .await
400 .map_err(|_| BlsAggregationServiceError::RegistryError)?;
401 let quorums_avs_state = avs_registry_service
402 .get_quorums_avs_state_at_block(&metadata.quorum_numbers, metadata.task_created_block)
403 .await
404 .map_err(|_| BlsAggregationServiceError::RegistryError)?;
405 let total_stake_per_quorum: HashMap<_, _> = quorums_avs_state
406 .iter()
407 .map(|(k, v)| (*k, v.total_stake))
408 .collect();
409
410 let quorum_apks_g1: Vec<BlsG1Point> = metadata
411 .quorum_numbers
412 .iter()
413 .filter_map(|quorum_num| quorums_avs_state.get(quorum_num))
414 .map(|avs_state| avs_state.agg_pub_key_g1.clone())
415 .collect();
416
417 Self::loop_task_aggregator(
418 avs_registry_service,
419 metadata.task_index,
420 metadata.task_created_block,
421 metadata.time_to_expiry,
422 aggregated_response_sender,
423 signatures_rx,
424 operator_state_avs,
425 total_stake_per_quorum,
426 quorum_threshold_percentage_map,
427 quorum_apks_g1,
428 metadata.quorum_numbers,
429 metadata.window_duration,
430 )
431 .await
432 }
433
434 #[allow(clippy::too_many_arguments)]
435 async fn loop_task_aggregator(
436 avs_registry_service: A,
437 task_index: TaskIndex,
438 task_created_block: u64,
439 time_to_expiry: Duration,
440 aggregated_response_sender: UnboundedSender<
441 Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
442 >,
443 mut signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
444 operator_state_avs: HashMap<FixedBytes<32>, OperatorAvsState>,
445 total_stake_per_quorum: HashMap<u8, Uint<256, 4>>,
446 quorum_threshold_percentage_map: HashMap<u8, u8>,
447 quorum_apks_g1: Vec<BlsG1Point>,
448 quorum_nums: Vec<u8>,
449 window_duration: Duration,
450 ) -> Result<(), BlsAggregationServiceError> {
451 let mut aggregated_operators: HashMap<FixedBytes<32>, AggregatedOperators> = HashMap::new();
452 let mut open_window = false;
453 let mut current_aggregated_response: Option<BlsAggregationServiceResponse> = None;
454 let (window_tx, mut window_rx) = tokio::sync::mpsc::unbounded_channel::<bool>();
455 let task_expired_timer = tokio::time::sleep(time_to_expiry);
456 tokio::pin!(task_expired_timer);
457
458 loop {
459 tokio::select! {
460 _ = &mut task_expired_timer => {
461 Self::handle_task_expired(
463 &aggregated_response_sender,
464 task_index,
465 open_window,
466 ¤t_aggregated_response,
467 )?;
468 return Ok(());
469 },
470 _ = window_rx.recv() => {
471 Self::handle_window_finished(
473 &aggregated_response_sender,
474 task_index,
475 ¤t_aggregated_response,
476 )?;
477 return Ok(());
478 },
479 signed_task_digest = signatures_rx.recv() => {
480 Self::handle_new_signature(
482 &avs_registry_service,
483 &mut aggregated_operators,
484 &mut open_window,
485 &mut current_aggregated_response,
486 &window_tx,
487 task_index,
488 task_created_block,
489 &operator_state_avs,
490 &total_stake_per_quorum,
491 &quorum_threshold_percentage_map,
492 &quorum_apks_g1,
493 &quorum_nums,
494 window_duration,
495 signed_task_digest,
496 ).await?;
497 }
498 }
499 }
500 }
501
    /// Processes one message from the task's signature channel.
    ///
    /// Steps, in order:
    ///
    /// 1. Rejects the signature with `DuplicateSignature` if the operator already signed
    ///    this digest.
    /// 2. Verifies the signature and reports the outcome on the submitter's reply channel.
    /// 3. On success, folds the operator into the aggregate kept for the digest.
    /// 4. If the signed stake meets every quorum threshold, opens the waiting window (only
    ///    the first time) and stores a freshly built aggregated response in
    ///    `current_aggregated_response`.
    ///
    /// # Errors
    ///
    /// * `SignaturesChannelClosed` if `signed_task_digest` is `None`.
    /// * `SenderError` if the submitter's reply channel is already closed.
    /// * Any error returned by `build_aggregated_response`.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all)]
    async fn handle_new_signature(
        avs_registry_service: &A,
        aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
        open_window: &mut bool,
        current_aggregated_response: &mut Option<BlsAggregationServiceResponse>,
        window_tx: &UnboundedSender<bool>,
        task_index: TaskIndex,
        task_created_block: u64,
        operator_state_avs: &HashMap<FixedBytes<32>, OperatorAvsState>,
        total_stake_per_quorum: &HashMap<u8, Uint<256, 4>>,
        quorum_threshold_percentage_map: &HashMap<u8, u8>,
        quorum_apks_g1: &[BlsG1Point],
        quorum_nums: &[u8],
        window_duration: Duration,
        signed_task_digest: Option<SignedTaskResponseDigest>,
    ) -> Result<(), BlsAggregationServiceError> {
        debug!("New signature received for task index: {task_index}");

        // `None` means every sender of the signature channel has been dropped.
        let signed_digest =
            signed_task_digest.ok_or(BlsAggregationServiceError::SignaturesChannelClosed)?;

        if Self::is_duplicate_signature(aggregated_operators, &signed_digest) {
            signed_digest
                .result_channel
                .send(Err(BlsAggregationServiceError::SignatureVerificationError(
                    SignatureVerificationError::DuplicateSignature,
                )))
                .map_err(|_| BlsAggregationServiceError::SenderError)?;
            return Ok(());
        }

        let verification_result = verify_signature(task_index, &signed_digest, operator_state_avs)
            .await
            .map_err(BlsAggregationServiceError::SignatureVerificationError);

        // Remember the outcome before the result is moved into the reply channel.
        let verification_has_error = verification_result.is_err();

        signed_digest
            .result_channel
            .send(verification_result)
            .map_err(|_| BlsAggregationServiceError::SenderError)?;

        if verification_has_error {
            // An invalid signature is reported to the submitter but does not end the task.
            return Ok(());
        }

        // Cannot fail: `verify_signature` only succeeds when the operator is in this map.
        let operator_state = operator_state_avs.get(&signed_digest.operator_id).unwrap();

        let updated_aggregated = update_aggregated_operators(
            aggregated_operators,
            operator_state,
            signed_digest.task_response_digest,
            signed_digest.bls_signature,
            signed_digest.operator_id,
        );
        aggregated_operators.insert(
            signed_digest.task_response_digest,
            updated_aggregated.clone(),
        );

        if !Self::check_if_stake_thresholds_met(
            &updated_aggregated.signers_total_stake_per_quorum,
            total_stake_per_quorum,
            quorum_threshold_percentage_map,
        ) {
            return Ok(());
        }

        debug!("Signature threshold is met for task index: {task_index}");

        if !*open_window {
            // The window is started once, the first time the thresholds are met.
            *open_window = true;
            Self::start_window(window_tx, window_duration, task_index);
        }

        // Every signature that keeps the thresholds met replaces the pending response.
        *current_aggregated_response = Some(
            Self::build_aggregated_response(
                task_index,
                task_created_block,
                signed_digest.task_response_digest,
                operator_state_avs,
                updated_aggregated,
                avs_registry_service,
                quorum_apks_g1,
                quorum_nums,
            )
            .await?,
        );

        Ok(())
    }
620
621 #[instrument(skip_all)]
631 fn handle_task_expired(
632 aggregated_response_sender: &UnboundedSender<
633 Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
634 >,
635 task_index: TaskIndex,
636 open_window: bool,
637 current_aggregated_response: &Option<BlsAggregationServiceResponse>,
638 ) -> Result<(), BlsAggregationServiceError> {
639 if open_window {
640 debug!("task_expired_timer while in the waiting window for task index: {task_index}",);
641 aggregated_response_sender
642 .send(Ok(current_aggregated_response.clone().unwrap()))
643 .map_err(|_| BlsAggregationServiceError::SenderError)?;
644 } else {
645 debug!("task_expired_timer NOT in the waiting window for task index: {task_index}",);
646
647 let _ = aggregated_response_sender.send(Err(BlsAggregationServiceError::TaskExpired));
648 }
649 Ok(())
650 }
651
652 #[instrument(skip_all)]
661 fn handle_window_finished(
662 aggregated_response_sender: &UnboundedSender<
663 Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
664 >,
665 task_index: TaskIndex,
666 current_aggregated_response: &Option<BlsAggregationServiceResponse>,
667 ) -> Result<(), BlsAggregationServiceError> {
668 debug!("Window finished. Send aggregated response for task index: {task_index}");
669
670 aggregated_response_sender
671 .send(Ok(current_aggregated_response.clone().unwrap()))
672 .map_err(|_| BlsAggregationServiceError::SenderError)?;
673 Ok(())
674 }
675
676 #[allow(clippy::too_many_arguments)]
693 #[instrument(skip_all)]
694 async fn build_aggregated_response(
695 task_index: TaskIndex,
696 task_created_block: u64,
697 task_response_digest: FixedBytes<32>,
698 operator_state_avs: &HashMap<FixedBytes<32>, OperatorAvsState>,
699 digest_aggregated_operators: AggregatedOperators,
700 avs_registry_service: &A,
701 quorum_apks_g1: &[BlsG1Point],
702 quorum_nums: &[u8],
703 ) -> Result<BlsAggregationServiceResponse, BlsAggregationServiceError> {
704 debug!("Build aggregated response for task index: {task_index}");
705
706 let mut non_signers_operators_ids: Vec<FixedBytes<32>> = operator_state_avs
707 .keys()
708 .filter(|operator_id| {
709 !digest_aggregated_operators
710 .signers_operator_ids_set
711 .contains_key(*operator_id)
712 })
713 .cloned()
714 .collect();
715
716 non_signers_operators_ids.sort();
717
718 let non_signers_pub_keys_g1: Vec<BlsG1Point> = non_signers_operators_ids
719 .iter()
720 .filter_map(|operator_id| operator_state_avs.get(operator_id))
721 .filter_map(|operator_avs_state| operator_avs_state.operator_info.pub_keys.clone())
722 .map(|pub_keys| pub_keys.g1_pub_key)
723 .collect();
724
725 let indices = avs_registry_service
726 .get_check_signatures_indices(
727 task_created_block,
728 quorum_nums.into(),
729 non_signers_operators_ids,
730 )
731 .await
732 .map_err(|_err| BlsAggregationServiceError::RegistryError)?;
733
734 Ok(BlsAggregationServiceResponse {
735 task_index,
736 task_response_digest,
737 non_signers_pub_keys_g1,
738 quorum_apks_g1: quorum_apks_g1.into(),
739 signers_apk_g2: digest_aggregated_operators.signers_apk_g2,
740 signers_agg_sig_g1: digest_aggregated_operators.signers_agg_sig_g1,
741 non_signer_quorum_bitmap_indices: indices.clone().nonSignerQuorumBitmapIndices,
742 quorum_apk_indices: indices.quorumApkIndices,
743 total_stake_indices: indices.totalStakeIndices,
744 non_signer_stake_indices: indices.nonSignerStakeIndices,
745 })
746 }
747
748 fn check_if_stake_thresholds_met(
761 signed_stake_per_quorum: &HashMap<u8, U256>,
762 total_stake_per_quorum: &HashMap<u8, U256>,
763 quorum_threshold_percentages_map: &HashMap<u8, QuorumThresholdPercentage>,
764 ) -> bool {
765 for (quorum_num, quorum_threshold_percentage) in quorum_threshold_percentages_map {
766 let (Some(signed_stake_by_quorum), Some(total_stake_by_quorum)) = (
767 signed_stake_per_quorum.get(quorum_num),
768 total_stake_per_quorum.get(quorum_num),
769 ) else {
770 return false;
771 };
772
773 let signed_stake = signed_stake_by_quorum * U256::from(100);
774 let threshold_stake = *total_stake_by_quorum * U256::from(*quorum_threshold_percentage);
775
776 if signed_stake < threshold_stake {
777 return false;
778 }
779 }
780 true
781 }
782
783 fn is_duplicate_signature(
794 aggregated_operators: &HashMap<FixedBytes<32>, AggregatedOperators>,
795 signed_digest: &SignedTaskResponseDigest,
796 ) -> bool {
797 aggregated_operators
798 .get(&signed_digest.task_response_digest)
799 .map(|ops| {
800 ops.signers_operator_ids_set
801 .contains_key(&signed_digest.operator_id)
802 })
803 .unwrap_or(false)
804 }
805
806 #[instrument(skip_all)]
814 fn start_window(
815 window_tx: &UnboundedSender<bool>,
816 window_duration: Duration,
817 task_index: TaskIndex,
818 ) {
819 let sender = window_tx.clone();
820 debug!("Create window to wait for new signatures for task index: {task_index}");
821 tokio::spawn(async move {
822 tokio::time::sleep(window_duration).await;
823 let _ = sender.send(true);
824 });
825 }
826}
827
828#[instrument(skip_all)]
845async fn verify_signature(
846 task_index: TaskIndex,
847 signed_task_response_digest: &SignedTaskResponseDigest,
848 operator_avs_state: &HashMap<FixedBytes<32>, OperatorAvsState>,
849) -> Result<(), SignatureVerificationError> {
850 let Some(operator_state) = operator_avs_state.get(&signed_task_response_digest.operator_id)
851 else {
852 error!("Operator Not Found for task index: {task_index}");
853 return Err(SignatureVerificationError::OperatorNotFound);
854 };
855
856 let Some(pub_keys) = &operator_state.operator_info.pub_keys else {
857 error!("Operator Public Key Not Found for task index: {task_index}");
858 return Err(SignatureVerificationError::OperatorPublicKeyNotFound);
859 };
860
861 let message = signed_task_response_digest
862 .task_response_digest
863 .as_slice()
864 .try_into()
865 .map_err(|_| SignatureVerificationError::IncorrectSignature)?;
866
867 verify_message(
868 pub_keys.g2_pub_key.g2(),
869 message,
870 signed_task_response_digest.bls_signature.g1_point().g1(),
871 )
872 .then_some(())
873 .ok_or(SignatureVerificationError::IncorrectSignature)
874 .inspect(|_| {
875 debug!("Signature verification successful for task index: {task_index}");
876 })
877 .inspect_err(|_| {
878 error!("Signature verification failed for task index: {task_index}");
879 })
880}
881
882fn update_aggregated_operators(
896 aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
897 operator_state: &OperatorAvsState,
898 task_response_digest: FixedBytes<32>,
899 bls_signature: Signature,
900 operator_id: FixedBytes<32>,
901) -> AggregatedOperators {
902 debug!("Update aggregated operators");
903
904 let bls_signature_g1_point = bls_signature.g1_point().g1();
905
906 if let Some(existing) = aggregated_operators.get_mut(&task_response_digest) {
907 let updated = aggregate_new_operator(
909 existing,
910 operator_state.clone(),
911 operator_id,
912 bls_signature_g1_point,
913 );
914 updated.clone()
915 } else {
916 let operator_g2_pubkey = operator_state
918 .operator_info
919 .pub_keys
920 .clone()
921 .unwrap()
922 .g2_pub_key
923 .g2();
924 let mut signers_apk_g2 = BlsG2Point::new(G2Affine::zero());
925 let mut signers_agg_sig_g1 = Signature::new(G1Affine::zero());
926 for _ in 0..operator_state.stake_per_quorum.len() {
927 signers_apk_g2 = BlsG2Point::new((signers_apk_g2.g2() + operator_g2_pubkey).into());
928 signers_agg_sig_g1 = Signature::new(
929 (signers_agg_sig_g1.g1_point().g1() + bls_signature_g1_point).into(),
930 );
931 }
932 AggregatedOperators {
933 signers_apk_g2,
934 signers_agg_sig_g1,
935 signers_operator_ids_set: HashMap::from([(operator_state.operator_id, true)]),
936 signers_total_stake_per_quorum: operator_state.stake_per_quorum.clone(),
937 }
938 }
939}
940
941#[instrument(skip_all)]
953fn aggregate_new_operator(
954 aggregated_operators: &mut AggregatedOperators,
955 operator_state: OperatorAvsState,
956 operator_id: FixedBytes<32>,
957 signature_g1_point: G1Affine,
958) -> &mut AggregatedOperators {
959 let operator_g2_pubkey = operator_state
960 .operator_info
961 .pub_keys
962 .clone()
963 .unwrap()
964 .g2_pub_key
965 .g2();
966 aggregated_operators
967 .signers_operator_ids_set
968 .insert(operator_id, true);
969
970 debug!("operator {operator_id} inserted in signers_operator_ids_set");
971
972 for (quorum_num, stake) in operator_state.stake_per_quorum.iter() {
973 aggregated_operators.signers_agg_sig_g1 = Signature::new(
975 (aggregated_operators.signers_agg_sig_g1.g1_point().g1() + signature_g1_point).into(),
976 );
977 aggregated_operators.signers_apk_g2 =
978 BlsG2Point::new((aggregated_operators.signers_apk_g2.g2() + operator_g2_pubkey).into());
979 aggregated_operators
980 .signers_total_stake_per_quorum
981 .entry(*quorum_num)
982 .and_modify(|v| *v += stake)
983 .or_insert(*stake);
984 }
985 aggregated_operators
986}
987
988#[cfg(test)]
989mod tests {
990 use super::{BlsAggregationServiceError, BlsAggregationServiceResponse, BlsAggregatorService};
991 use crate::bls_agg::{TaskMetadata, TaskSignature};
992 use alloy::primitives::{B256, U256};
993 use eigen_crypto_bls::{BlsG1Point, BlsG2Point, BlsKeyPair, Signature};
994
995 use eigen_services_avsregistry::fake_avs_registry_service::FakeAvsRegistryService;
996 use eigen_types::avs::SignatureVerificationError::{DuplicateSignature, IncorrectSignature};
997 use eigen_types::operator::{QuorumNum, QuorumThresholdPercentages};
998 use eigen_types::{avs::TaskIndex, test::TestOperator};
999 use sha2::{Digest, Sha256};
1000 use std::collections::HashMap;
1001 use std::time::Duration;
1002 use std::vec;
1003 use tokio::time::{sleep, Instant};
1004
    // Decimal-encoded BLS private keys used to build the test operators' key pairs.
    const PRIVATE_KEY_1: &str =
        "13710126902690889134622698668747132666439281256983827313388062967626731803599";
    const PRIVATE_KEY_2: &str =
        "14610126902690889134622698668747132666439281256983827313388062967626731803500";
    const PRIVATE_KEY_3: &str =
        "15610126902690889134622698668747132666439281256983827313388062967626731803501";
1011
1012 fn hash(task_response: u64) -> B256 {
1013 let mut hasher = Sha256::new();
1014 hasher.update(task_response.to_be_bytes());
1015 B256::from_slice(hasher.finalize().as_ref())
1016 }
1017
1018 fn aggregate_g1_public_keys(operators: &[TestOperator]) -> BlsG1Point {
1019 operators
1020 .iter()
1021 .map(|op| op.bls_keypair.public_key().g1())
1022 .reduce(|a, b| (a + b).into())
1023 .map(BlsG1Point::new)
1024 .unwrap()
1025 }
1026
1027 fn aggregate_g2_public_keys(operators: &[TestOperator]) -> BlsG2Point {
1028 operators
1029 .iter()
1030 .map(|op| op.bls_keypair.public_key_g2().g2())
1031 .reduce(|a, b| (a + b).into())
1032 .map(BlsG2Point::new)
1033 .unwrap()
1034 }
1035
1036 fn aggregate_g1_signatures(signatures: &[Signature]) -> Signature {
1037 let agg = signatures
1038 .iter()
1039 .map(|s| s.g1_point().g1())
1040 .reduce(|a, b| (a + b).into())
1041 .unwrap();
1042 Signature::new(agg)
1043 }
1044
    /// One operator signs a task on quorum 0 with a 100% threshold; the aggregated response
    /// must carry exactly that operator's keys and signature.
    #[tokio::test]
    async fn test_1_quorum_1_operator_1_correct_signature() {
        let test_operator_1 = TestOperator {
            operator_id: U256::from(1).into(),
            stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
        };

        let block_number = 1;
        let task_index: TaskIndex = 0;
        let quorum_numbers = vec![0];
        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
        let time_to_expiry = Duration::from_secs(1);
        let task_response = 123;
        let task_response_digest = hash(task_response);
        let bls_signature = test_operator_1
            .bls_keypair
            .sign_message(task_response_digest.as_ref());
        let fake_avs_registry_service =
            FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
        let metadata = TaskMetadata::new(
            task_index,
            block_number,
            quorum_numbers,
            quorum_threshold_percentages,
            time_to_expiry,
        );
        let (handle, mut aggregator_response) = bls_agg_service.start();
        handle.initialize_task(metadata).await.unwrap();

        handle
            .process_signature(TaskSignature::new(
                task_index,
                task_response_digest,
                bls_signature,
                test_operator_1.operator_id,
            ))
            .await
            .unwrap();

        // With a single signer the aggregated key and signature are the operator's own.
        let expected_agg_service_response = BlsAggregationServiceResponse {
            task_index,
            task_response_digest,
            non_signers_pub_keys_g1: vec![],
            quorum_apks_g1: vec![test_operator_1.bls_keypair.public_key()],
            signers_apk_g2: test_operator_1.bls_keypair.public_key_g2(),
            signers_agg_sig_g1: test_operator_1
                .bls_keypair
                .sign_message(task_response_digest.as_ref()),
            non_signer_quorum_bitmap_indices: vec![],
            quorum_apk_indices: vec![],
            total_stake_indices: vec![],
            non_signer_stake_indices: vec![],
        };

        let response = aggregator_response.receive_aggregated_response().await;

        assert_eq!(expected_agg_service_response, response.clone().unwrap());
        assert_eq!(task_index, response.unwrap().task_index);
    }
1107
    /// Two operators on quorum 0 with a 100% threshold. Operator 1 submits the same
    /// signature twice: the second submission must be rejected as a duplicate, and the
    /// final response must aggregate exactly one signature per operator.
    #[tokio::test]
    async fn test_1_quorum_2_operator_2_duplicated_signatures() {
        let test_operator_1 = TestOperator {
            operator_id: U256::from(1).into(),
            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
        };
        let test_operator_2 = TestOperator {
            operator_id: U256::from(2).into(),
            stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
            bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
        };
        let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
        let block_number = 1;
        let task_index: TaskIndex = 0;
        let quorum_numbers = vec![0];
        let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
        let time_to_expiry = Duration::from_secs(1);
        let task_response = 123;
        let task_response_digest = hash(task_response);

        let fake_avs_registry_service =
            FakeAvsRegistryService::new(block_number, test_operators.clone());
        let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
        let metadata = TaskMetadata::new(
            task_index,
            block_number,
            quorum_numbers,
            quorum_threshold_percentages,
            time_to_expiry,
        );
        let (handle, mut aggregator_response) = bls_agg_service.start();
        handle.initialize_task(metadata).await.unwrap();
        let bls_signature_1 = test_operator_1
            .bls_keypair
            .sign_message(task_response_digest.as_ref());
        handle
            .process_signature(TaskSignature::new(
                task_index,
                task_response_digest,
                bls_signature_1.clone(),
                test_operator_1.operator_id,
            ))
            .await
            .unwrap();

        // Same operator, same digest, same signature: must be reported as a duplicate.
        let second_signature_processing_result = handle
            .process_signature(TaskSignature::new(
                task_index,
                task_response_digest,
                bls_signature_1.clone(),
                test_operator_1.operator_id,
            ))
            .await;

        assert_eq!(
            second_signature_processing_result,
            Err(BlsAggregationServiceError::SignatureVerificationError(
                DuplicateSignature
            ))
        );

        let bls_signature_2 = test_operator_2
            .bls_keypair
            .sign_message(task_response_digest.as_ref());

        handle
            .process_signature(TaskSignature::new(
                task_index,
                task_response_digest,
                bls_signature_2.clone(),
                test_operator_2.operator_id,
            ))
            .await
            .unwrap();

        let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
        let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
        let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_signature_1, bls_signature_2]);
        let expected_agg_service_response = BlsAggregationServiceResponse {
            task_index,
            task_response_digest,
            non_signers_pub_keys_g1: vec![],
            quorum_apks_g1: vec![quorum_apks_g1],
            signers_apk_g2,
            signers_agg_sig_g1,
            non_signer_quorum_bitmap_indices: vec![],
            quorum_apk_indices: vec![],
            total_stake_indices: vec![],
            non_signer_stake_indices: vec![],
        };

        let response = aggregator_response.receive_aggregated_response().await;

        assert_eq!(expected_agg_service_response, response.clone().unwrap());
        assert_eq!(task_index, response.unwrap().task_index);
    }
1205
1206 #[tokio::test]
1207 async fn test_1_quorum_3_operator_3_correct_signatures() {
1208 let test_operator_1 = TestOperator {
1209 operator_id: U256::from(1).into(),
1210 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1211 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1212 };
1213 let test_operator_2 = TestOperator {
1214 operator_id: U256::from(2).into(),
1215 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1216 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1217 };
1218 let test_operator_3 = TestOperator {
1219 operator_id: U256::from(3).into(),
1220 stake_per_quorum: HashMap::from([(0u8, U256::from(300)), (1u8, U256::from(100))]),
1221 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
1222 };
1223 let test_operators = vec![
1224 test_operator_1.clone(),
1225 test_operator_2.clone(),
1226 test_operator_3.clone(),
1227 ];
1228
1229 let block_number = 1;
1230 let task_index = 0;
1231 let quorum_numbers: Vec<QuorumNum> = vec![0];
1232 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8];
1233 let time_to_expiry = Duration::from_secs(1);
1234 let task_response = 123; let task_response_digest = hash(task_response);
1236
1237 let fake_avs_registry_service =
1238 FakeAvsRegistryService::new(block_number, test_operators.clone());
1239 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1240
1241 let metadata = TaskMetadata::new(
1242 task_index,
1243 block_number,
1244 quorum_numbers,
1245 quorum_threshold_percentages,
1246 time_to_expiry,
1247 );
1248 let (handle, mut aggregator_response) = bls_agg_service.start();
1249 handle.initialize_task(metadata).await.unwrap();
1250
1251 let bls_sig_op_1 = test_operator_1
1252 .bls_keypair
1253 .sign_message(task_response_digest.as_ref());
1254 handle
1255 .process_signature(TaskSignature::new(
1256 task_index,
1257 task_response_digest,
1258 bls_sig_op_1.clone(),
1259 test_operator_1.operator_id,
1260 ))
1261 .await
1262 .unwrap();
1263
1264 let bls_sig_op_2 = test_operator_2
1265 .bls_keypair
1266 .sign_message(task_response_digest.as_ref());
1267 handle
1268 .process_signature(TaskSignature::new(
1269 task_index,
1270 task_response_digest,
1271 bls_sig_op_2.clone(),
1272 test_operator_2.operator_id,
1273 ))
1274 .await
1275 .unwrap();
1276
1277 let bls_sig_op_3 = test_operator_3
1278 .bls_keypair
1279 .sign_message(task_response_digest.as_ref());
1280 handle
1281 .process_signature(TaskSignature::new(
1282 task_index,
1283 task_response_digest,
1284 bls_sig_op_3.clone(),
1285 test_operator_3.operator_id,
1286 ))
1287 .await
1288 .unwrap();
1289
1290 let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1291 let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
1292 let signers_agg_sig_g1 =
1293 aggregate_g1_signatures(&vec![bls_sig_op_1, bls_sig_op_2, bls_sig_op_3]);
1294
1295 let expected_agg_service_response = BlsAggregationServiceResponse {
1296 task_index,
1297 task_response_digest,
1298 non_signers_pub_keys_g1: vec![],
1299 quorum_apks_g1: vec![quorum_apks_g1],
1300 signers_apk_g2,
1301 signers_agg_sig_g1,
1302 non_signer_quorum_bitmap_indices: vec![],
1303 quorum_apk_indices: vec![],
1304 total_stake_indices: vec![],
1305 non_signer_stake_indices: vec![],
1306 };
1307
1308 let response = aggregator_response.receive_aggregated_response().await;
1309 assert_eq!(expected_agg_service_response, response.clone().unwrap());
1310 assert_eq!(task_index, response.unwrap().task_index);
1311 }
1312
1313 #[tokio::test]
1314 async fn test_2_quorum_2_operator_2_correct_signatures() {
1315 let test_operator_1 = TestOperator {
1316 operator_id: U256::from(1).into(),
1317 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1318 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1319 };
1320 let test_operator_2 = TestOperator {
1321 operator_id: U256::from(2).into(),
1322 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1323 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1324 };
1325 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1326 let block_number = 1;
1327 let task_index = 0;
1328 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1329 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1330 let time_to_expiry = Duration::from_secs(1);
1331 let task_response = 123; let task_response_digest = hash(task_response);
1333
1334 let fake_avs_registry_service =
1335 FakeAvsRegistryService::new(block_number, test_operators.clone());
1336 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1337
1338 let metadata = TaskMetadata::new(
1339 task_index,
1340 block_number,
1341 quorum_numbers,
1342 quorum_threshold_percentages,
1343 time_to_expiry,
1344 );
1345 let (handle, mut aggregator_response) = bls_agg_service.start();
1346 handle.initialize_task(metadata).await.unwrap();
1347
1348 let bls_sig_op_1 = test_operator_1
1349 .bls_keypair
1350 .sign_message(task_response_digest.as_ref());
1351 handle
1352 .process_signature(TaskSignature::new(
1353 task_index,
1354 task_response_digest,
1355 bls_sig_op_1.clone(),
1356 test_operator_1.operator_id,
1357 ))
1358 .await
1359 .unwrap();
1360
1361 let bls_sig_op_2 = test_operator_2
1362 .bls_keypair
1363 .sign_message(task_response_digest.as_ref());
1364 handle
1365 .process_signature(TaskSignature::new(
1366 task_index,
1367 task_response_digest,
1368 bls_sig_op_2.clone(),
1369 test_operator_2.operator_id,
1370 ))
1371 .await
1372 .unwrap();
1373
1374 let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1375 let signers_apk_g2 =
1376 aggregate_g2_public_keys(&[test_operators.clone(), test_operators].concat());
1377 let signers_agg_sig_g1 = aggregate_g1_signatures(&[
1378 bls_sig_op_1.clone(),
1379 bls_sig_op_1,
1380 bls_sig_op_2.clone(),
1381 bls_sig_op_2,
1382 ]);
1383
1384 let expected_agg_service_response = BlsAggregationServiceResponse {
1385 task_index,
1386 task_response_digest,
1387 non_signers_pub_keys_g1: vec![],
1388 quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1],
1389 signers_apk_g2,
1390 signers_agg_sig_g1,
1391 non_signer_quorum_bitmap_indices: vec![],
1392 quorum_apk_indices: vec![],
1393 total_stake_indices: vec![],
1394 non_signer_stake_indices: vec![],
1395 };
1396
1397 let response = aggregator_response.receive_aggregated_response().await;
1398
1399 assert_eq!(expected_agg_service_response, response.unwrap());
1400 }
1401
1402 #[tokio::test]
1403 async fn test_2_concurrent_tasks_2_quorum_2_operator_2_correct_signatures() {
1404 let test_operator_1 = TestOperator {
1405 operator_id: U256::from(1).into(),
1406 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1407 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1408 };
1409 let test_operator_2 = TestOperator {
1410 operator_id: U256::from(2).into(),
1411 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1412 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1413 };
1414 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1415 let block_number = 1;
1416 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1417 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1418 let time_to_expiry = Duration::from_secs(1);
1419
1420 let fake_avs_registry_service =
1421 FakeAvsRegistryService::new(block_number, test_operators.clone());
1422 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1423
1424 let task_1_index = 1;
1426 let task_1_response = 123; let task_1_response_digest = hash(task_1_response);
1428 let metadata1 = TaskMetadata::new(
1429 task_1_index,
1430 block_number,
1431 quorum_numbers.clone(),
1432 quorum_threshold_percentages.clone(),
1433 time_to_expiry,
1434 );
1435 let (handle, mut aggregator_response) = bls_agg_service.start();
1436 handle.initialize_task(metadata1).await.unwrap();
1437
1438 let task_2_index = 2;
1439 let task_2_response = 234; let task_2_response_digest = hash(task_2_response);
1441 let metadata2 = TaskMetadata::new(
1442 task_2_index,
1443 block_number,
1444 quorum_numbers,
1445 quorum_threshold_percentages,
1446 time_to_expiry,
1447 );
1448 handle.initialize_task(metadata2).await.unwrap();
1449
1450 let bls_sig_task_1_op_1 = test_operator_1
1451 .bls_keypair
1452 .sign_message(task_1_response_digest.as_ref());
1453 handle
1454 .process_signature(TaskSignature::new(
1455 task_1_index,
1456 task_1_response_digest,
1457 bls_sig_task_1_op_1.clone(),
1458 test_operator_1.operator_id,
1459 ))
1460 .await
1461 .unwrap();
1462
1463 let bls_sig_task_1_op_2 = test_operator_2
1464 .bls_keypair
1465 .sign_message(task_1_response_digest.as_ref());
1466 handle
1467 .process_signature(TaskSignature::new(
1468 task_1_index,
1469 task_1_response_digest,
1470 bls_sig_task_1_op_2.clone(),
1471 test_operator_2.operator_id,
1472 ))
1473 .await
1474 .unwrap();
1475
1476 let bls_sig_task_2_op_1 = test_operator_1
1477 .bls_keypair
1478 .sign_message(task_2_response_digest.as_ref());
1479 handle
1480 .process_signature(TaskSignature::new(
1481 task_2_index,
1482 task_2_response_digest,
1483 bls_sig_task_2_op_1.clone(),
1484 test_operator_1.operator_id,
1485 ))
1486 .await
1487 .unwrap();
1488
1489 let bls_sig_task_2_op_2 = test_operator_2
1490 .bls_keypair
1491 .sign_message(task_2_response_digest.as_ref());
1492 handle
1493 .process_signature(TaskSignature::new(
1494 task_2_index,
1495 task_2_response_digest,
1496 bls_sig_task_2_op_2.clone(),
1497 test_operator_2.operator_id,
1498 ))
1499 .await
1500 .unwrap();
1501
1502 let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1503 let signers_apk_g2 =
1504 aggregate_g2_public_keys(&[test_operators.clone(), test_operators].concat());
1505 let signers_agg_sig_g1_task_1 = aggregate_g1_signatures(&[
1506 bls_sig_task_1_op_1.clone(),
1507 bls_sig_task_1_op_1,
1508 bls_sig_task_1_op_2.clone(),
1509 bls_sig_task_1_op_2,
1510 ]);
1511
1512 let expected_response_task_1 = BlsAggregationServiceResponse {
1513 task_index: task_1_index,
1514 task_response_digest: task_1_response_digest,
1515 non_signers_pub_keys_g1: vec![],
1516 quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1.clone()],
1517 signers_apk_g2: signers_apk_g2.clone(),
1518 signers_agg_sig_g1: signers_agg_sig_g1_task_1,
1519 non_signer_quorum_bitmap_indices: vec![],
1520 quorum_apk_indices: vec![],
1521 total_stake_indices: vec![],
1522 non_signer_stake_indices: vec![],
1523 };
1524
1525 let signers_agg_sig_g1_task_2 = aggregate_g1_signatures(&[
1526 bls_sig_task_2_op_1.clone(),
1527 bls_sig_task_2_op_1,
1528 bls_sig_task_2_op_2.clone(),
1529 bls_sig_task_2_op_2,
1530 ]);
1531
1532 let expected_response_task_2 = BlsAggregationServiceResponse {
1533 task_index: task_2_index,
1534 task_response_digest: task_2_response_digest,
1535 non_signers_pub_keys_g1: vec![],
1536 quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1.clone()],
1537 signers_apk_g2,
1538 signers_agg_sig_g1: signers_agg_sig_g1_task_2,
1539 non_signer_quorum_bitmap_indices: vec![],
1540 quorum_apk_indices: vec![],
1541 total_stake_indices: vec![],
1542 non_signer_stake_indices: vec![],
1543 };
1544
1545 let first_response = aggregator_response.receive_aggregated_response().await;
1546 let second_response = aggregator_response.receive_aggregated_response().await;
1547
1548 let (task_1_response, task_2_response) = if first_response.clone().unwrap().task_index == 1
1549 {
1550 (first_response, second_response)
1551 } else {
1552 (second_response, first_response)
1553 };
1554
1555 assert_eq!(expected_response_task_1, task_1_response.unwrap());
1556 assert_eq!(expected_response_task_2, task_2_response.unwrap());
1557 }
1558
1559 #[tokio::test]
1560 async fn test_1_quorum_1_operator_0_signatures_task_expired() {
1561 let test_operator_1 = TestOperator {
1562 operator_id: U256::from(1).into(),
1563 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1564 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1565 };
1566
1567 let block_number = 1;
1568 let task_index: TaskIndex = 0;
1569 let quorum_numbers = vec![0];
1570 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
1571 let time_to_expiry = Duration::from_secs(1);
1572 let _task_response = 123; let fake_avs_registry_service =
1575 FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
1576 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1577 let metadata = TaskMetadata::new(
1578 task_index,
1579 block_number,
1580 quorum_numbers,
1581 quorum_threshold_percentages,
1582 time_to_expiry,
1583 );
1584 let (handle, mut aggregator_response) = bls_agg_service.start();
1585 handle.initialize_task(metadata).await.unwrap();
1586
1587 let response = aggregator_response.receive_aggregated_response().await;
1588
1589 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
1590 }
1591
1592 #[tokio::test]
1593 async fn test_1_quorum_2_operator_1_signatures_50_threshold() {
1594 let test_operator_1 = TestOperator {
1595 operator_id: U256::from(1).into(),
1596 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1597 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1598 };
1599 let test_operator_2 = TestOperator {
1600 operator_id: U256::from(2).into(),
1601 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1602 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1603 };
1604 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1605 let block_number = 1;
1606 let task_index = 0;
1607 let quorum_numbers: Vec<QuorumNum> = vec![0];
1608 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50u8];
1609 let time_to_expiry = Duration::from_secs(1);
1610 let task_response = 123; let task_response_digest = hash(task_response);
1612 let bls_sig_op_1 = test_operator_1
1613 .bls_keypair
1614 .sign_message(task_response_digest.as_ref());
1615
1616 let fake_avs_registry_service =
1617 FakeAvsRegistryService::new(block_number, test_operators.clone());
1618 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1619
1620 let metadata = TaskMetadata::new(
1621 task_index,
1622 block_number,
1623 quorum_numbers,
1624 quorum_threshold_percentages,
1625 time_to_expiry,
1626 );
1627 let (handle, mut aggregator_response) = bls_agg_service.start();
1628 handle.initialize_task(metadata).await.unwrap();
1629
1630 handle
1631 .process_signature(TaskSignature::new(
1632 task_index,
1633 task_response_digest,
1634 bls_sig_op_1.clone(),
1635 test_operator_1.operator_id,
1636 ))
1637 .await
1638 .unwrap();
1639
1640 let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1641
1642 let signers_apk_g2: BlsG2Point = test_operator_1.bls_keypair.public_key_g2();
1643
1644 let expected_agg_service_response = BlsAggregationServiceResponse {
1645 task_index,
1646 task_response_digest,
1647 non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()], quorum_apks_g1: vec![quorum_apks_g1],
1649 signers_apk_g2,
1650 signers_agg_sig_g1: bls_sig_op_1,
1651 non_signer_quorum_bitmap_indices: vec![],
1652 quorum_apk_indices: vec![],
1653 total_stake_indices: vec![],
1654 non_signer_stake_indices: vec![],
1655 };
1656
1657 let response = aggregator_response.receive_aggregated_response().await;
1658
1659 assert_eq!(expected_agg_service_response, response.clone().unwrap());
1660 assert_eq!(task_index, response.unwrap().task_index);
1661 }
1662
1663 #[tokio::test]
1664 async fn test_1_quorum_2_operator_1_signatures_60_threshold() {
1665 let test_operator_1 = TestOperator {
1666 operator_id: U256::from(1).into(),
1667 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1668 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1669 };
1670 let test_operator_2 = TestOperator {
1671 operator_id: U256::from(2).into(),
1672 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1673 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1674 };
1675 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1676 let block_number = 1;
1677 let task_index = 0;
1678 let quorum_numbers: Vec<QuorumNum> = vec![0];
1679 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![60u8];
1680 let time_to_expiry = Duration::from_secs(1);
1681 let task_response = 123; let task_response_digest = hash(task_response);
1683 let bls_sig_op_1 = test_operator_1
1684 .bls_keypair
1685 .sign_message(task_response_digest.as_ref());
1686
1687 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
1688 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1689
1690 let metadata = TaskMetadata::new(
1691 task_index,
1692 block_number,
1693 quorum_numbers,
1694 quorum_threshold_percentages,
1695 time_to_expiry,
1696 );
1697 let (handle, mut aggregator_response) = bls_agg_service.start();
1698 handle.initialize_task(metadata).await.unwrap();
1699
1700 handle
1701 .process_signature(TaskSignature::new(
1702 task_index,
1703 task_response_digest,
1704 bls_sig_op_1,
1705 test_operator_1.operator_id,
1706 ))
1707 .await
1708 .unwrap();
1709
1710 let response = aggregator_response.receive_aggregated_response().await;
1711
1712 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
1713 }
1714
1715 #[tokio::test]
1716 async fn test_2_quorums_2_operators_which_just_take_1_quorum_2_correct_signatures() {
1717 let test_operator_1 = TestOperator {
1718 operator_id: U256::from(1).into(),
1719 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1721 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1722 };
1723 let test_operator_2 = TestOperator {
1724 operator_id: U256::from(2).into(),
1725 stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
1727 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1728 };
1729
1730 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1731 let block_number = 1;
1732 let task_index = 0;
1733 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1734 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1735 let time_to_expiry = Duration::from_secs(1);
1736 let task_response = 123; let task_response_digest = hash(task_response);
1738
1739 let fake_avs_registry_service =
1740 FakeAvsRegistryService::new(block_number, test_operators.clone());
1741 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1742
1743 let metadata = TaskMetadata::new(
1744 task_index,
1745 block_number,
1746 quorum_numbers,
1747 quorum_threshold_percentages,
1748 time_to_expiry,
1749 );
1750 let (handle, mut aggregator_response) = bls_agg_service.start();
1751 handle.initialize_task(metadata).await.unwrap();
1752
1753 let bls_sig_op_1 = test_operator_1
1754 .bls_keypair
1755 .sign_message(task_response_digest.as_ref());
1756 handle
1757 .process_signature(TaskSignature::new(
1758 task_index,
1759 task_response_digest,
1760 bls_sig_op_1.clone(),
1761 test_operator_1.operator_id,
1762 ))
1763 .await
1764 .unwrap();
1765
1766 let bls_sig_op_2 = test_operator_2
1767 .bls_keypair
1768 .sign_message(task_response_digest.as_ref());
1769 handle
1770 .process_signature(TaskSignature::new(
1771 task_index,
1772 task_response_digest,
1773 bls_sig_op_2.clone(),
1774 test_operator_2.operator_id,
1775 ))
1776 .await
1777 .unwrap();
1778
1779 let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
1780 let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
1781
1782 let expected_agg_service_response = BlsAggregationServiceResponse {
1783 task_index,
1784 task_response_digest,
1785 non_signers_pub_keys_g1: vec![],
1786 quorum_apks_g1: vec![
1787 test_operator_1.bls_keypair.public_key(),
1788 test_operator_2.bls_keypair.public_key(),
1789 ],
1790 signers_apk_g2,
1791 signers_agg_sig_g1,
1792 non_signer_quorum_bitmap_indices: vec![],
1793 quorum_apk_indices: vec![],
1794 total_stake_indices: vec![],
1795 non_signer_stake_indices: vec![],
1796 };
1797
1798 let response = aggregator_response.receive_aggregated_response().await;
1799
1800 assert_eq!(expected_agg_service_response, response.clone().unwrap());
1801 assert_eq!(task_index, response.unwrap().task_index);
1802 }
1803
1804 #[tokio::test]
1805 async fn test_2_quorums_3_operators_which_just_stake_1_quorum_50_threshold() {
1806 let test_operator_1 = TestOperator {
1807 operator_id: U256::from(1).into(),
1808 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1810 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1811 };
1812 let test_operator_2 = TestOperator {
1813 operator_id: U256::from(2).into(),
1814 stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
1816 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1817 };
1818
1819 let test_operator_3 = TestOperator {
1820 operator_id: U256::from(3).into(),
1821 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1823 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
1824 };
1825
1826 let test_operators = vec![
1827 test_operator_1.clone(),
1828 test_operator_2.clone(),
1829 test_operator_3.clone(),
1830 ];
1831 let block_number = 1;
1832 let task_index = 0;
1833 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1834 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50u8, 50u8];
1835 let time_to_expiry = Duration::from_secs(1);
1836 let task_response = 123; let task_response_digest = hash(task_response);
1838
1839 let fake_avs_registry_service =
1840 FakeAvsRegistryService::new(block_number, test_operators.clone());
1841 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1842
1843 let metadata = TaskMetadata::new(
1844 task_index,
1845 block_number,
1846 quorum_numbers,
1847 quorum_threshold_percentages,
1848 time_to_expiry,
1849 );
1850 let (handle, mut aggregator_response) = bls_agg_service.start();
1851 handle.initialize_task(metadata).await.unwrap();
1852
1853 let bls_sig_op_1 = test_operator_1
1854 .bls_keypair
1855 .sign_message(task_response_digest.as_ref());
1856 handle
1857 .process_signature(TaskSignature::new(
1858 task_index,
1859 task_response_digest,
1860 bls_sig_op_1.clone(),
1861 test_operator_1.operator_id,
1862 ))
1863 .await
1864 .unwrap();
1865
1866 let bls_sig_op_2 = test_operator_2
1867 .bls_keypair
1868 .sign_message(task_response_digest.as_ref());
1869 handle
1870 .process_signature(TaskSignature::new(
1871 task_index,
1872 task_response_digest,
1873 bls_sig_op_2.clone(),
1874 test_operator_2.operator_id,
1875 ))
1876 .await
1877 .unwrap();
1878 let signers_apk_g2 =
1879 aggregate_g2_public_keys(&vec![test_operator_1.clone(), test_operator_2.clone()]);
1880 let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
1881 let quorum_apks_g1 = vec![
1882 aggregate_g1_public_keys(&vec![test_operator_1, test_operator_3.clone()]),
1883 aggregate_g1_public_keys(&vec![test_operator_2, test_operator_3.clone()]),
1884 ];
1885
1886 let expected_agg_service_response = BlsAggregationServiceResponse {
1887 task_index,
1888 task_response_digest,
1889 non_signers_pub_keys_g1: vec![test_operator_3.bls_keypair.public_key()],
1890 quorum_apks_g1,
1891 signers_apk_g2,
1892 signers_agg_sig_g1,
1893 non_signer_quorum_bitmap_indices: vec![],
1894 quorum_apk_indices: vec![],
1895 total_stake_indices: vec![],
1896 non_signer_stake_indices: vec![],
1897 };
1898
1899 let response = aggregator_response.receive_aggregated_response().await;
1900
1901 assert_eq!(expected_agg_service_response, response.clone().unwrap());
1902 assert_eq!(task_index, response.unwrap().task_index);
1903 }
1904
1905 #[tokio::test]
1906 async fn test_2_quorums_3_operators_which_just_stake_1_quorum_60_threshold() {
1907 let test_operator_1 = TestOperator {
1909 operator_id: U256::from(1).into(),
1910 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1912 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1913 };
1914 let test_operator_2 = TestOperator {
1915 operator_id: U256::from(2).into(),
1916 stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
1918 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1919 };
1920
1921 let test_operator_3 = TestOperator {
1922 operator_id: U256::from(3).into(),
1923 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1925 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
1926 };
1927
1928 let test_operators = vec![
1929 test_operator_1.clone(),
1930 test_operator_2.clone(),
1931 test_operator_3.clone(),
1932 ];
1933 let block_number = 1;
1934 let task_index = 0;
1935 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1936 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![60u8, 60u8];
1937 let time_to_expiry = Duration::from_secs(1);
1938 let task_response = 123; let task_response_digest = hash(task_response);
1940
1941 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
1942 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
1943
1944 let metadata = TaskMetadata::new(
1945 task_index,
1946 block_number,
1947 quorum_numbers,
1948 quorum_threshold_percentages,
1949 time_to_expiry,
1950 );
1951 let (handle, mut aggregator_response) = bls_agg_service.start();
1952 handle.initialize_task(metadata).await.unwrap();
1953
1954 let bls_sig_op_1 = test_operator_1
1955 .bls_keypair
1956 .sign_message(task_response_digest.as_ref());
1957 handle
1958 .process_signature(TaskSignature::new(
1959 task_index,
1960 task_response_digest,
1961 bls_sig_op_1,
1962 test_operator_1.operator_id,
1963 ))
1964 .await
1965 .unwrap();
1966
1967 let bls_sig_op_2 = test_operator_2
1968 .bls_keypair
1969 .sign_message(task_response_digest.as_ref());
1970
1971 handle
1972 .process_signature(TaskSignature::new(
1973 task_index,
1974 task_response_digest,
1975 bls_sig_op_2,
1976 test_operator_2.operator_id,
1977 ))
1978 .await
1979 .unwrap();
1980
1981 let response = aggregator_response.receive_aggregated_response().await;
1982
1983 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
1984 }
1985
1986 #[tokio::test]
1987 async fn test_2_quorums_1_operator_which_just_take_1_quorum_1_signature_task_expired() {
1988 let test_operator_1 = TestOperator {
1989 operator_id: U256::from(1).into(),
1990 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1992 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1993 };
1994
1995 let block_number = 1;
1996 let task_index = 0;
1997 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1998 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100, 100];
1999 let time_to_expiry = Duration::from_secs(1);
2000 let task_response = 123; let task_response_digest = hash(task_response);
2002
2003 let fake_avs_registry_service =
2004 FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2005 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2006
2007 let metadata = TaskMetadata::new(
2008 task_index,
2009 block_number,
2010 quorum_numbers,
2011 quorum_threshold_percentages,
2012 time_to_expiry,
2013 );
2014 let (handle, mut aggregator_response) = bls_agg_service.start();
2015 handle.initialize_task(metadata).await.unwrap();
2016
2017 let bls_sig_op_1 = test_operator_1
2018 .bls_keypair
2019 .sign_message(task_response_digest.as_ref());
2020
2021 handle
2022 .process_signature(TaskSignature::new(
2023 task_index,
2024 task_response_digest,
2025 bls_sig_op_1,
2026 test_operator_1.operator_id,
2027 ))
2028 .await
2029 .unwrap();
2030
2031 let response = aggregator_response.receive_aggregated_response().await;
2032
2033 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2034 }
2035
2036 #[tokio::test]
2037 async fn test_2_quorums_2_operators_where_1_operator_just_take_1_quorum_1_signature_task_expired(
2038 ) {
2039 let test_operator_1 = TestOperator {
2040 operator_id: U256::from(1).into(),
2041 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2043 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2044 };
2045 let test_operator_2 = TestOperator {
2046 operator_id: U256::from(2).into(),
2047 stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
2049 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2050 };
2051
2052 let block_number = 1;
2053 let task_index = 0;
2054 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
2055 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100, 100];
2056 let time_to_expiry = Duration::from_secs(1);
2057 let task_response = 123; let task_response_digest = hash(task_response);
2059 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2060
2061 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2062 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2063
2064 let metadata = TaskMetadata::new(
2065 task_index,
2066 block_number,
2067 quorum_numbers,
2068 quorum_threshold_percentages,
2069 time_to_expiry,
2070 );
2071 let (handle, mut aggregator_response) = bls_agg_service.start();
2072 handle.initialize_task(metadata).await.unwrap();
2073
2074 let bls_sig_op_1 = test_operator_1
2075 .bls_keypair
2076 .sign_message(task_response_digest.as_ref());
2077 handle
2078 .process_signature(TaskSignature::new(
2079 task_index,
2080 task_response_digest,
2081 bls_sig_op_1,
2082 test_operator_1.operator_id,
2083 ))
2084 .await
2085 .unwrap();
2086
2087 let response = aggregator_response.receive_aggregated_response().await;
2088
2089 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2090 }
2091
2092 #[tokio::test]
2093 async fn send_signature_of_task_not_initialized() {
2094 let test_operator_1 = TestOperator {
2095 operator_id: U256::from(1).into(),
2096 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2098 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2099 };
2100
2101 let block_number = 1;
2102 let task_index = 0;
2103 let task_response = 123; let task_response_digest = hash(task_response);
2105
2106 let fake_avs_registry_service =
2107 FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2108 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2109
2110 let bls_sig_op_1 = test_operator_1
2111 .bls_keypair
2112 .sign_message(task_response_digest.as_ref());
2113 let (handle, _) = bls_agg_service.start();
2114 let result = handle
2115 .process_signature(TaskSignature::new(
2116 task_index,
2117 task_response_digest,
2118 bls_sig_op_1,
2119 test_operator_1.operator_id,
2120 ))
2121 .await;
2122
2123 assert_eq!(Err(BlsAggregationServiceError::TaskNotFound), result);
2124 }
2125
2126 #[tokio::test]
2127 async fn test_1_quorum_2_operator_2_signatures_on_2_different_msgs() {
2128 let test_operator_1 = TestOperator {
2129 operator_id: U256::from(1).into(),
2130 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2131 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2132 };
2133 let test_operator_2 = TestOperator {
2134 operator_id: U256::from(2).into(),
2135 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2136 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2137 };
2138 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2139 let block_number = 1;
2140 let task_index = 0;
2141 let quorum_numbers: Vec<QuorumNum> = vec![0];
2142 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8];
2143 let time_to_expiry = Duration::from_secs(1);
2144 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2145 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2146
2147 let metadata = TaskMetadata::new(
2148 task_index,
2149 block_number,
2150 quorum_numbers,
2151 quorum_threshold_percentages,
2152 time_to_expiry,
2153 );
2154 let (handle, mut aggregator_response) = bls_agg_service.start();
2155 handle.initialize_task(metadata).await.unwrap();
2156
2157 let task_response_1 = 123; let task_response_1_digest = hash(task_response_1);
2159 let bls_sig_op_1 = test_operator_1
2160 .bls_keypair
2161 .sign_message(task_response_1_digest.as_ref());
2162 handle
2163 .process_signature(TaskSignature::new(
2164 task_index,
2165 task_response_1_digest,
2166 bls_sig_op_1,
2167 test_operator_1.operator_id,
2168 ))
2169 .await
2170 .unwrap();
2171
2172 let task_response_2 = 456; let task_response_2_digest = hash(task_response_2);
2174 let bls_sig_op_2 = test_operator_1
2175 .bls_keypair
2176 .sign_message(task_response_2_digest.as_ref());
2177 handle
2178 .process_signature(TaskSignature::new(
2179 task_index,
2180 task_response_2_digest,
2181 bls_sig_op_2,
2182 test_operator_1.operator_id,
2183 ))
2184 .await
2185 .unwrap();
2186
2187 let response = aggregator_response.receive_aggregated_response().await;
2188
2189 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2190 }
2191
2192 #[tokio::test]
2193 async fn test_1_quorum_1_operator_1_invalid_signature() {
2194 let test_operator_1 = TestOperator {
2195 operator_id: U256::from(1).into(),
2196 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2197 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2198 };
2199
2200 let block_number = 1;
2201 let task_index = 0;
2202 let quorum_numbers = vec![0];
2203 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
2204 let time_to_expiry = Duration::from_secs(1);
2205 let task_response = 123; let wrong_task_response_digest = hash(task_response + 1);
2208 let bls_signature = test_operator_1
2209 .bls_keypair
2210 .sign_message(hash(task_response).as_ref());
2211 let fake_avs_registry_service =
2212 FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2213 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2214 let metadata = TaskMetadata::new(
2215 task_index,
2216 block_number,
2217 quorum_numbers,
2218 quorum_threshold_percentages,
2219 time_to_expiry,
2220 );
2221 let (handle, mut aggregator_response) = bls_agg_service.start();
2222 handle.initialize_task(metadata).await.unwrap();
2223
2224 let result = handle
2225 .process_signature(TaskSignature::new(
2226 task_index,
2227 wrong_task_response_digest,
2228 bls_signature.clone(),
2229 test_operator_1.operator_id,
2230 ))
2231 .await;
2232
2233 assert_eq!(
2234 Err(BlsAggregationServiceError::SignatureVerificationError(
2235 IncorrectSignature
2236 )),
2237 result
2238 );
2239
2240 let response = aggregator_response.receive_aggregated_response().await;
2242
2243 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2244 }
2245
2246 #[tokio::test]
2247 async fn test_signatures_are_processed_during_window_after_quorum() {
2248 let test_operator_1 = TestOperator {
2249 operator_id: U256::from(1).into(),
2250 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2251 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2252 };
2253 let test_operator_2 = TestOperator {
2254 operator_id: U256::from(2).into(),
2255 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2256 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2257 };
2258 let test_operator_3 = TestOperator {
2259 operator_id: U256::from(3).into(),
2260 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2261 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
2262 };
2263 let test_operators = vec![
2264 test_operator_1.clone(),
2265 test_operator_2.clone(),
2266 test_operator_3.clone(),
2267 ];
2268 let block_number = 1;
2269 let task_index = 0;
2270 let task_response = 123;
2271 let quorum_numbers: Vec<QuorumNum> = vec![0];
2272 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50_u8];
2273 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2274 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2275
2276 let time_to_expiry = Duration::from_secs(5);
2277 let window_duration = Duration::from_secs(1);
2278
2279 let start = Instant::now();
2280 let metadata = TaskMetadata::new(
2281 task_index,
2282 block_number,
2283 quorum_numbers,
2284 quorum_threshold_percentages,
2285 time_to_expiry,
2286 )
2287 .with_window_duration(window_duration);
2288
2289 let (handle, mut aggregator_response) = bls_agg_service.start();
2290 handle.initialize_task(metadata).await.unwrap();
2291
2292 let task_response_1_digest = hash(task_response);
2293 let bls_sig_op_1 = test_operator_1
2294 .bls_keypair
2295 .sign_message(task_response_1_digest.as_ref());
2296 handle
2297 .process_signature(TaskSignature::new(
2298 task_index,
2299 task_response_1_digest,
2300 bls_sig_op_1.clone(),
2301 test_operator_1.operator_id,
2302 ))
2303 .await
2304 .unwrap();
2305
2306 let task_response_2_digest = hash(task_response);
2307 let bls_sig_op_2 = test_operator_2
2308 .bls_keypair
2309 .sign_message(task_response_2_digest.as_ref());
2310 handle
2311 .process_signature(TaskSignature::new(
2312 task_index,
2313 task_response_2_digest,
2314 bls_sig_op_2.clone(),
2315 test_operator_2.operator_id,
2316 ))
2317 .await
2318 .unwrap();
2319
2320 sleep(Duration::from_millis(500)).await;
2322 let task_response_3_digest = hash(task_response);
2323 let bls_sig_op_3 = test_operator_3
2324 .bls_keypair
2325 .sign_message(task_response_3_digest.as_ref());
2326 handle
2327 .process_signature(TaskSignature::new(
2328 task_index,
2329 task_response_3_digest,
2330 bls_sig_op_3.clone(),
2331 test_operator_3.operator_id,
2332 ))
2333 .await
2334 .unwrap();
2335
2336 let signers_apk_g2 = aggregate_g2_public_keys(&vec![
2337 test_operator_1.clone(),
2338 test_operator_2.clone(),
2339 test_operator_3.clone(),
2340 ]);
2341 let signers_agg_sig_g1 =
2342 aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2, bls_sig_op_3]);
2343 let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2344 test_operator_1,
2345 test_operator_2,
2346 test_operator_3,
2347 ])];
2348
2349 let expected_agg_service_response = BlsAggregationServiceResponse {
2350 task_index,
2351 task_response_digest: task_response_3_digest,
2352 non_signers_pub_keys_g1: vec![],
2353 quorum_apks_g1,
2354 signers_apk_g2,
2355 signers_agg_sig_g1,
2356 non_signer_quorum_bitmap_indices: vec![],
2357 quorum_apk_indices: vec![],
2358 total_stake_indices: vec![],
2359 non_signer_stake_indices: vec![],
2360 };
2361
2362 let response = aggregator_response.receive_aggregated_response().await;
2363
2364 let elapsed = start.elapsed();
2365 assert_eq!(expected_agg_service_response, response.clone().unwrap());
2366 assert_eq!(task_index, response.unwrap().task_index);
2367 assert!(elapsed < time_to_expiry);
2368 assert!(elapsed >= window_duration);
2369 }
2370
2371 #[tokio::test]
2372 async fn test_if_quorum_has_been_reached_and_the_task_expires_during_window_the_response_is_sent(
2373 ) {
2374 let test_operator_1 = TestOperator {
2375 operator_id: U256::from(1).into(),
2376 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2377 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2378 };
2379 let test_operator_2 = TestOperator {
2380 operator_id: U256::from(2).into(),
2381 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2382 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2383 };
2384 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2385 let block_number = 1;
2386 let task_index = 0;
2387 let task_response = 123;
2388 let quorum_numbers: Vec<QuorumNum> = vec![0];
2389 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2390 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2391 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2392
2393 let time_to_expiry = Duration::from_secs(2);
2394 let window_duration = Duration::from_secs(10);
2395
2396 let start = Instant::now();
2397 let metadata = TaskMetadata::new(
2398 task_index,
2399 block_number,
2400 quorum_numbers,
2401 quorum_threshold_percentages,
2402 time_to_expiry,
2403 )
2404 .with_window_duration(window_duration);
2405 let (handle, mut aggregator_response) = bls_agg_service.start();
2406 handle.initialize_task(metadata).await.unwrap();
2407
2408 let task_response_1_digest = hash(task_response);
2409 let bls_sig_op_1 = test_operator_1
2410 .bls_keypair
2411 .sign_message(task_response_1_digest.as_ref());
2412 handle
2413 .process_signature(TaskSignature::new(
2414 task_index,
2415 task_response_1_digest,
2416 bls_sig_op_1.clone(),
2417 test_operator_1.operator_id,
2418 ))
2419 .await
2420 .unwrap();
2421
2422 let task_response_2_digest = hash(task_response);
2425 let bls_sig_op_2 = test_operator_2
2426 .bls_keypair
2427 .sign_message(task_response_2_digest.as_ref());
2428 handle
2429 .process_signature(TaskSignature::new(
2430 task_index,
2431 task_response_2_digest,
2432 bls_sig_op_2.clone(),
2433 test_operator_2.operator_id,
2434 ))
2435 .await
2436 .unwrap();
2437
2438 let signers_apk_g2 =
2439 aggregate_g2_public_keys(&vec![test_operator_1.clone(), test_operator_2.clone()]);
2440 let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
2441 let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2442 test_operator_1,
2443 test_operator_2,
2444 ])];
2445
2446 let expected_agg_service_response = BlsAggregationServiceResponse {
2447 task_index,
2448 task_response_digest: task_response_2_digest,
2449 non_signers_pub_keys_g1: vec![],
2450 quorum_apks_g1,
2451 signers_apk_g2,
2452 signers_agg_sig_g1,
2453 non_signer_quorum_bitmap_indices: vec![],
2454 quorum_apk_indices: vec![],
2455 total_stake_indices: vec![],
2456 non_signer_stake_indices: vec![],
2457 };
2458
2459 let response = aggregator_response.receive_aggregated_response().await;
2460
2461 let elapsed = start.elapsed();
2462 assert_eq!(expected_agg_service_response, response.clone().unwrap());
2463 assert_eq!(task_index, response.unwrap().task_index);
2464 assert!(elapsed >= time_to_expiry);
2465 assert!(elapsed < window_duration);
2466 }
2467
2468 #[tokio::test]
2469 async fn test_if_window_duration_is_zero_no_signatures_are_aggregated_after_reaching_quorum() {
2470 let test_operator_1 = TestOperator {
2471 operator_id: U256::from(1).into(),
2472 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2473 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2474 };
2475 let test_operator_2 = TestOperator {
2476 operator_id: U256::from(2).into(),
2477 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2478 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2479 };
2480 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2481 let block_number = 1;
2482 let task_index = 0;
2483 let task_response = 123;
2484 let quorum_numbers: Vec<QuorumNum> = vec![0];
2485 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2486 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2487 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2488
2489 let time_to_expiry = Duration::from_secs(2);
2490 let window_duration = Duration::ZERO;
2491
2492 let start = Instant::now();
2493 let metadata = TaskMetadata::new(
2494 task_index,
2495 block_number,
2496 quorum_numbers,
2497 quorum_threshold_percentages,
2498 time_to_expiry,
2499 )
2500 .with_window_duration(window_duration);
2501 let (handle, mut aggregator_response) = bls_agg_service.start();
2502 handle.initialize_task(metadata).await.unwrap();
2503
2504 let task_response_1_digest = hash(task_response);
2505 let bls_sig_op_1 = test_operator_1
2506 .bls_keypair
2507 .sign_message(task_response_1_digest.as_ref());
2508 handle
2509 .process_signature(TaskSignature::new(
2510 task_index,
2511 task_response_1_digest,
2512 bls_sig_op_1.clone(),
2513 test_operator_1.operator_id,
2514 ))
2515 .await
2516 .unwrap();
2517
2518 sleep(Duration::from_millis(1)).await;
2520
2521 let task_response_2_digest = hash(task_response);
2522 let bls_sig_op_2 = test_operator_2
2523 .bls_keypair
2524 .sign_message(task_response_2_digest.as_ref());
2525 let process_signature_result = handle
2526 .process_signature(TaskSignature::new(
2527 task_index,
2528 task_response_2_digest,
2529 bls_sig_op_2,
2530 test_operator_2.operator_id,
2531 ))
2532 .await;
2533 assert_eq!(
2534 Err(BlsAggregationServiceError::SenderError),
2535 process_signature_result
2536 );
2537
2538 let signers_apk_g2 = aggregate_g2_public_keys(&[test_operator_1.clone()]);
2539 let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]);
2540 let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2541 test_operator_1,
2542 test_operator_2.clone(),
2543 ])];
2544
2545 let expected_agg_service_response = BlsAggregationServiceResponse {
2546 task_index,
2547 task_response_digest: task_response_1_digest,
2548 non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()],
2549 quorum_apks_g1,
2550 signers_apk_g2,
2551 signers_agg_sig_g1,
2552 non_signer_quorum_bitmap_indices: vec![],
2553 quorum_apk_indices: vec![],
2554 total_stake_indices: vec![],
2555 non_signer_stake_indices: vec![],
2556 };
2557
2558 let response = aggregator_response.receive_aggregated_response().await;
2559
2560 let elapsed = start.elapsed();
2561 assert_eq!(expected_agg_service_response, response.clone().unwrap());
2562 assert_eq!(task_index, response.unwrap().task_index);
2563 assert!(elapsed < time_to_expiry);
2564 }
2565
2566 #[tokio::test]
2567 async fn test_no_signatures_are_aggregated_after_window() {
2568 let test_operator_1 = TestOperator {
2569 operator_id: U256::from(1).into(),
2570 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2571 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2572 };
2573 let test_operator_2 = TestOperator {
2574 operator_id: U256::from(2).into(),
2575 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2576 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2577 };
2578 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2579 let block_number = 1;
2580 let task_index = 0;
2581 let task_response = 123;
2582 let quorum_numbers: Vec<QuorumNum> = vec![0];
2583 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2584 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2585 let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);
2586
2587 let time_to_expiry = Duration::from_secs(5);
2588 let window_duration = Duration::from_secs(1);
2589
2590 let start = Instant::now();
2591 let metadata = TaskMetadata::new(
2592 task_index,
2593 block_number,
2594 quorum_numbers,
2595 quorum_threshold_percentages,
2596 time_to_expiry,
2597 )
2598 .with_window_duration(window_duration);
2599 let (handle, mut aggregator_response) = bls_agg_service.start();
2600 handle.initialize_task(metadata).await.unwrap();
2601
2602 let task_response_1_digest = hash(task_response);
2603 let bls_sig_op_1 = test_operator_1
2604 .bls_keypair
2605 .sign_message(task_response_1_digest.as_ref());
2606 handle
2607 .process_signature(TaskSignature::new(
2608 task_index,
2609 task_response_1_digest,
2610 bls_sig_op_1.clone(),
2611 test_operator_1.operator_id,
2612 ))
2613 .await
2614 .unwrap();
2615
2616 sleep(Duration::from_secs(2)).await;
2618
2619 let task_response_2_digest = hash(task_response);
2620 let bls_sig_op_2 = test_operator_2
2621 .bls_keypair
2622 .sign_message(task_response_2_digest.as_ref());
2623 let process_signature_result = handle
2624 .process_signature(TaskSignature::new(
2625 task_index,
2626 task_response_2_digest,
2627 bls_sig_op_2,
2628 test_operator_2.operator_id,
2629 ))
2630 .await;
2631 assert_eq!(
2632 Err(BlsAggregationServiceError::SenderError),
2633 process_signature_result
2634 );
2635
2636 let signers_apk_g2 = aggregate_g2_public_keys(&[test_operator_1.clone()]);
2637 let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]);
2638 let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2639 test_operator_1,
2640 test_operator_2.clone(),
2641 ])];
2642
2643 let expected_agg_service_response = BlsAggregationServiceResponse {
2644 task_index,
2645 task_response_digest: task_response_1_digest,
2646 non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()],
2647 quorum_apks_g1,
2648 signers_apk_g2,
2649 signers_agg_sig_g1,
2650 non_signer_quorum_bitmap_indices: vec![],
2651 quorum_apk_indices: vec![],
2652 total_stake_indices: vec![],
2653 non_signer_stake_indices: vec![],
2654 };
2655
2656 let response = aggregator_response.receive_aggregated_response().await;
2657
2658 let elapsed = start.elapsed();
2659 assert_eq!(expected_agg_service_response, response.clone().unwrap());
2660 assert_eq!(task_index, response.unwrap().task_index);
2661 assert!(elapsed < time_to_expiry);
2662 }
2663}