1use super::bls_aggregation_service_error::BlsAggregationServiceError;
2use super::bls_aggregation_service_response::BlsAggregationServiceResponse;
3use alloy::primitives::{FixedBytes, Uint, U256};
4use ark_bn254::{G1Affine, G2Affine};
5use ark_ec::AffineRepr;
6use eigen_crypto_bls::{BlsG1Point, BlsG2Point, Signature};
7use eigen_crypto_bn254::utils::verify_message;
8use eigen_logging::logger::SharedLogger;
9use eigen_services_avsregistry::AvsRegistryService;
10use eigen_types::avs_state::OperatorAvsState;
11use eigen_types::{
12 avs::{SignatureVerificationError, TaskIndex, TaskResponseDigest},
13 operator::{QuorumThresholdPercentage, QuorumThresholdPercentages},
14};
15use std::collections::HashMap;
16use tokio::{
17 sync::{
18 mpsc::{self, UnboundedReceiver, UnboundedSender},
19 oneshot,
20 },
21 time::Duration,
22};
23
24#[derive(Debug, Clone)]
26pub struct AggregatedOperators {
27 signers_apk_g2: BlsG2Point,
28 signers_agg_sig_g1: Signature,
29 signers_total_stake_per_quorum: HashMap<u8, U256>,
30 pub signers_operator_ids_set: HashMap<FixedBytes<32>, bool>,
31}
32
33#[derive(Clone)]
35pub struct TaskMetadata {
36 task_index: TaskIndex,
38 task_created_block: u64,
40 quorum_numbers: Vec<u8>,
42 quorum_threshold_percentages: QuorumThresholdPercentages,
44 time_to_expiry: Duration,
46 window_duration: Duration,
48}
49
50impl TaskMetadata {
51 pub fn new(
68 task_index: TaskIndex,
69 task_created_block: u64,
70 quorum_numbers: Vec<u8>,
71 quorum_threshold_percentages: QuorumThresholdPercentages,
72 time_to_expiry: Duration,
73 ) -> Self {
74 Self {
75 task_index,
76 task_created_block,
77 quorum_numbers,
78 quorum_threshold_percentages,
79 time_to_expiry,
80 window_duration: Duration::ZERO,
81 }
82 }
83
84 pub fn with_window_duration(mut self, window_duration: Duration) -> Self {
93 self.window_duration = window_duration;
94 self
95 }
96}
97
98#[derive(Clone)]
100pub struct TaskSignature {
101 task_index: TaskIndex,
103 task_response_digest: TaskResponseDigest,
105 bls_signature: Signature,
107 operator_id: FixedBytes<32>,
109}
110
111impl TaskSignature {
112 pub fn new(
124 task_index: TaskIndex,
125 task_response_digest: TaskResponseDigest,
126 bls_signature: Signature,
127 operator_id: FixedBytes<32>,
128 ) -> Self {
129 Self {
130 task_index,
131 task_response_digest,
132 bls_signature,
133 operator_id,
134 }
135 }
136}
137
138pub enum AggregationMessage {
140 InitializeTask(
141 TaskMetadata,
142 oneshot::Sender<Result<(), BlsAggregationServiceError>>,
143 ),
144 ProcessSignature(
145 TaskSignature,
146 oneshot::Sender<Result<(), BlsAggregationServiceError>>,
147 ),
148}
149
150#[derive(Debug, Clone)]
152pub struct ServiceHandle {
153 msg_sender: UnboundedSender<AggregationMessage>,
155}
156
157impl ServiceHandle {
158 pub async fn initialize_task(
168 &self,
169 metadata: TaskMetadata,
170 ) -> Result<(), BlsAggregationServiceError> {
171 let (tx, rx) = oneshot::channel();
172 self.msg_sender
173 .send(AggregationMessage::InitializeTask(metadata, tx))
174 .map_err(|_| BlsAggregationServiceError::SenderError)?;
175
176 rx.await
177 .map_err(|_| BlsAggregationServiceError::ReceiverError)?
178 }
179
180 pub async fn process_signature(
192 &self,
193 task_signature: TaskSignature,
194 ) -> Result<(), BlsAggregationServiceError> {
195 let (tx, rx) = oneshot::channel();
196 self.msg_sender
197 .send(AggregationMessage::ProcessSignature(task_signature, tx))
198 .map_err(|_| BlsAggregationServiceError::SenderError)?;
199
200 rx.await
201 .map_err(|_| BlsAggregationServiceError::ReceiverError)?
202 }
203}
204
205#[derive(Debug)]
207pub struct AggregateReceiver {
208 aggregate_receiver:
210 UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>,
211}
212
213impl AggregateReceiver {
214 pub async fn receive_aggregated_response(
220 &mut self,
221 ) -> Result<BlsAggregationServiceResponse, BlsAggregationServiceError> {
222 self.aggregate_receiver
223 .recv()
224 .await
225 .ok_or(BlsAggregationServiceError::ReceiverError)?
226 }
227}
228
229#[derive(Debug)]
231pub struct BlsAggregatorService<A: AvsRegistryService>
232where
233 A: Clone,
234{
235 logger: SharedLogger,
236 avs_registry_service: A,
237}
238
239#[derive(Debug)]
241struct SignedTaskResponseDigest {
242 task_response_digest: TaskResponseDigest,
243
244 bls_signature: Signature,
245
246 operator_id: FixedBytes<32>,
247
248 result_channel: oneshot::Sender<Result<(), BlsAggregationServiceError>>,
249}
250
251impl<A: AvsRegistryService + Send + Sync + Clone + 'static> BlsAggregatorService<A> {
252 pub fn new(avs_registry_service: A, logger: SharedLogger) -> Self {
261 Self {
262 logger,
263 avs_registry_service,
264 }
265 }
266
267 pub fn start(self) -> (ServiceHandle, AggregateReceiver) {
273 let (msg_tx, msg_rx) = mpsc::unbounded_channel();
274 let (agg_tx, agg_rx) = mpsc::unbounded_channel();
275
276 tokio::spawn(async move {
277 self.run(msg_rx, agg_tx).await;
278 });
279
280 let service_handler = ServiceHandle { msg_sender: msg_tx };
282 let aggregate_receiver = AggregateReceiver {
283 aggregate_receiver: agg_rx,
284 };
285
286 (service_handler, aggregate_receiver)
287 }
288
289 async fn run(
303 self,
304 mut msg_receiver: UnboundedReceiver<AggregationMessage>,
305 aggregate_sender: UnboundedSender<
306 Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
307 >,
308 ) {
309 let mut task_channels: HashMap<TaskIndex, UnboundedSender<SignedTaskResponseDigest>> =
310 HashMap::new();
311
312 while let Some(message) = msg_receiver.recv().await {
313 match message {
314 AggregationMessage::InitializeTask(metadata, result_sender) => {
315 let task_index = metadata.task_index;
316 if task_channels.contains_key(&task_index) {
317 result_sender
319 .send(Err(BlsAggregationServiceError::DuplicateTaskIndex))
320 .ok();
321 continue;
322 }
323
324 let (signature_tx, signature_rx) =
326 mpsc::unbounded_channel::<SignedTaskResponseDigest>();
327 task_channels.insert(task_index, signature_tx);
328
329 let avs_registry_service = self.avs_registry_service.clone();
330 let aggregated_response_sender = aggregate_sender.clone();
331 let logger = self.logger.clone();
332
333 tokio::spawn(async move {
334 let _ = Self::single_task_aggregator(
335 avs_registry_service,
336 metadata,
337 aggregated_response_sender,
338 signature_rx,
339 logger,
340 )
341 .await
342 .inspect_err(|err| {
343 println!("Error with single_task_aggregator: {:?}", err);
344 });
345 });
346
347 let _ = result_sender.send(Ok(()));
348 }
349 AggregationMessage::ProcessSignature(task_signature, result_sender) => {
350 if let Some(sig_sender) = task_channels.get_mut(&task_signature.task_index) {
351 let signed_digest = SignedTaskResponseDigest {
353 task_response_digest: task_signature.task_response_digest,
354 bls_signature: task_signature.bls_signature,
355 operator_id: task_signature.operator_id,
356 result_channel: result_sender,
357 };
358
359 if let Err(send_error) = sig_sender.send(signed_digest) {
360 let _ = send_error
361 .0
362 .result_channel
363 .send(Err(BlsAggregationServiceError::SenderError));
364 }
365 } else {
366 result_sender
367 .send(Err(BlsAggregationServiceError::TaskNotFound))
368 .ok();
369 }
370 }
371 }
372 }
373 }
374
375 async fn single_task_aggregator(
389 avs_registry_service: A,
390 metadata: TaskMetadata,
391 aggregated_response_sender: UnboundedSender<
392 Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
393 >,
394 signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
395 logger: SharedLogger,
396 ) -> Result<(), BlsAggregationServiceError> {
397 let quorum_threshold_percentage_map: HashMap<u8, u8> = metadata
398 .quorum_numbers
399 .iter()
400 .enumerate()
401 .map(|(i, quorum_number)| (*quorum_number, metadata.quorum_threshold_percentages[i]))
402 .collect();
403 let operator_state_avs = avs_registry_service
404 .get_operators_avs_state_at_block(metadata.task_created_block, &metadata.quorum_numbers)
405 .await
406 .map_err(|_| BlsAggregationServiceError::RegistryError)?;
407 let quorums_avs_state = avs_registry_service
408 .get_quorums_avs_state_at_block(&metadata.quorum_numbers, metadata.task_created_block)
409 .await
410 .map_err(|_| BlsAggregationServiceError::RegistryError)?;
411 let total_stake_per_quorum: HashMap<_, _> = quorums_avs_state
412 .iter()
413 .map(|(k, v)| (*k, v.total_stake))
414 .collect();
415
416 let quorum_apks_g1: Vec<BlsG1Point> = metadata
417 .quorum_numbers
418 .iter()
419 .filter_map(|quorum_num| quorums_avs_state.get(quorum_num))
420 .map(|avs_state| avs_state.agg_pub_key_g1.clone())
421 .collect();
422
423 Self::loop_task_aggregator(
424 avs_registry_service,
425 metadata.task_index,
426 metadata.task_created_block,
427 metadata.time_to_expiry,
428 aggregated_response_sender,
429 signatures_rx,
430 operator_state_avs,
431 total_stake_per_quorum,
432 quorum_threshold_percentage_map,
433 quorum_apks_g1,
434 metadata.quorum_numbers,
435 metadata.window_duration,
436 logger,
437 )
438 .await
439 }
440
441 #[allow(clippy::too_many_arguments)]
442 async fn loop_task_aggregator(
443 avs_registry_service: A,
444 task_index: TaskIndex,
445 task_created_block: u64,
446 time_to_expiry: Duration,
447 aggregated_response_sender: UnboundedSender<
448 Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
449 >,
450 mut signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
451 operator_state_avs: HashMap<FixedBytes<32>, OperatorAvsState>,
452 total_stake_per_quorum: HashMap<u8, Uint<256, 4>>,
453 quorum_threshold_percentage_map: HashMap<u8, u8>,
454 quorum_apks_g1: Vec<BlsG1Point>,
455 quorum_nums: Vec<u8>,
456 window_duration: Duration,
457 logger: SharedLogger,
458 ) -> Result<(), BlsAggregationServiceError> {
459 let mut aggregated_operators: HashMap<FixedBytes<32>, AggregatedOperators> = HashMap::new();
460 let mut open_window = false;
461 let mut current_aggregated_response: Option<BlsAggregationServiceResponse> = None;
462 let (window_tx, mut window_rx) = tokio::sync::mpsc::unbounded_channel::<bool>();
463 let task_expired_timer = tokio::time::sleep(time_to_expiry);
464 tokio::pin!(task_expired_timer);
465
466 loop {
467 tokio::select! {
468 _ = &mut task_expired_timer => {
469 Self::handle_task_expired(
471 &logger,
472 &aggregated_response_sender,
473 task_index,
474 open_window,
475 ¤t_aggregated_response,
476 )?;
477 return Ok(());
478 },
479 _ = window_rx.recv() => {
480 Self::handle_window_finished(
482 &logger,
483 &aggregated_response_sender,
484 task_index,
485 ¤t_aggregated_response,
486 )?;
487 return Ok(());
488 },
489 signed_task_digest = signatures_rx.recv() => {
490 Self::handle_new_signature(
492 &logger,
493 &avs_registry_service,
494 &mut aggregated_operators,
495 &mut open_window,
496 &mut current_aggregated_response,
497 &window_tx,
498 task_index,
499 task_created_block,
500 &operator_state_avs,
501 &total_stake_per_quorum,
502 &quorum_threshold_percentage_map,
503 &quorum_apks_g1,
504 &quorum_nums,
505 window_duration,
506 signed_task_digest,
507 ).await?;
508 }
509 }
510 }
511 }
512
513 #[allow(clippy::too_many_arguments)]
533 async fn handle_new_signature(
534 logger: &SharedLogger,
535 avs_registry_service: &A,
536 aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
537 open_window: &mut bool,
538 current_aggregated_response: &mut Option<BlsAggregationServiceResponse>,
539 window_tx: &UnboundedSender<bool>,
540 task_index: TaskIndex,
541 task_created_block: u64,
542 operator_state_avs: &HashMap<FixedBytes<32>, OperatorAvsState>,
543 total_stake_per_quorum: &HashMap<u8, Uint<256, 4>>,
544 quorum_threshold_percentage_map: &HashMap<u8, u8>,
545 quorum_apks_g1: &[BlsG1Point],
546 quorum_nums: &[u8],
547 window_duration: Duration,
548 signed_task_digest: Option<SignedTaskResponseDigest>,
549 ) -> Result<(), BlsAggregationServiceError> {
550 logger.debug(
551 &format!("New signature received for task index: {}", task_index),
552 "eigen-services-blsaggregation.bls_agg.handle_new_signature",
553 );
554
555 let signed_digest =
556 signed_task_digest.ok_or(BlsAggregationServiceError::SignaturesChannelClosed)?;
557
558 if Self::is_duplicate_signature(aggregated_operators, &signed_digest) {
560 signed_digest
561 .result_channel
562 .send(Err(BlsAggregationServiceError::SignatureVerificationError(
563 SignatureVerificationError::DuplicateSignature,
564 )))
565 .map_err(|_| BlsAggregationServiceError::SenderError)?;
566 return Ok(());
567 }
568
569 let verification_result = verify_signature(
571 task_index,
572 &signed_digest,
573 operator_state_avs,
574 logger.clone(),
575 )
576 .await
577 .map_err(BlsAggregationServiceError::SignatureVerificationError);
578
579 let verification_has_error = verification_result.is_err();
580
581 signed_digest
583 .result_channel
584 .send(verification_result)
585 .map_err(|_| BlsAggregationServiceError::SenderError)?;
586
587 if verification_has_error {
589 return Ok(());
590 }
591
592 let operator_state = operator_state_avs.get(&signed_digest.operator_id).unwrap();
593
594 let updated_aggregated = update_aggregated_operators(
596 aggregated_operators,
597 operator_state,
598 signed_digest.task_response_digest,
599 signed_digest.bls_signature,
600 signed_digest.operator_id,
601 logger.clone(),
602 );
603 aggregated_operators.insert(
604 signed_digest.task_response_digest,
605 updated_aggregated.clone(),
606 );
607
608 if !Self::check_if_stake_thresholds_met(
610 &updated_aggregated.signers_total_stake_per_quorum,
611 total_stake_per_quorum,
612 quorum_threshold_percentage_map,
613 ) {
614 return Ok(());
615 }
616
617 logger.debug(
618 &format!("Signature threshold is met for task index: {}", task_index),
619 "eigen-services-blsaggregation.bls_agg.handle_new_signature",
620 );
621
622 if !*open_window {
624 *open_window = true;
625 Self::start_window(window_tx, window_duration, task_index, logger.clone());
626 }
627
628 *current_aggregated_response = Some(
629 Self::build_aggregated_response(
630 task_index,
631 task_created_block,
632 signed_digest.task_response_digest,
633 operator_state_avs,
634 updated_aggregated,
635 avs_registry_service,
636 quorum_apks_g1,
637 quorum_nums,
638 logger.clone(),
639 )
640 .await?,
641 );
642
643 Ok(())
644 }
645
646 fn handle_task_expired(
657 logger: &SharedLogger,
658 aggregated_response_sender: &UnboundedSender<
659 Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
660 >,
661 task_index: TaskIndex,
662 open_window: bool,
663 current_aggregated_response: &Option<BlsAggregationServiceResponse>,
664 ) -> Result<(), BlsAggregationServiceError> {
665 if open_window {
666 logger.debug(
667 &format!(
668 "task_expired_timer while in the waiting window for task index: {}",
669 task_index
670 ),
671 "eigen-services-blsaggregation.bls_agg.handle_task_expired",
672 );
673 aggregated_response_sender
674 .send(Ok(current_aggregated_response.clone().unwrap()))
675 .map_err(|_| BlsAggregationServiceError::SenderError)?;
676 } else {
677 logger.debug(
678 &format!(
679 "task_expired_timer NOT in the waiting window for task index: {}",
680 task_index
681 ),
682 "eigen-services-blsaggregation.bls_agg.handle_task_expired",
683 );
684
685 let _ = aggregated_response_sender.send(Err(BlsAggregationServiceError::TaskExpired));
686 }
687 Ok(())
688 }
689
690 fn handle_window_finished(
700 logger: &SharedLogger,
701 aggregated_response_sender: &UnboundedSender<
702 Result<BlsAggregationServiceResponse, BlsAggregationServiceError>,
703 >,
704 task_index: TaskIndex,
705 current_aggregated_response: &Option<BlsAggregationServiceResponse>,
706 ) -> Result<(), BlsAggregationServiceError> {
707 logger.debug(
708 &format!(
709 "Window finished. Send aggregated response for task index: {}",
710 task_index
711 ),
712 "eigen-services-blsaggregation.bls_agg.handle_window_finished",
713 );
714
715 aggregated_response_sender
716 .send(Ok(current_aggregated_response.clone().unwrap()))
717 .map_err(|_| BlsAggregationServiceError::SenderError)?;
718 Ok(())
719 }
720
721 #[allow(clippy::too_many_arguments)]
739 async fn build_aggregated_response(
740 task_index: TaskIndex,
741 task_created_block: u64,
742 task_response_digest: FixedBytes<32>,
743 operator_state_avs: &HashMap<FixedBytes<32>, OperatorAvsState>,
744 digest_aggregated_operators: AggregatedOperators,
745 avs_registry_service: &A,
746 quorum_apks_g1: &[BlsG1Point],
747 quorum_nums: &[u8],
748 logger: SharedLogger,
749 ) -> Result<BlsAggregationServiceResponse, BlsAggregationServiceError> {
750 logger.debug(
751 &format!("Build aggregated response for task index: {}", task_index),
752 "eigen-services-blsaggregation.bls_agg.build_aggregated_response",
753 );
754
755 let mut non_signers_operators_ids: Vec<FixedBytes<32>> = operator_state_avs
756 .keys()
757 .filter(|operator_id| {
758 !digest_aggregated_operators
759 .signers_operator_ids_set
760 .contains_key(*operator_id)
761 })
762 .cloned()
763 .collect();
764
765 non_signers_operators_ids.sort();
766
767 let non_signers_pub_keys_g1: Vec<BlsG1Point> = non_signers_operators_ids
768 .iter()
769 .filter_map(|operator_id| operator_state_avs.get(operator_id))
770 .filter_map(|operator_avs_state| operator_avs_state.operator_info.pub_keys.clone())
771 .map(|pub_keys| pub_keys.g1_pub_key)
772 .collect();
773
774 let indices = avs_registry_service
775 .get_check_signatures_indices(
776 task_created_block,
777 quorum_nums.into(),
778 non_signers_operators_ids,
779 )
780 .await
781 .map_err(|_err| BlsAggregationServiceError::RegistryError)?;
782
783 Ok(BlsAggregationServiceResponse {
784 task_index,
785 task_response_digest,
786 non_signers_pub_keys_g1,
787 quorum_apks_g1: quorum_apks_g1.into(),
788 signers_apk_g2: digest_aggregated_operators.signers_apk_g2,
789 signers_agg_sig_g1: digest_aggregated_operators.signers_agg_sig_g1,
790 non_signer_quorum_bitmap_indices: indices.clone().nonSignerQuorumBitmapIndices,
791 quorum_apk_indices: indices.quorumApkIndices,
792 total_stake_indices: indices.totalStakeIndices,
793 non_signer_stake_indices: indices.nonSignerStakeIndices,
794 })
795 }
796
797 fn check_if_stake_thresholds_met(
810 signed_stake_per_quorum: &HashMap<u8, U256>,
811 total_stake_per_quorum: &HashMap<u8, U256>,
812 quorum_threshold_percentages_map: &HashMap<u8, QuorumThresholdPercentage>,
813 ) -> bool {
814 for (quorum_num, quorum_threshold_percentage) in quorum_threshold_percentages_map {
815 let (Some(signed_stake_by_quorum), Some(total_stake_by_quorum)) = (
816 signed_stake_per_quorum.get(quorum_num),
817 total_stake_per_quorum.get(quorum_num),
818 ) else {
819 return false;
820 };
821
822 let signed_stake = signed_stake_by_quorum * U256::from(100);
823 let threshold_stake = *total_stake_by_quorum * U256::from(*quorum_threshold_percentage);
824
825 if signed_stake < threshold_stake {
826 return false;
827 }
828 }
829 true
830 }
831
832 fn is_duplicate_signature(
843 aggregated_operators: &HashMap<FixedBytes<32>, AggregatedOperators>,
844 signed_digest: &SignedTaskResponseDigest,
845 ) -> bool {
846 aggregated_operators
847 .get(&signed_digest.task_response_digest)
848 .map(|ops| {
849 ops.signers_operator_ids_set
850 .contains_key(&signed_digest.operator_id)
851 })
852 .unwrap_or(false)
853 }
854
855 fn start_window(
864 window_tx: &UnboundedSender<bool>,
865 window_duration: Duration,
866 task_index: TaskIndex,
867 logger: SharedLogger,
868 ) {
869 let sender = window_tx.clone();
870 logger.debug(
871 &format!(
872 "Create window to wait for new signatures for task index: {}",
873 task_index
874 ),
875 "eigen-services-blsaggregation.bls_agg.start_window",
876 );
877 tokio::spawn(async move {
878 tokio::time::sleep(window_duration).await;
879 let _ = sender.send(true);
880 });
881 }
882}
883
884async fn verify_signature(
902 task_index: TaskIndex,
903 signed_task_response_digest: &SignedTaskResponseDigest,
904 operator_avs_state: &HashMap<FixedBytes<32>, OperatorAvsState>,
905 logger: SharedLogger,
906) -> Result<(), SignatureVerificationError> {
907 let Some(operator_state) = operator_avs_state.get(&signed_task_response_digest.operator_id)
908 else {
909 logger.error(
910 &format!("Operator Not Found for task index: {}", task_index),
911 "eigen-services-blsaggregation.bls_agg.verify_signature",
912 );
913 return Err(SignatureVerificationError::OperatorNotFound);
914 };
915
916 let Some(pub_keys) = &operator_state.operator_info.pub_keys else {
917 logger.error(
918 &format!(
919 "Operator Public Key Not Found for task index: {}",
920 task_index
921 ),
922 "eigen-services-blsaggregation.bls_agg.verify_signature",
923 );
924 return Err(SignatureVerificationError::OperatorPublicKeyNotFound);
925 };
926
927 let message = signed_task_response_digest
928 .task_response_digest
929 .as_slice()
930 .try_into()
931 .map_err(|_| SignatureVerificationError::IncorrectSignature)?;
932
933 verify_message(
934 pub_keys.g2_pub_key.g2(),
935 message,
936 signed_task_response_digest.bls_signature.g1_point().g1(),
937 )
938 .then_some(())
939 .ok_or(SignatureVerificationError::IncorrectSignature)
940 .inspect(|_| {
941 logger.debug(
942 &format!(
943 "Signature verification successful for task index: {}",
944 task_index
945 ),
946 "eigen-services-blsaggregation.bls_agg.verify_signature",
947 );
948 })
949 .inspect_err(|_| {
950 logger.error(
951 &format!(
952 "Signature verification failed for task index: {}",
953 task_index
954 ),
955 "eigen-services-blsaggregation.bls_agg.verify_signature",
956 );
957 })
958}
959
960fn update_aggregated_operators(
975 aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
976 operator_state: &OperatorAvsState,
977 task_response_digest: FixedBytes<32>,
978 bls_signature: Signature,
979 operator_id: FixedBytes<32>,
980 logger: SharedLogger,
981) -> AggregatedOperators {
982 logger.debug(
983 "Update aggregated operators",
984 "eigen-services-blsaggregation.bls_agg.update_aggregated_operators",
985 );
986
987 let bls_signature_g1_point = bls_signature.g1_point().g1();
988
989 if let Some(existing) = aggregated_operators.get_mut(&task_response_digest) {
990 let updated = aggregate_new_operator(
992 existing,
993 operator_state.clone(),
994 operator_id,
995 bls_signature_g1_point,
996 logger,
997 );
998 updated.clone()
999 } else {
1000 let operator_g2_pubkey = operator_state
1002 .operator_info
1003 .pub_keys
1004 .clone()
1005 .unwrap()
1006 .g2_pub_key
1007 .g2();
1008 let mut signers_apk_g2 = BlsG2Point::new(G2Affine::zero());
1009 let mut signers_agg_sig_g1 = Signature::new(G1Affine::zero());
1010 for _ in 0..operator_state.stake_per_quorum.len() {
1011 signers_apk_g2 = BlsG2Point::new((signers_apk_g2.g2() + operator_g2_pubkey).into());
1012 signers_agg_sig_g1 = Signature::new(
1013 (signers_agg_sig_g1.g1_point().g1() + bls_signature_g1_point).into(),
1014 );
1015 }
1016 AggregatedOperators {
1017 signers_apk_g2,
1018 signers_agg_sig_g1,
1019 signers_operator_ids_set: HashMap::from([(operator_state.operator_id, true)]),
1020 signers_total_stake_per_quorum: operator_state.stake_per_quorum.clone(),
1021 }
1022 }
1023}
1024
1025fn aggregate_new_operator(
1038 aggregated_operators: &mut AggregatedOperators,
1039 operator_state: OperatorAvsState,
1040 operator_id: FixedBytes<32>,
1041 signature_g1_point: G1Affine,
1042 logger: SharedLogger,
1043) -> &mut AggregatedOperators {
1044 let operator_g2_pubkey = operator_state
1045 .operator_info
1046 .pub_keys
1047 .clone()
1048 .unwrap()
1049 .g2_pub_key
1050 .g2();
1051 aggregated_operators
1052 .signers_operator_ids_set
1053 .insert(operator_id, true);
1054
1055 logger.debug(
1056 &format!(
1057 "operator {} inserted in signers_operator_ids_set",
1058 operator_id
1059 ),
1060 "eigen-services-blsaggregation.bls_agg.aggregate_new_operator",
1061 );
1062
1063 for (quorum_num, stake) in operator_state.stake_per_quorum.iter() {
1064 aggregated_operators.signers_agg_sig_g1 = Signature::new(
1066 (aggregated_operators.signers_agg_sig_g1.g1_point().g1() + signature_g1_point).into(),
1067 );
1068 aggregated_operators.signers_apk_g2 =
1069 BlsG2Point::new((aggregated_operators.signers_apk_g2.g2() + operator_g2_pubkey).into());
1070 aggregated_operators
1071 .signers_total_stake_per_quorum
1072 .entry(*quorum_num)
1073 .and_modify(|v| *v += stake)
1074 .or_insert(*stake);
1075 }
1076 aggregated_operators
1077}
1078
1079#[cfg(test)]
1080mod tests {
1081 use super::{BlsAggregationServiceError, BlsAggregationServiceResponse, BlsAggregatorService};
1082 use crate::bls_agg::{TaskMetadata, TaskSignature};
1083 use alloy::primitives::{B256, U256};
1084 use eigen_crypto_bls::{BlsG1Point, BlsG2Point, BlsKeyPair, Signature};
1085 use eigen_logging::get_test_logger;
1086 use eigen_services_avsregistry::fake_avs_registry_service::FakeAvsRegistryService;
1087 use eigen_types::avs::SignatureVerificationError::{DuplicateSignature, IncorrectSignature};
1088 use eigen_types::operator::{QuorumNum, QuorumThresholdPercentages};
1089 use eigen_types::{avs::TaskIndex, test::TestOperator};
1090 use sha2::{Digest, Sha256};
1091 use std::collections::HashMap;
1092 use std::time::Duration;
1093 use std::vec;
1094 use tokio::time::{sleep, Instant};
1095
1096 const PRIVATE_KEY_1: &str =
1097 "13710126902690889134622698668747132666439281256983827313388062967626731803599";
1098 const PRIVATE_KEY_2: &str =
1099 "14610126902690889134622698668747132666439281256983827313388062967626731803500";
1100 const PRIVATE_KEY_3: &str =
1101 "15610126902690889134622698668747132666439281256983827313388062967626731803501";
1102
1103 fn hash(task_response: u64) -> B256 {
1104 let mut hasher = Sha256::new();
1105 hasher.update(task_response.to_be_bytes());
1106 B256::from_slice(hasher.finalize().as_ref())
1107 }
1108
1109 fn aggregate_g1_public_keys(operators: &[TestOperator]) -> BlsG1Point {
1110 operators
1111 .iter()
1112 .map(|op| op.bls_keypair.public_key().g1())
1113 .reduce(|a, b| (a + b).into())
1114 .map(BlsG1Point::new)
1115 .unwrap()
1116 }
1117
1118 fn aggregate_g2_public_keys(operators: &[TestOperator]) -> BlsG2Point {
1119 operators
1120 .iter()
1121 .map(|op| op.bls_keypair.public_key_g2().g2())
1122 .reduce(|a, b| (a + b).into())
1123 .map(BlsG2Point::new)
1124 .unwrap()
1125 }
1126
1127 fn aggregate_g1_signatures(signatures: &[Signature]) -> Signature {
1128 let agg = signatures
1129 .iter()
1130 .map(|s| s.g1_point().g1())
1131 .reduce(|a, b| (a + b).into())
1132 .unwrap();
1133 Signature::new(agg)
1134 }
1135
1136 #[tokio::test]
1137 async fn test_1_quorum_1_operator_1_correct_signature() {
1138 let test_operator_1 = TestOperator {
1139 operator_id: U256::from(1).into(),
1140 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1141 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1142 };
1143
1144 let block_number = 1;
1145 let task_index: TaskIndex = 0;
1146 let quorum_numbers = vec![0];
1147 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
1148 let time_to_expiry = Duration::from_secs(1);
1149 let task_response = 123; let task_response_digest = hash(task_response);
1152 let bls_signature = test_operator_1
1153 .bls_keypair
1154 .sign_message(task_response_digest.as_ref());
1155 let fake_avs_registry_service =
1156 FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
1157 let bls_agg_service =
1158 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1159 let metadata = TaskMetadata::new(
1160 task_index,
1161 block_number,
1162 quorum_numbers,
1163 quorum_threshold_percentages,
1164 time_to_expiry,
1165 );
1166 let (handle, mut aggregator_response) = bls_agg_service.start();
1167 handle.initialize_task(metadata).await.unwrap();
1168
1169 handle
1170 .process_signature(TaskSignature::new(
1171 task_index,
1172 task_response_digest,
1173 bls_signature,
1174 test_operator_1.operator_id,
1175 ))
1176 .await
1177 .unwrap();
1178
1179 let expected_agg_service_response = BlsAggregationServiceResponse {
1180 task_index,
1181 task_response_digest,
1182 non_signers_pub_keys_g1: vec![],
1183 quorum_apks_g1: vec![test_operator_1.bls_keypair.public_key()],
1184 signers_apk_g2: test_operator_1.bls_keypair.public_key_g2(),
1185 signers_agg_sig_g1: test_operator_1
1186 .bls_keypair
1187 .sign_message(task_response_digest.as_ref()),
1188 non_signer_quorum_bitmap_indices: vec![],
1189 quorum_apk_indices: vec![],
1190 total_stake_indices: vec![],
1191 non_signer_stake_indices: vec![],
1192 };
1193
1194 let response = aggregator_response.receive_aggregated_response().await;
1195
1196 assert_eq!(expected_agg_service_response, response.clone().unwrap());
1197 assert_eq!(task_index, response.unwrap().task_index);
1198 }
1199
1200 #[tokio::test]
1201 async fn test_1_quorum_2_operator_2_duplicated_signatures() {
1202 let test_operator_1 = TestOperator {
1203 operator_id: U256::from(1).into(),
1204 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1205 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1206 };
1207 let test_operator_2 = TestOperator {
1208 operator_id: U256::from(2).into(),
1209 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1210 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1211 };
1212 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1213 let block_number = 1;
1214 let task_index: TaskIndex = 0;
1215 let quorum_numbers = vec![0];
1216 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
1217 let time_to_expiry = Duration::from_secs(1);
1218 let task_response = 123; let task_response_digest = hash(task_response);
1220
1221 let fake_avs_registry_service =
1222 FakeAvsRegistryService::new(block_number, test_operators.clone());
1223 let bls_agg_service =
1224 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1225 let metadata = TaskMetadata::new(
1226 task_index,
1227 block_number,
1228 quorum_numbers,
1229 quorum_threshold_percentages,
1230 time_to_expiry,
1231 );
1232 let (handle, mut aggregator_response) = bls_agg_service.start();
1233 handle.initialize_task(metadata).await.unwrap();
1234 let bls_signature_1 = test_operator_1
1235 .bls_keypair
1236 .sign_message(task_response_digest.as_ref());
1237 handle
1238 .process_signature(TaskSignature::new(
1239 task_index,
1240 task_response_digest,
1241 bls_signature_1.clone(),
1242 test_operator_1.operator_id,
1243 ))
1244 .await
1245 .unwrap();
1246
1247 let second_signature_processing_result = handle
1248 .process_signature(TaskSignature::new(
1249 task_index,
1250 task_response_digest,
1251 bls_signature_1.clone(),
1252 test_operator_1.operator_id,
1253 ))
1254 .await;
1255
1256 assert_eq!(
1257 second_signature_processing_result,
1258 Err(BlsAggregationServiceError::SignatureVerificationError(
1259 DuplicateSignature
1260 ))
1261 );
1262
1263 let bls_signature_2 = test_operator_2
1264 .bls_keypair
1265 .sign_message(task_response_digest.as_ref());
1266
1267 handle
1268 .process_signature(TaskSignature::new(
1269 task_index,
1270 task_response_digest,
1271 bls_signature_2.clone(),
1272 test_operator_2.operator_id,
1273 ))
1274 .await
1275 .unwrap();
1276
1277 let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1278 let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
1279 let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_signature_1, bls_signature_2]);
1280 let expected_agg_service_response = BlsAggregationServiceResponse {
1281 task_index,
1282 task_response_digest,
1283 non_signers_pub_keys_g1: vec![],
1284 quorum_apks_g1: vec![quorum_apks_g1],
1285 signers_apk_g2,
1286 signers_agg_sig_g1,
1287 non_signer_quorum_bitmap_indices: vec![],
1288 quorum_apk_indices: vec![],
1289 total_stake_indices: vec![],
1290 non_signer_stake_indices: vec![],
1291 };
1292
1293 let response = aggregator_response.receive_aggregated_response().await;
1294
1295 assert_eq!(expected_agg_service_response, response.clone().unwrap());
1296 assert_eq!(task_index, response.unwrap().task_index);
1297 }
1298
1299 #[tokio::test]
1300 async fn test_1_quorum_3_operator_3_correct_signatures() {
1301 let test_operator_1 = TestOperator {
1302 operator_id: U256::from(1).into(),
1303 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1304 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1305 };
1306 let test_operator_2 = TestOperator {
1307 operator_id: U256::from(2).into(),
1308 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1309 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1310 };
1311 let test_operator_3 = TestOperator {
1312 operator_id: U256::from(3).into(),
1313 stake_per_quorum: HashMap::from([(0u8, U256::from(300)), (1u8, U256::from(100))]),
1314 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
1315 };
1316 let test_operators = vec![
1317 test_operator_1.clone(),
1318 test_operator_2.clone(),
1319 test_operator_3.clone(),
1320 ];
1321
1322 let block_number = 1;
1323 let task_index = 0;
1324 let quorum_numbers: Vec<QuorumNum> = vec![0];
1325 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8];
1326 let time_to_expiry = Duration::from_secs(1);
1327 let task_response = 123; let task_response_digest = hash(task_response);
1329
1330 let fake_avs_registry_service =
1331 FakeAvsRegistryService::new(block_number, test_operators.clone());
1332 let bls_agg_service =
1333 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1334
1335 let metadata = TaskMetadata::new(
1336 task_index,
1337 block_number,
1338 quorum_numbers,
1339 quorum_threshold_percentages,
1340 time_to_expiry,
1341 );
1342 let (handle, mut aggregator_response) = bls_agg_service.start();
1343 handle.initialize_task(metadata).await.unwrap();
1344
1345 let bls_sig_op_1 = test_operator_1
1346 .bls_keypair
1347 .sign_message(task_response_digest.as_ref());
1348 handle
1349 .process_signature(TaskSignature::new(
1350 task_index,
1351 task_response_digest,
1352 bls_sig_op_1.clone(),
1353 test_operator_1.operator_id,
1354 ))
1355 .await
1356 .unwrap();
1357
1358 let bls_sig_op_2 = test_operator_2
1359 .bls_keypair
1360 .sign_message(task_response_digest.as_ref());
1361 handle
1362 .process_signature(TaskSignature::new(
1363 task_index,
1364 task_response_digest,
1365 bls_sig_op_2.clone(),
1366 test_operator_2.operator_id,
1367 ))
1368 .await
1369 .unwrap();
1370
1371 let bls_sig_op_3 = test_operator_3
1372 .bls_keypair
1373 .sign_message(task_response_digest.as_ref());
1374 handle
1375 .process_signature(TaskSignature::new(
1376 task_index,
1377 task_response_digest,
1378 bls_sig_op_3.clone(),
1379 test_operator_3.operator_id,
1380 ))
1381 .await
1382 .unwrap();
1383
1384 let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1385 let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
1386 let signers_agg_sig_g1 =
1387 aggregate_g1_signatures(&vec![bls_sig_op_1, bls_sig_op_2, bls_sig_op_3]);
1388
1389 let expected_agg_service_response = BlsAggregationServiceResponse {
1390 task_index,
1391 task_response_digest,
1392 non_signers_pub_keys_g1: vec![],
1393 quorum_apks_g1: vec![quorum_apks_g1],
1394 signers_apk_g2,
1395 signers_agg_sig_g1,
1396 non_signer_quorum_bitmap_indices: vec![],
1397 quorum_apk_indices: vec![],
1398 total_stake_indices: vec![],
1399 non_signer_stake_indices: vec![],
1400 };
1401
1402 let response = aggregator_response.receive_aggregated_response().await;
1403 assert_eq!(expected_agg_service_response, response.clone().unwrap());
1404 assert_eq!(task_index, response.unwrap().task_index);
1405 }
1406
1407 #[tokio::test]
1408 async fn test_2_quorum_2_operator_2_correct_signatures() {
1409 let test_operator_1 = TestOperator {
1410 operator_id: U256::from(1).into(),
1411 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1412 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1413 };
1414 let test_operator_2 = TestOperator {
1415 operator_id: U256::from(2).into(),
1416 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1417 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1418 };
1419 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1420 let block_number = 1;
1421 let task_index = 0;
1422 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1423 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1424 let time_to_expiry = Duration::from_secs(1);
1425 let task_response = 123; let task_response_digest = hash(task_response);
1427
1428 let fake_avs_registry_service =
1429 FakeAvsRegistryService::new(block_number, test_operators.clone());
1430 let bls_agg_service =
1431 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1432
1433 let metadata = TaskMetadata::new(
1434 task_index,
1435 block_number,
1436 quorum_numbers,
1437 quorum_threshold_percentages,
1438 time_to_expiry,
1439 );
1440 let (handle, mut aggregator_response) = bls_agg_service.start();
1441 handle.initialize_task(metadata).await.unwrap();
1442
1443 let bls_sig_op_1 = test_operator_1
1444 .bls_keypair
1445 .sign_message(task_response_digest.as_ref());
1446 handle
1447 .process_signature(TaskSignature::new(
1448 task_index,
1449 task_response_digest,
1450 bls_sig_op_1.clone(),
1451 test_operator_1.operator_id,
1452 ))
1453 .await
1454 .unwrap();
1455
1456 let bls_sig_op_2 = test_operator_2
1457 .bls_keypair
1458 .sign_message(task_response_digest.as_ref());
1459 handle
1460 .process_signature(TaskSignature::new(
1461 task_index,
1462 task_response_digest,
1463 bls_sig_op_2.clone(),
1464 test_operator_2.operator_id,
1465 ))
1466 .await
1467 .unwrap();
1468
1469 let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1470 let signers_apk_g2 =
1471 aggregate_g2_public_keys(&[test_operators.clone(), test_operators].concat());
1472 let signers_agg_sig_g1 = aggregate_g1_signatures(&[
1473 bls_sig_op_1.clone(),
1474 bls_sig_op_1,
1475 bls_sig_op_2.clone(),
1476 bls_sig_op_2,
1477 ]);
1478
1479 let expected_agg_service_response = BlsAggregationServiceResponse {
1480 task_index,
1481 task_response_digest,
1482 non_signers_pub_keys_g1: vec![],
1483 quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1],
1484 signers_apk_g2,
1485 signers_agg_sig_g1,
1486 non_signer_quorum_bitmap_indices: vec![],
1487 quorum_apk_indices: vec![],
1488 total_stake_indices: vec![],
1489 non_signer_stake_indices: vec![],
1490 };
1491
1492 let response = aggregator_response.receive_aggregated_response().await;
1493
1494 assert_eq!(expected_agg_service_response, response.unwrap());
1495 }
1496
1497 #[tokio::test]
1498 async fn test_2_concurrent_tasks_2_quorum_2_operator_2_correct_signatures() {
1499 let test_operator_1 = TestOperator {
1500 operator_id: U256::from(1).into(),
1501 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1502 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1503 };
1504 let test_operator_2 = TestOperator {
1505 operator_id: U256::from(2).into(),
1506 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1507 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1508 };
1509 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1510 let block_number = 1;
1511 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1512 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1513 let time_to_expiry = Duration::from_secs(1);
1514
1515 let fake_avs_registry_service =
1516 FakeAvsRegistryService::new(block_number, test_operators.clone());
1517 let bls_agg_service =
1518 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1519
1520 let task_1_index = 1;
1522 let task_1_response = 123; let task_1_response_digest = hash(task_1_response);
1524 let metadata1 = TaskMetadata::new(
1525 task_1_index,
1526 block_number,
1527 quorum_numbers.clone(),
1528 quorum_threshold_percentages.clone(),
1529 time_to_expiry,
1530 );
1531 let (handle, mut aggregator_response) = bls_agg_service.start();
1532 handle.initialize_task(metadata1).await.unwrap();
1533
1534 let task_2_index = 2;
1535 let task_2_response = 234; let task_2_response_digest = hash(task_2_response);
1537 let metadata2 = TaskMetadata::new(
1538 task_2_index,
1539 block_number,
1540 quorum_numbers,
1541 quorum_threshold_percentages,
1542 time_to_expiry,
1543 );
1544 handle.initialize_task(metadata2).await.unwrap();
1545
1546 let bls_sig_task_1_op_1 = test_operator_1
1547 .bls_keypair
1548 .sign_message(task_1_response_digest.as_ref());
1549 handle
1550 .process_signature(TaskSignature::new(
1551 task_1_index,
1552 task_1_response_digest,
1553 bls_sig_task_1_op_1.clone(),
1554 test_operator_1.operator_id,
1555 ))
1556 .await
1557 .unwrap();
1558
1559 let bls_sig_task_1_op_2 = test_operator_2
1560 .bls_keypair
1561 .sign_message(task_1_response_digest.as_ref());
1562 handle
1563 .process_signature(TaskSignature::new(
1564 task_1_index,
1565 task_1_response_digest,
1566 bls_sig_task_1_op_2.clone(),
1567 test_operator_2.operator_id,
1568 ))
1569 .await
1570 .unwrap();
1571
1572 let bls_sig_task_2_op_1 = test_operator_1
1573 .bls_keypair
1574 .sign_message(task_2_response_digest.as_ref());
1575 handle
1576 .process_signature(TaskSignature::new(
1577 task_2_index,
1578 task_2_response_digest,
1579 bls_sig_task_2_op_1.clone(),
1580 test_operator_1.operator_id,
1581 ))
1582 .await
1583 .unwrap();
1584
1585 let bls_sig_task_2_op_2 = test_operator_2
1586 .bls_keypair
1587 .sign_message(task_2_response_digest.as_ref());
1588 handle
1589 .process_signature(TaskSignature::new(
1590 task_2_index,
1591 task_2_response_digest,
1592 bls_sig_task_2_op_2.clone(),
1593 test_operator_2.operator_id,
1594 ))
1595 .await
1596 .unwrap();
1597
1598 let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1599 let signers_apk_g2 =
1600 aggregate_g2_public_keys(&[test_operators.clone(), test_operators].concat());
1601 let signers_agg_sig_g1_task_1 = aggregate_g1_signatures(&[
1602 bls_sig_task_1_op_1.clone(),
1603 bls_sig_task_1_op_1,
1604 bls_sig_task_1_op_2.clone(),
1605 bls_sig_task_1_op_2,
1606 ]);
1607
1608 let expected_response_task_1 = BlsAggregationServiceResponse {
1609 task_index: task_1_index,
1610 task_response_digest: task_1_response_digest,
1611 non_signers_pub_keys_g1: vec![],
1612 quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1.clone()],
1613 signers_apk_g2: signers_apk_g2.clone(),
1614 signers_agg_sig_g1: signers_agg_sig_g1_task_1,
1615 non_signer_quorum_bitmap_indices: vec![],
1616 quorum_apk_indices: vec![],
1617 total_stake_indices: vec![],
1618 non_signer_stake_indices: vec![],
1619 };
1620
1621 let signers_agg_sig_g1_task_2 = aggregate_g1_signatures(&[
1622 bls_sig_task_2_op_1.clone(),
1623 bls_sig_task_2_op_1,
1624 bls_sig_task_2_op_2.clone(),
1625 bls_sig_task_2_op_2,
1626 ]);
1627
1628 let expected_response_task_2 = BlsAggregationServiceResponse {
1629 task_index: task_2_index,
1630 task_response_digest: task_2_response_digest,
1631 non_signers_pub_keys_g1: vec![],
1632 quorum_apks_g1: vec![quorum_apks_g1.clone(), quorum_apks_g1.clone()],
1633 signers_apk_g2,
1634 signers_agg_sig_g1: signers_agg_sig_g1_task_2,
1635 non_signer_quorum_bitmap_indices: vec![],
1636 quorum_apk_indices: vec![],
1637 total_stake_indices: vec![],
1638 non_signer_stake_indices: vec![],
1639 };
1640
1641 let first_response = aggregator_response.receive_aggregated_response().await;
1642 let second_response = aggregator_response.receive_aggregated_response().await;
1643
1644 let (task_1_response, task_2_response) = if first_response.clone().unwrap().task_index == 1
1645 {
1646 (first_response, second_response)
1647 } else {
1648 (second_response, first_response)
1649 };
1650
1651 assert_eq!(expected_response_task_1, task_1_response.unwrap());
1652 assert_eq!(expected_response_task_2, task_2_response.unwrap());
1653 }
1654
1655 #[tokio::test]
1656 async fn test_1_quorum_1_operator_0_signatures_task_expired() {
1657 let test_operator_1 = TestOperator {
1658 operator_id: U256::from(1).into(),
1659 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1660 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1661 };
1662
1663 let block_number = 1;
1664 let task_index: TaskIndex = 0;
1665 let quorum_numbers = vec![0];
1666 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
1667 let time_to_expiry = Duration::from_secs(1);
1668 let _task_response = 123; let fake_avs_registry_service =
1671 FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
1672 let bls_agg_service =
1673 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1674 let metadata = TaskMetadata::new(
1675 task_index,
1676 block_number,
1677 quorum_numbers,
1678 quorum_threshold_percentages,
1679 time_to_expiry,
1680 );
1681 let (handle, mut aggregator_response) = bls_agg_service.start();
1682 handle.initialize_task(metadata).await.unwrap();
1683
1684 let response = aggregator_response.receive_aggregated_response().await;
1685
1686 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
1687 }
1688
1689 #[tokio::test]
1690 async fn test_1_quorum_2_operator_1_signatures_50_threshold() {
1691 let test_operator_1 = TestOperator {
1692 operator_id: U256::from(1).into(),
1693 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1694 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1695 };
1696 let test_operator_2 = TestOperator {
1697 operator_id: U256::from(2).into(),
1698 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1699 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1700 };
1701 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1702 let block_number = 1;
1703 let task_index = 0;
1704 let quorum_numbers: Vec<QuorumNum> = vec![0];
1705 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50u8];
1706 let time_to_expiry = Duration::from_secs(1);
1707 let task_response = 123; let task_response_digest = hash(task_response);
1709 let bls_sig_op_1 = test_operator_1
1710 .bls_keypair
1711 .sign_message(task_response_digest.as_ref());
1712
1713 let fake_avs_registry_service =
1714 FakeAvsRegistryService::new(block_number, test_operators.clone());
1715 let bls_agg_service =
1716 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1717
1718 let metadata = TaskMetadata::new(
1719 task_index,
1720 block_number,
1721 quorum_numbers,
1722 quorum_threshold_percentages,
1723 time_to_expiry,
1724 );
1725 let (handle, mut aggregator_response) = bls_agg_service.start();
1726 handle.initialize_task(metadata).await.unwrap();
1727
1728 handle
1729 .process_signature(TaskSignature::new(
1730 task_index,
1731 task_response_digest,
1732 bls_sig_op_1.clone(),
1733 test_operator_1.operator_id,
1734 ))
1735 .await
1736 .unwrap();
1737
1738 let quorum_apks_g1 = aggregate_g1_public_keys(&test_operators);
1739
1740 let signers_apk_g2: BlsG2Point = test_operator_1.bls_keypair.public_key_g2();
1741
1742 let expected_agg_service_response = BlsAggregationServiceResponse {
1743 task_index,
1744 task_response_digest,
1745 non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()], quorum_apks_g1: vec![quorum_apks_g1],
1747 signers_apk_g2,
1748 signers_agg_sig_g1: bls_sig_op_1,
1749 non_signer_quorum_bitmap_indices: vec![],
1750 quorum_apk_indices: vec![],
1751 total_stake_indices: vec![],
1752 non_signer_stake_indices: vec![],
1753 };
1754
1755 let response = aggregator_response.receive_aggregated_response().await;
1756
1757 assert_eq!(expected_agg_service_response, response.clone().unwrap());
1758 assert_eq!(task_index, response.unwrap().task_index);
1759 }
1760
1761 #[tokio::test]
1762 async fn test_1_quorum_2_operator_1_signatures_60_threshold() {
1763 let test_operator_1 = TestOperator {
1764 operator_id: U256::from(1).into(),
1765 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1766 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1767 };
1768 let test_operator_2 = TestOperator {
1769 operator_id: U256::from(2).into(),
1770 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1771 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1772 };
1773 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1774 let block_number = 1;
1775 let task_index = 0;
1776 let quorum_numbers: Vec<QuorumNum> = vec![0];
1777 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![60u8];
1778 let time_to_expiry = Duration::from_secs(1);
1779 let task_response = 123; let task_response_digest = hash(task_response);
1781 let bls_sig_op_1 = test_operator_1
1782 .bls_keypair
1783 .sign_message(task_response_digest.as_ref());
1784
1785 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
1786 let bls_agg_service =
1787 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1788
1789 let metadata = TaskMetadata::new(
1790 task_index,
1791 block_number,
1792 quorum_numbers,
1793 quorum_threshold_percentages,
1794 time_to_expiry,
1795 );
1796 let (handle, mut aggregator_response) = bls_agg_service.start();
1797 handle.initialize_task(metadata).await.unwrap();
1798
1799 handle
1800 .process_signature(TaskSignature::new(
1801 task_index,
1802 task_response_digest,
1803 bls_sig_op_1,
1804 test_operator_1.operator_id,
1805 ))
1806 .await
1807 .unwrap();
1808
1809 let response = aggregator_response.receive_aggregated_response().await;
1810
1811 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
1812 }
1813
1814 #[tokio::test]
1815 async fn test_2_quorums_2_operators_which_just_take_1_quorum_2_correct_signatures() {
1816 let test_operator_1 = TestOperator {
1817 operator_id: U256::from(1).into(),
1818 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1820 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1821 };
1822 let test_operator_2 = TestOperator {
1823 operator_id: U256::from(2).into(),
1824 stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
1826 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1827 };
1828
1829 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
1830 let block_number = 1;
1831 let task_index = 0;
1832 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1833 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8, 100u8];
1834 let time_to_expiry = Duration::from_secs(1);
1835 let task_response = 123; let task_response_digest = hash(task_response);
1837
1838 let fake_avs_registry_service =
1839 FakeAvsRegistryService::new(block_number, test_operators.clone());
1840 let bls_agg_service =
1841 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1842
1843 let metadata = TaskMetadata::new(
1844 task_index,
1845 block_number,
1846 quorum_numbers,
1847 quorum_threshold_percentages,
1848 time_to_expiry,
1849 );
1850 let (handle, mut aggregator_response) = bls_agg_service.start();
1851 handle.initialize_task(metadata).await.unwrap();
1852
1853 let bls_sig_op_1 = test_operator_1
1854 .bls_keypair
1855 .sign_message(task_response_digest.as_ref());
1856 handle
1857 .process_signature(TaskSignature::new(
1858 task_index,
1859 task_response_digest,
1860 bls_sig_op_1.clone(),
1861 test_operator_1.operator_id,
1862 ))
1863 .await
1864 .unwrap();
1865
1866 let bls_sig_op_2 = test_operator_2
1867 .bls_keypair
1868 .sign_message(task_response_digest.as_ref());
1869 handle
1870 .process_signature(TaskSignature::new(
1871 task_index,
1872 task_response_digest,
1873 bls_sig_op_2.clone(),
1874 test_operator_2.operator_id,
1875 ))
1876 .await
1877 .unwrap();
1878
1879 let signers_apk_g2 = aggregate_g2_public_keys(&test_operators);
1880 let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
1881
1882 let expected_agg_service_response = BlsAggregationServiceResponse {
1883 task_index,
1884 task_response_digest,
1885 non_signers_pub_keys_g1: vec![],
1886 quorum_apks_g1: vec![
1887 test_operator_1.bls_keypair.public_key(),
1888 test_operator_2.bls_keypair.public_key(),
1889 ],
1890 signers_apk_g2,
1891 signers_agg_sig_g1,
1892 non_signer_quorum_bitmap_indices: vec![],
1893 quorum_apk_indices: vec![],
1894 total_stake_indices: vec![],
1895 non_signer_stake_indices: vec![],
1896 };
1897
1898 let response = aggregator_response.receive_aggregated_response().await;
1899
1900 assert_eq!(expected_agg_service_response, response.clone().unwrap());
1901 assert_eq!(task_index, response.unwrap().task_index);
1902 }
1903
1904 #[tokio::test]
1905 async fn test_2_quorums_3_operators_which_just_stake_1_quorum_50_threshold() {
1906 let test_operator_1 = TestOperator {
1907 operator_id: U256::from(1).into(),
1908 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
1910 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
1911 };
1912 let test_operator_2 = TestOperator {
1913 operator_id: U256::from(2).into(),
1914 stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
1916 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
1917 };
1918
1919 let test_operator_3 = TestOperator {
1920 operator_id: U256::from(3).into(),
1921 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
1923 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
1924 };
1925
1926 let test_operators = vec![
1927 test_operator_1.clone(),
1928 test_operator_2.clone(),
1929 test_operator_3.clone(),
1930 ];
1931 let block_number = 1;
1932 let task_index = 0;
1933 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
1934 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50u8, 50u8];
1935 let time_to_expiry = Duration::from_secs(1);
1936 let task_response = 123; let task_response_digest = hash(task_response);
1938
1939 let fake_avs_registry_service =
1940 FakeAvsRegistryService::new(block_number, test_operators.clone());
1941 let bls_agg_service =
1942 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
1943
1944 let metadata = TaskMetadata::new(
1945 task_index,
1946 block_number,
1947 quorum_numbers,
1948 quorum_threshold_percentages,
1949 time_to_expiry,
1950 );
1951 let (handle, mut aggregator_response) = bls_agg_service.start();
1952 handle.initialize_task(metadata).await.unwrap();
1953
1954 let bls_sig_op_1 = test_operator_1
1955 .bls_keypair
1956 .sign_message(task_response_digest.as_ref());
1957 handle
1958 .process_signature(TaskSignature::new(
1959 task_index,
1960 task_response_digest,
1961 bls_sig_op_1.clone(),
1962 test_operator_1.operator_id,
1963 ))
1964 .await
1965 .unwrap();
1966
1967 let bls_sig_op_2 = test_operator_2
1968 .bls_keypair
1969 .sign_message(task_response_digest.as_ref());
1970 handle
1971 .process_signature(TaskSignature::new(
1972 task_index,
1973 task_response_digest,
1974 bls_sig_op_2.clone(),
1975 test_operator_2.operator_id,
1976 ))
1977 .await
1978 .unwrap();
1979 let signers_apk_g2 =
1980 aggregate_g2_public_keys(&vec![test_operator_1.clone(), test_operator_2.clone()]);
1981 let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
1982 let quorum_apks_g1 = vec![
1983 aggregate_g1_public_keys(&vec![test_operator_1, test_operator_3.clone()]),
1984 aggregate_g1_public_keys(&vec![test_operator_2, test_operator_3.clone()]),
1985 ];
1986
1987 let expected_agg_service_response = BlsAggregationServiceResponse {
1988 task_index,
1989 task_response_digest,
1990 non_signers_pub_keys_g1: vec![test_operator_3.bls_keypair.public_key()],
1991 quorum_apks_g1,
1992 signers_apk_g2,
1993 signers_agg_sig_g1,
1994 non_signer_quorum_bitmap_indices: vec![],
1995 quorum_apk_indices: vec![],
1996 total_stake_indices: vec![],
1997 non_signer_stake_indices: vec![],
1998 };
1999
2000 let response = aggregator_response.receive_aggregated_response().await;
2001
2002 assert_eq!(expected_agg_service_response, response.clone().unwrap());
2003 assert_eq!(task_index, response.unwrap().task_index);
2004 }
2005
2006 #[tokio::test]
2007 async fn test_2_quorums_3_operators_which_just_stake_1_quorum_60_threshold() {
2008 let test_operator_1 = TestOperator {
2010 operator_id: U256::from(1).into(),
2011 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2013 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2014 };
2015 let test_operator_2 = TestOperator {
2016 operator_id: U256::from(2).into(),
2017 stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
2019 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2020 };
2021
2022 let test_operator_3 = TestOperator {
2023 operator_id: U256::from(3).into(),
2024 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2026 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
2027 };
2028
2029 let test_operators = vec![
2030 test_operator_1.clone(),
2031 test_operator_2.clone(),
2032 test_operator_3.clone(),
2033 ];
2034 let block_number = 1;
2035 let task_index = 0;
2036 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
2037 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![60u8, 60u8];
2038 let time_to_expiry = Duration::from_secs(1);
2039 let task_response = 123; let task_response_digest = hash(task_response);
2041
2042 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2043 let bls_agg_service =
2044 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2045
2046 let metadata = TaskMetadata::new(
2047 task_index,
2048 block_number,
2049 quorum_numbers,
2050 quorum_threshold_percentages,
2051 time_to_expiry,
2052 );
2053 let (handle, mut aggregator_response) = bls_agg_service.start();
2054 handle.initialize_task(metadata).await.unwrap();
2055
2056 let bls_sig_op_1 = test_operator_1
2057 .bls_keypair
2058 .sign_message(task_response_digest.as_ref());
2059 handle
2060 .process_signature(TaskSignature::new(
2061 task_index,
2062 task_response_digest,
2063 bls_sig_op_1,
2064 test_operator_1.operator_id,
2065 ))
2066 .await
2067 .unwrap();
2068
2069 let bls_sig_op_2 = test_operator_2
2070 .bls_keypair
2071 .sign_message(task_response_digest.as_ref());
2072
2073 handle
2074 .process_signature(TaskSignature::new(
2075 task_index,
2076 task_response_digest,
2077 bls_sig_op_2,
2078 test_operator_2.operator_id,
2079 ))
2080 .await
2081 .unwrap();
2082
2083 let response = aggregator_response.receive_aggregated_response().await;
2084
2085 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2086 }
2087
2088 #[tokio::test]
2089 async fn test_2_quorums_1_operator_which_just_take_1_quorum_1_signature_task_expired() {
2090 let test_operator_1 = TestOperator {
2091 operator_id: U256::from(1).into(),
2092 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2094 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2095 };
2096
2097 let block_number = 1;
2098 let task_index = 0;
2099 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
2100 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100, 100];
2101 let time_to_expiry = Duration::from_secs(1);
2102 let task_response = 123; let task_response_digest = hash(task_response);
2104
2105 let fake_avs_registry_service =
2106 FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2107 let bls_agg_service =
2108 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2109
2110 let metadata = TaskMetadata::new(
2111 task_index,
2112 block_number,
2113 quorum_numbers,
2114 quorum_threshold_percentages,
2115 time_to_expiry,
2116 );
2117 let (handle, mut aggregator_response) = bls_agg_service.start();
2118 handle.initialize_task(metadata).await.unwrap();
2119
2120 let bls_sig_op_1 = test_operator_1
2121 .bls_keypair
2122 .sign_message(task_response_digest.as_ref());
2123
2124 handle
2125 .process_signature(TaskSignature::new(
2126 task_index,
2127 task_response_digest,
2128 bls_sig_op_1,
2129 test_operator_1.operator_id,
2130 ))
2131 .await
2132 .unwrap();
2133
2134 let response = aggregator_response.receive_aggregated_response().await;
2135
2136 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2137 }
2138
2139 #[tokio::test]
2140 async fn test_2_quorums_2_operators_where_1_operator_just_take_1_quorum_1_signature_task_expired(
2141 ) {
2142 let test_operator_1 = TestOperator {
2143 operator_id: U256::from(1).into(),
2144 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2146 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2147 };
2148 let test_operator_2 = TestOperator {
2149 operator_id: U256::from(2).into(),
2150 stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
2152 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2153 };
2154
2155 let block_number = 1;
2156 let task_index = 0;
2157 let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
2158 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100, 100];
2159 let time_to_expiry = Duration::from_secs(1);
2160 let task_response = 123; let task_response_digest = hash(task_response);
2162 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2163
2164 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2165 let bls_agg_service =
2166 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2167
2168 let metadata = TaskMetadata::new(
2169 task_index,
2170 block_number,
2171 quorum_numbers,
2172 quorum_threshold_percentages,
2173 time_to_expiry,
2174 );
2175 let (handle, mut aggregator_response) = bls_agg_service.start();
2176 handle.initialize_task(metadata).await.unwrap();
2177
2178 let bls_sig_op_1 = test_operator_1
2179 .bls_keypair
2180 .sign_message(task_response_digest.as_ref());
2181 handle
2182 .process_signature(TaskSignature::new(
2183 task_index,
2184 task_response_digest,
2185 bls_sig_op_1,
2186 test_operator_1.operator_id,
2187 ))
2188 .await
2189 .unwrap();
2190
2191 let response = aggregator_response.receive_aggregated_response().await;
2192
2193 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2194 }
2195
2196 #[tokio::test]
2197 async fn send_signature_of_task_not_initialized() {
2198 let test_operator_1 = TestOperator {
2199 operator_id: U256::from(1).into(),
2200 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2202 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2203 };
2204
2205 let block_number = 1;
2206 let task_index = 0;
2207 let task_response = 123; let task_response_digest = hash(task_response);
2209
2210 let fake_avs_registry_service =
2211 FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2212 let bls_agg_service =
2213 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2214
2215 let bls_sig_op_1 = test_operator_1
2216 .bls_keypair
2217 .sign_message(task_response_digest.as_ref());
2218 let (handle, _) = bls_agg_service.start();
2219 let result = handle
2220 .process_signature(TaskSignature::new(
2221 task_index,
2222 task_response_digest,
2223 bls_sig_op_1,
2224 test_operator_1.operator_id,
2225 ))
2226 .await;
2227
2228 assert_eq!(Err(BlsAggregationServiceError::TaskNotFound), result);
2229 }
2230
2231 #[tokio::test]
2232 async fn test_1_quorum_2_operator_2_signatures_on_2_different_msgs() {
2233 let test_operator_1 = TestOperator {
2234 operator_id: U256::from(1).into(),
2235 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2236 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2237 };
2238 let test_operator_2 = TestOperator {
2239 operator_id: U256::from(2).into(),
2240 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2241 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2242 };
2243 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2244 let block_number = 1;
2245 let task_index = 0;
2246 let quorum_numbers: Vec<QuorumNum> = vec![0];
2247 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100u8];
2248 let time_to_expiry = Duration::from_secs(1);
2249 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2250 let bls_agg_service =
2251 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2252
2253 let metadata = TaskMetadata::new(
2254 task_index,
2255 block_number,
2256 quorum_numbers,
2257 quorum_threshold_percentages,
2258 time_to_expiry,
2259 );
2260 let (handle, mut aggregator_response) = bls_agg_service.start();
2261 handle.initialize_task(metadata).await.unwrap();
2262
2263 let task_response_1 = 123; let task_response_1_digest = hash(task_response_1);
2265 let bls_sig_op_1 = test_operator_1
2266 .bls_keypair
2267 .sign_message(task_response_1_digest.as_ref());
2268 handle
2269 .process_signature(TaskSignature::new(
2270 task_index,
2271 task_response_1_digest,
2272 bls_sig_op_1,
2273 test_operator_1.operator_id,
2274 ))
2275 .await
2276 .unwrap();
2277
2278 let task_response_2 = 456; let task_response_2_digest = hash(task_response_2);
2280 let bls_sig_op_2 = test_operator_1
2281 .bls_keypair
2282 .sign_message(task_response_2_digest.as_ref());
2283 handle
2284 .process_signature(TaskSignature::new(
2285 task_index,
2286 task_response_2_digest,
2287 bls_sig_op_2,
2288 test_operator_1.operator_id,
2289 ))
2290 .await
2291 .unwrap();
2292
2293 let response = aggregator_response.receive_aggregated_response().await;
2294
2295 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2296 }
2297
2298 #[tokio::test]
2299 async fn test_1_quorum_1_operator_1_invalid_signature() {
2300 let test_operator_1 = TestOperator {
2301 operator_id: U256::from(1).into(),
2302 stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
2303 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2304 };
2305
2306 let block_number = 1;
2307 let task_index = 0;
2308 let quorum_numbers = vec![0];
2309 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100];
2310 let time_to_expiry = Duration::from_secs(1);
2311 let task_response = 123; let wrong_task_response_digest = hash(task_response + 1);
2314 let bls_signature = test_operator_1
2315 .bls_keypair
2316 .sign_message(hash(task_response).as_ref());
2317 let fake_avs_registry_service =
2318 FakeAvsRegistryService::new(block_number, vec![test_operator_1.clone()]);
2319 let bls_agg_service =
2320 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2321 let metadata = TaskMetadata::new(
2322 task_index,
2323 block_number,
2324 quorum_numbers,
2325 quorum_threshold_percentages,
2326 time_to_expiry,
2327 );
2328 let (handle, mut aggregator_response) = bls_agg_service.start();
2329 handle.initialize_task(metadata).await.unwrap();
2330
2331 let result = handle
2332 .process_signature(TaskSignature::new(
2333 task_index,
2334 wrong_task_response_digest,
2335 bls_signature.clone(),
2336 test_operator_1.operator_id,
2337 ))
2338 .await;
2339
2340 assert_eq!(
2341 Err(BlsAggregationServiceError::SignatureVerificationError(
2342 IncorrectSignature
2343 )),
2344 result
2345 );
2346
2347 let response = aggregator_response.receive_aggregated_response().await;
2349
2350 assert_eq!(Err(BlsAggregationServiceError::TaskExpired), response);
2351 }
2352
2353 #[tokio::test]
2354 async fn test_signatures_are_processed_during_window_after_quorum() {
2355 let test_operator_1 = TestOperator {
2356 operator_id: U256::from(1).into(),
2357 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2358 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2359 };
2360 let test_operator_2 = TestOperator {
2361 operator_id: U256::from(2).into(),
2362 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2363 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2364 };
2365 let test_operator_3 = TestOperator {
2366 operator_id: U256::from(3).into(),
2367 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2368 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(),
2369 };
2370 let test_operators = vec![
2371 test_operator_1.clone(),
2372 test_operator_2.clone(),
2373 test_operator_3.clone(),
2374 ];
2375 let block_number = 1;
2376 let task_index = 0;
2377 let task_response = 123;
2378 let quorum_numbers: Vec<QuorumNum> = vec![0];
2379 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50_u8];
2380 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2381 let bls_agg_service =
2382 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2383
2384 let time_to_expiry = Duration::from_secs(5);
2385 let window_duration = Duration::from_secs(1);
2386
2387 let start = Instant::now();
2388 let metadata = TaskMetadata::new(
2389 task_index,
2390 block_number,
2391 quorum_numbers,
2392 quorum_threshold_percentages,
2393 time_to_expiry,
2394 )
2395 .with_window_duration(window_duration);
2396
2397 let (handle, mut aggregator_response) = bls_agg_service.start();
2398 handle.initialize_task(metadata).await.unwrap();
2399
2400 let task_response_1_digest = hash(task_response);
2401 let bls_sig_op_1 = test_operator_1
2402 .bls_keypair
2403 .sign_message(task_response_1_digest.as_ref());
2404 handle
2405 .process_signature(TaskSignature::new(
2406 task_index,
2407 task_response_1_digest,
2408 bls_sig_op_1.clone(),
2409 test_operator_1.operator_id,
2410 ))
2411 .await
2412 .unwrap();
2413
2414 let task_response_2_digest = hash(task_response);
2415 let bls_sig_op_2 = test_operator_2
2416 .bls_keypair
2417 .sign_message(task_response_2_digest.as_ref());
2418 handle
2419 .process_signature(TaskSignature::new(
2420 task_index,
2421 task_response_2_digest,
2422 bls_sig_op_2.clone(),
2423 test_operator_2.operator_id,
2424 ))
2425 .await
2426 .unwrap();
2427
2428 sleep(Duration::from_millis(500)).await;
2430 let task_response_3_digest = hash(task_response);
2431 let bls_sig_op_3 = test_operator_3
2432 .bls_keypair
2433 .sign_message(task_response_3_digest.as_ref());
2434 handle
2435 .process_signature(TaskSignature::new(
2436 task_index,
2437 task_response_3_digest,
2438 bls_sig_op_3.clone(),
2439 test_operator_3.operator_id,
2440 ))
2441 .await
2442 .unwrap();
2443
2444 let signers_apk_g2 = aggregate_g2_public_keys(&vec![
2445 test_operator_1.clone(),
2446 test_operator_2.clone(),
2447 test_operator_3.clone(),
2448 ]);
2449 let signers_agg_sig_g1 =
2450 aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2, bls_sig_op_3]);
2451 let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2452 test_operator_1,
2453 test_operator_2,
2454 test_operator_3,
2455 ])];
2456
2457 let expected_agg_service_response = BlsAggregationServiceResponse {
2458 task_index,
2459 task_response_digest: task_response_3_digest,
2460 non_signers_pub_keys_g1: vec![],
2461 quorum_apks_g1,
2462 signers_apk_g2,
2463 signers_agg_sig_g1,
2464 non_signer_quorum_bitmap_indices: vec![],
2465 quorum_apk_indices: vec![],
2466 total_stake_indices: vec![],
2467 non_signer_stake_indices: vec![],
2468 };
2469
2470 let response = aggregator_response.receive_aggregated_response().await;
2471
2472 let elapsed = start.elapsed();
2473 assert_eq!(expected_agg_service_response, response.clone().unwrap());
2474 assert_eq!(task_index, response.unwrap().task_index);
2475 assert!(elapsed < time_to_expiry);
2476 assert!(elapsed >= window_duration);
2477 }
2478
2479 #[tokio::test]
2480 async fn test_if_quorum_has_been_reached_and_the_task_expires_during_window_the_response_is_sent(
2481 ) {
2482 let test_operator_1 = TestOperator {
2483 operator_id: U256::from(1).into(),
2484 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2485 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2486 };
2487 let test_operator_2 = TestOperator {
2488 operator_id: U256::from(2).into(),
2489 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2490 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2491 };
2492 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2493 let block_number = 1;
2494 let task_index = 0;
2495 let task_response = 123;
2496 let quorum_numbers: Vec<QuorumNum> = vec![0];
2497 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2498 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2499 let bls_agg_service =
2500 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2501
2502 let time_to_expiry = Duration::from_secs(2);
2503 let window_duration = Duration::from_secs(10);
2504
2505 let start = Instant::now();
2506 let metadata = TaskMetadata::new(
2507 task_index,
2508 block_number,
2509 quorum_numbers,
2510 quorum_threshold_percentages,
2511 time_to_expiry,
2512 )
2513 .with_window_duration(window_duration);
2514 let (handle, mut aggregator_response) = bls_agg_service.start();
2515 handle.initialize_task(metadata).await.unwrap();
2516
2517 let task_response_1_digest = hash(task_response);
2518 let bls_sig_op_1 = test_operator_1
2519 .bls_keypair
2520 .sign_message(task_response_1_digest.as_ref());
2521 handle
2522 .process_signature(TaskSignature::new(
2523 task_index,
2524 task_response_1_digest,
2525 bls_sig_op_1.clone(),
2526 test_operator_1.operator_id,
2527 ))
2528 .await
2529 .unwrap();
2530
2531 let task_response_2_digest = hash(task_response);
2534 let bls_sig_op_2 = test_operator_2
2535 .bls_keypair
2536 .sign_message(task_response_2_digest.as_ref());
2537 handle
2538 .process_signature(TaskSignature::new(
2539 task_index,
2540 task_response_2_digest,
2541 bls_sig_op_2.clone(),
2542 test_operator_2.operator_id,
2543 ))
2544 .await
2545 .unwrap();
2546
2547 let signers_apk_g2 =
2548 aggregate_g2_public_keys(&vec![test_operator_1.clone(), test_operator_2.clone()]);
2549 let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]);
2550 let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2551 test_operator_1,
2552 test_operator_2,
2553 ])];
2554
2555 let expected_agg_service_response = BlsAggregationServiceResponse {
2556 task_index,
2557 task_response_digest: task_response_2_digest,
2558 non_signers_pub_keys_g1: vec![],
2559 quorum_apks_g1,
2560 signers_apk_g2,
2561 signers_agg_sig_g1,
2562 non_signer_quorum_bitmap_indices: vec![],
2563 quorum_apk_indices: vec![],
2564 total_stake_indices: vec![],
2565 non_signer_stake_indices: vec![],
2566 };
2567
2568 let response = aggregator_response.receive_aggregated_response().await;
2569
2570 let elapsed = start.elapsed();
2571 assert_eq!(expected_agg_service_response, response.clone().unwrap());
2572 assert_eq!(task_index, response.unwrap().task_index);
2573 assert!(elapsed >= time_to_expiry);
2574 assert!(elapsed < window_duration);
2575 }
2576
2577 #[tokio::test]
2578 async fn test_if_window_duration_is_zero_no_signatures_are_aggregated_after_reaching_quorum() {
2579 let test_operator_1 = TestOperator {
2580 operator_id: U256::from(1).into(),
2581 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2582 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2583 };
2584 let test_operator_2 = TestOperator {
2585 operator_id: U256::from(2).into(),
2586 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2587 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2588 };
2589 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2590 let block_number = 1;
2591 let task_index = 0;
2592 let task_response = 123;
2593 let quorum_numbers: Vec<QuorumNum> = vec![0];
2594 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2595 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2596 let bls_agg_service =
2597 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2598
2599 let time_to_expiry = Duration::from_secs(2);
2600 let window_duration = Duration::ZERO;
2601
2602 let start = Instant::now();
2603 let metadata = TaskMetadata::new(
2604 task_index,
2605 block_number,
2606 quorum_numbers,
2607 quorum_threshold_percentages,
2608 time_to_expiry,
2609 )
2610 .with_window_duration(window_duration);
2611 let (handle, mut aggregator_response) = bls_agg_service.start();
2612 handle.initialize_task(metadata).await.unwrap();
2613
2614 let task_response_1_digest = hash(task_response);
2615 let bls_sig_op_1 = test_operator_1
2616 .bls_keypair
2617 .sign_message(task_response_1_digest.as_ref());
2618 handle
2619 .process_signature(TaskSignature::new(
2620 task_index,
2621 task_response_1_digest,
2622 bls_sig_op_1.clone(),
2623 test_operator_1.operator_id,
2624 ))
2625 .await
2626 .unwrap();
2627
2628 sleep(Duration::from_millis(1)).await;
2630
2631 let task_response_2_digest = hash(task_response);
2632 let bls_sig_op_2 = test_operator_2
2633 .bls_keypair
2634 .sign_message(task_response_2_digest.as_ref());
2635 let process_signature_result = handle
2636 .process_signature(TaskSignature::new(
2637 task_index,
2638 task_response_2_digest,
2639 bls_sig_op_2,
2640 test_operator_2.operator_id,
2641 ))
2642 .await;
2643 assert_eq!(
2644 Err(BlsAggregationServiceError::SenderError),
2645 process_signature_result
2646 );
2647
2648 let signers_apk_g2 = aggregate_g2_public_keys(&[test_operator_1.clone()]);
2649 let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]);
2650 let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2651 test_operator_1,
2652 test_operator_2.clone(),
2653 ])];
2654
2655 let expected_agg_service_response = BlsAggregationServiceResponse {
2656 task_index,
2657 task_response_digest: task_response_1_digest,
2658 non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()],
2659 quorum_apks_g1,
2660 signers_apk_g2,
2661 signers_agg_sig_g1,
2662 non_signer_quorum_bitmap_indices: vec![],
2663 quorum_apk_indices: vec![],
2664 total_stake_indices: vec![],
2665 non_signer_stake_indices: vec![],
2666 };
2667
2668 let response = aggregator_response.receive_aggregated_response().await;
2669
2670 let elapsed = start.elapsed();
2671 assert_eq!(expected_agg_service_response, response.clone().unwrap());
2672 assert_eq!(task_index, response.unwrap().task_index);
2673 assert!(elapsed < time_to_expiry);
2674 }
2675
2676 #[tokio::test]
2677 async fn test_no_signatures_are_aggregated_after_window() {
2678 let test_operator_1 = TestOperator {
2679 operator_id: U256::from(1).into(),
2680 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2681 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(),
2682 };
2683 let test_operator_2 = TestOperator {
2684 operator_id: U256::from(2).into(),
2685 stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
2686 bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(),
2687 };
2688 let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()];
2689 let block_number = 1;
2690 let task_index = 0;
2691 let task_response = 123;
2692 let quorum_numbers: Vec<QuorumNum> = vec![0];
2693 let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8];
2694 let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
2695 let bls_agg_service =
2696 BlsAggregatorService::new(fake_avs_registry_service, get_test_logger());
2697
2698 let time_to_expiry = Duration::from_secs(5);
2699 let window_duration = Duration::from_secs(1);
2700
2701 let start = Instant::now();
2702 let metadata = TaskMetadata::new(
2703 task_index,
2704 block_number,
2705 quorum_numbers,
2706 quorum_threshold_percentages,
2707 time_to_expiry,
2708 )
2709 .with_window_duration(window_duration);
2710 let (handle, mut aggregator_response) = bls_agg_service.start();
2711 handle.initialize_task(metadata).await.unwrap();
2712
2713 let task_response_1_digest = hash(task_response);
2714 let bls_sig_op_1 = test_operator_1
2715 .bls_keypair
2716 .sign_message(task_response_1_digest.as_ref());
2717 handle
2718 .process_signature(TaskSignature::new(
2719 task_index,
2720 task_response_1_digest,
2721 bls_sig_op_1.clone(),
2722 test_operator_1.operator_id,
2723 ))
2724 .await
2725 .unwrap();
2726
2727 sleep(Duration::from_secs(2)).await;
2729
2730 let task_response_2_digest = hash(task_response);
2731 let bls_sig_op_2 = test_operator_2
2732 .bls_keypair
2733 .sign_message(task_response_2_digest.as_ref());
2734 let process_signature_result = handle
2735 .process_signature(TaskSignature::new(
2736 task_index,
2737 task_response_2_digest,
2738 bls_sig_op_2,
2739 test_operator_2.operator_id,
2740 ))
2741 .await;
2742 assert_eq!(
2743 Err(BlsAggregationServiceError::SenderError),
2744 process_signature_result
2745 );
2746
2747 let signers_apk_g2 = aggregate_g2_public_keys(&[test_operator_1.clone()]);
2748 let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]);
2749 let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![
2750 test_operator_1,
2751 test_operator_2.clone(),
2752 ])];
2753
2754 let expected_agg_service_response = BlsAggregationServiceResponse {
2755 task_index,
2756 task_response_digest: task_response_1_digest,
2757 non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()],
2758 quorum_apks_g1,
2759 signers_apk_g2,
2760 signers_agg_sig_g1,
2761 non_signer_quorum_bitmap_indices: vec![],
2762 quorum_apk_indices: vec![],
2763 total_stake_indices: vec![],
2764 non_signer_stake_indices: vec![],
2765 };
2766
2767 let response = aggregator_response.receive_aggregated_response().await;
2768
2769 let elapsed = start.elapsed();
2770 assert_eq!(expected_agg_service_response, response.clone().unwrap());
2771 assert_eq!(task_index, response.unwrap().task_index);
2772 assert!(elapsed < time_to_expiry);
2773 }
2774}