1use crate::aggregation::AggregationError;
23use crate::extract;
24use alloy_primitives::{Address, Bytes};
25use blueprint_client_tangle::{AggregationConfig, OperatorMetadata, TangleClient, ThresholdType};
26use blueprint_core::JobResult;
27use blueprint_core::error::BoxError;
28use blueprint_std::boxed::Box;
29use blueprint_std::collections::{HashMap, VecDeque};
30use blueprint_std::format;
31use blueprint_std::string::{String, ToString};
32use blueprint_std::sync::{Arc, Mutex};
33#[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
34use blueprint_std::time::Duration;
35use blueprint_std::vec::Vec;
36#[cfg(feature = "aggregation")]
37use blueprint_tangle_aggregation_svc::{OperatorStake, ThresholdConfig};
38use core::pin::Pin;
39use core::task::{Context, Poll};
40use futures_util::Sink;
41
42#[derive(Debug, thiserror::Error)]
44pub enum AggregatingConsumerError {
45 #[error("Client error: {0}")]
47 Client(String),
48 #[error("Missing metadata: {0}")]
50 MissingMetadata(&'static str),
51 #[error("Invalid metadata: {0}")]
53 InvalidMetadata(&'static str),
54 #[error("Transaction error: {0}")]
56 Transaction(String),
57 #[error("Aggregation error: {0}")]
59 Aggregation(#[from] AggregationError),
60 #[cfg(feature = "aggregation")]
62 #[error("Aggregation service error: {0}")]
63 AggregationService(#[from] blueprint_tangle_aggregation_svc::ClientError),
64 #[cfg(feature = "aggregation")]
66 #[error("BLS error: {0}")]
67 Bls(String),
68 #[error("Aggregation required but not configured. Call with_aggregation_service() first.")]
70 AggregationNotConfigured,
71}
72
73struct PendingJobResult {
75 service_id: u64,
76 call_id: u64,
77 job_index: u8,
78 output: Bytes,
79}
80
81enum State {
82 WaitingForResult,
83 ProcessingSubmission(
84 Pin<Box<dyn core::future::Future<Output = Result<(), AggregatingConsumerError>> + Send>>,
85 ),
86}
87
88impl State {
89 fn is_waiting(&self) -> bool {
90 matches!(self, State::WaitingForResult)
91 }
92}
93
/// Settings for submitting BLS signatures to one or more aggregation
/// services and, optionally, relaying the aggregated result to the chain.
#[cfg(feature = "aggregation")]
#[derive(Clone)]
pub struct AggregationServiceConfig {
    /// Clients for every aggregation service signatures are submitted to.
    pub clients: Vec<blueprint_tangle_aggregation_svc::AggregationServiceClient>,
    /// BLS secret key used to sign the result message.
    pub bls_secret: Arc<blueprint_crypto_bn254::ArkBlsBn254Secret>,
    /// BLS public key sent alongside each signature.
    pub bls_public: Arc<blueprint_crypto_bn254::ArkBlsBn254Public>,
    /// Index of this operator, included in every signature submission.
    pub operator_index: u32,
    /// When the threshold is not yet met after submitting, poll the services
    /// until an aggregated result is available or the timeout elapses.
    pub wait_for_threshold: bool,
    /// Upper bound on how long to poll for the aggregated result.
    pub threshold_timeout: Duration,
    /// Pause between polling rounds while waiting for the threshold.
    pub poll_interval: Duration,
    /// When `false`, stop after submitting the signature and never submit
    /// the aggregated result to the chain from this operator.
    pub submit_to_chain: bool,
}
116
#[cfg(feature = "aggregation")]
impl AggregationServiceConfig {
    /// Builds a configuration that talks to a single aggregation service.
    ///
    /// The BLS public key is derived from `bls_secret`. Defaults: do not wait
    /// for the threshold, 60 s threshold timeout, 1 s poll interval, and
    /// submit the aggregated result to the chain.
    pub fn new(
        service_url: impl Into<String>,
        bls_secret: blueprint_crypto_bn254::ArkBlsBn254Secret,
        operator_index: u32,
    ) -> Self {
        let client = blueprint_tangle_aggregation_svc::AggregationServiceClient::new(service_url);
        Self::from_clients(vec![client], bls_secret, operator_index)
    }

    /// Builds a configuration that submits to every service in `service_urls`.
    ///
    /// Uses the same defaults as [`AggregationServiceConfig::new`]. An empty
    /// iterator yields a configuration without clients; add at least one with
    /// [`AggregationServiceConfig::add_service`] before calling
    /// [`AggregationServiceConfig::client`].
    pub fn with_multiple_services(
        service_urls: impl IntoIterator<Item = impl Into<String>>,
        bls_secret: blueprint_crypto_bn254::ArkBlsBn254Secret,
        operator_index: u32,
    ) -> Self {
        let clients = service_urls
            .into_iter()
            .map(blueprint_tangle_aggregation_svc::AggregationServiceClient::new)
            .collect();
        Self::from_clients(clients, bls_secret, operator_index)
    }

    /// Shared constructor: derives the public key and applies the defaults.
    fn from_clients(
        clients: Vec<blueprint_tangle_aggregation_svc::AggregationServiceClient>,
        bls_secret: blueprint_crypto_bn254::ArkBlsBn254Secret,
        operator_index: u32,
    ) -> Self {
        use blueprint_crypto_bn254::ArkBlsBn254;
        use blueprint_crypto_core::KeyType;

        let bls_public = ArkBlsBn254::public_from_secret(&bls_secret);
        Self {
            clients,
            bls_secret: Arc::new(bls_secret),
            bls_public: Arc::new(bls_public),
            operator_index,
            wait_for_threshold: false,
            threshold_timeout: Duration::from_secs(60),
            poll_interval: Duration::from_secs(1),
            submit_to_chain: true,
        }
    }

    /// Appends a client for another aggregation service.
    #[must_use]
    pub fn add_service(mut self, service_url: impl Into<String>) -> Self {
        self.clients
            .push(blueprint_tangle_aggregation_svc::AggregationServiceClient::new(service_url));
        self
    }

    /// Sets whether to poll for the aggregated result when the threshold is
    /// not met right after submitting the signature.
    #[must_use]
    pub fn with_wait_for_threshold(mut self, wait: bool) -> Self {
        self.wait_for_threshold = wait;
        self
    }

    /// Sets the maximum time spent polling for the aggregated result.
    #[must_use]
    pub fn with_threshold_timeout(mut self, timeout: Duration) -> Self {
        self.threshold_timeout = timeout;
        self
    }

    /// Sets whether this operator relays the aggregated result to the chain.
    #[must_use]
    pub fn with_submit_to_chain(mut self, submit: bool) -> Self {
        self.submit_to_chain = submit;
        self
    }

    /// Returns the first configured aggregation service client.
    ///
    /// # Panics
    ///
    /// Panics when `clients` is empty.
    pub fn client(&self) -> &blueprint_tangle_aggregation_svc::AggregationServiceClient {
        self.clients
            .first()
            .expect("At least one client must be configured")
    }

    /// Derives aggregation service URLs from the RPC endpoints that the
    /// operators of `service_id` published in their metadata.
    ///
    /// Operators with an empty RPC endpoint are skipped. For the others the
    /// URL is built from the endpoint and `aggregation_path`:
    ///
    /// * a path starting with `:` (for example `":9090"`) replaces the port
    ///   of the endpoint; everything after the host is dropped;
    /// * any other path is appended after trimming trailing `/` characters.
    ///
    /// # Errors
    ///
    /// Returns [`AggregatingConsumerError::Client`] when the operator list or
    /// any operator's metadata cannot be fetched.
    pub async fn discover_operator_services(
        client: &TangleClient,
        blueprint_id: u64,
        service_id: u64,
        aggregation_path: &str,
    ) -> Result<Vec<String>, AggregatingConsumerError> {
        let operators = client
            .get_service_operators(service_id)
            .await
            .map_err(|e| AggregatingConsumerError::Client(e.to_string()))?;

        let mut urls: Vec<String> = Vec::with_capacity(operators.len());
        for operator in operators {
            let metadata = client
                .get_operator_metadata(blueprint_id, operator)
                .await
                .map_err(|e| AggregatingConsumerError::Client(e.to_string()))?;
            if metadata.rpc_endpoint.is_empty() {
                continue;
            }
            urls.push(Self::aggregation_url(
                &metadata.rpc_endpoint,
                aggregation_path,
            ));
        }

        blueprint_core::info!(
            target: "tangle-aggregating-consumer",
            "Discovered {} operator aggregation services for blueprint {}",
            urls.len(),
            blueprint_id
        );

        Ok(urls)
    }

    /// Combines one operator RPC endpoint with `aggregation_path`.
    ///
    /// In port mode the authority of the URL is parsed instead of searching
    /// for the last `:` in the whole string, so endpoints without an explicit
    /// port (`http://host/rpc`), bracketed IPv6 hosts (`http://[::1]`) and
    /// endpoints with userinfo keep their host intact. Endpoints without a
    /// `scheme://` prefix get the port appended verbatim.
    fn aggregation_url(rpc: &str, aggregation_path: &str) -> String {
        if !aggregation_path.starts_with(':') {
            return format!("{}{}", rpc.trim_end_matches('/'), aggregation_path);
        }

        let Some(scheme_end) = rpc.find("://") else {
            return format!("{rpc}{aggregation_path}");
        };

        let authority_start = scheme_end + "://".len();
        let after_scheme = &rpc[authority_start..];
        let authority_len = after_scheme
            .find(|c: char| matches!(c, '/' | '?' | '#'))
            .unwrap_or(after_scheme.len());
        let authority = &after_scheme[..authority_len];

        // Skip optional `user:pass@` so its colon is not mistaken for a port.
        let host_start = authority.rfind('@').map_or(0, |at| at + 1);
        let host_and_port = &authority[host_start..];

        // A bracketed IPv6 literal contains colons itself; the host ends at `]`.
        let host_len = match host_and_port.rfind(']') {
            Some(close) => close + 1,
            None => host_and_port.rfind(':').unwrap_or(host_and_port.len()),
        };

        let host_end = authority_start + host_start + host_len;
        format!("{}{}", &rpc[..host_end], aggregation_path)
    }
}
298
299pub struct AggregatingConsumer {
329 client: Arc<TangleClient>,
330 buffer: Mutex<VecDeque<PendingJobResult>>,
331 state: Mutex<State>,
332 cache: crate::cache::SharedServiceConfigCache,
334 #[cfg(feature = "aggregation")]
336 aggregation_config: Option<AggregationServiceConfig>,
337 #[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
339 aggregation_strategy: Option<crate::strategy::AggregationStrategy>,
340}
341
342impl AggregatingConsumer {
343 pub fn new(client: TangleClient) -> Self {
345 Self {
346 client: Arc::new(client),
347 buffer: Mutex::new(VecDeque::new()),
348 state: Mutex::new(State::WaitingForResult),
349 cache: crate::cache::shared_cache(),
350 #[cfg(feature = "aggregation")]
351 aggregation_config: None,
352 #[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
353 aggregation_strategy: None,
354 }
355 }
356
357 pub fn with_cache(client: TangleClient, cache: crate::cache::SharedServiceConfigCache) -> Self {
376 Self {
377 client: Arc::new(client),
378 buffer: Mutex::new(VecDeque::new()),
379 state: Mutex::new(State::WaitingForResult),
380 cache,
381 #[cfg(feature = "aggregation")]
382 aggregation_config: None,
383 #[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
384 aggregation_strategy: None,
385 }
386 }
387
388 #[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
409 pub fn with_aggregation_strategy(
410 mut self,
411 strategy: crate::strategy::AggregationStrategy,
412 ) -> Self {
413 self.aggregation_strategy = Some(strategy);
414 self
415 }
416
417 #[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
419 pub fn aggregation_strategy(&self) -> Option<&crate::strategy::AggregationStrategy> {
420 self.aggregation_strategy.as_ref()
421 }
422
423 #[cfg(feature = "aggregation")]
428 pub fn with_aggregation_service(
429 mut self,
430 service_url: impl Into<String>,
431 bls_secret: blueprint_crypto_bn254::ArkBlsBn254Secret,
432 operator_index: u32,
433 ) -> Self {
434 self.aggregation_config = Some(AggregationServiceConfig::new(
435 service_url,
436 bls_secret,
437 operator_index,
438 ));
439 self
440 }
441
442 #[cfg(feature = "aggregation")]
444 pub fn with_aggregation_config(mut self, config: AggregationServiceConfig) -> Self {
445 self.aggregation_config = Some(config);
446 self
447 }
448
449 #[must_use]
451 pub fn client(&self) -> &TangleClient {
452 &self.client
453 }
454
455 #[must_use]
462 pub fn cache(&self) -> &crate::cache::SharedServiceConfigCache {
463 &self.cache
464 }
465
466 pub fn invalidate_service_cache(&self, service_id: u64) {
471 self.cache.invalidate_service(service_id);
472 }
473
474 async fn get_aggregation_config(
476 cache: &crate::cache::SharedServiceConfigCache,
477 client: &TangleClient,
478 service_id: u64,
479 job_index: u8,
480 ) -> Result<AggregationConfig, AggregatingConsumerError> {
481 cache
482 .get_aggregation_config(client, service_id, job_index)
483 .await
484 .map_err(|e| AggregatingConsumerError::Client(e.to_string()))
485 }
486
487 pub async fn get_operator_weights(
489 &self,
490 service_id: u64,
491 ) -> Result<crate::cache::OperatorWeights, AggregatingConsumerError> {
492 self.cache
493 .get_operator_weights(&self.client, service_id)
494 .await
495 .map_err(|e| AggregatingConsumerError::Client(e.to_string()))
496 }
497
498 pub async fn get_service_operators(
500 &self,
501 service_id: u64,
502 ) -> Result<crate::cache::ServiceOperators, AggregatingConsumerError> {
503 self.cache
504 .get_service_operators(&self.client, service_id)
505 .await
506 .map_err(|e| AggregatingConsumerError::Client(e.to_string()))
507 }
508
509 pub async fn get_service_operator_metadata(
511 &self,
512 service_id: u64,
513 ) -> Result<HashMap<Address, OperatorMetadata>, AggregatingConsumerError> {
514 self.cache
515 .get_service_operator_metadata(
516 &self.client,
517 self.client.config.settings.blueprint_id,
518 service_id,
519 )
520 .await
521 .map_err(|e| AggregatingConsumerError::Client(e.to_string()))
522 }
523}
524
525impl Sink<JobResult> for AggregatingConsumer {
526 type Error = BoxError;
527
528 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
529 Poll::Ready(Ok(()))
530 }
531
532 fn start_send(self: Pin<&mut Self>, item: JobResult) -> Result<(), Self::Error> {
533 let JobResult::Ok { head, body } = &item else {
534 blueprint_core::trace!(target: "tangle-aggregating-consumer", "Discarding job result with error");
535 return Ok(());
536 };
537
538 let (Some(call_id_raw), Some(service_id_raw)) = (
539 head.metadata.get(extract::CallId::METADATA_KEY),
540 head.metadata.get(extract::ServiceId::METADATA_KEY),
541 ) else {
542 blueprint_core::trace!(target: "tangle-aggregating-consumer", "Discarding job result with missing metadata");
543 return Ok(());
544 };
545
546 let job_index: u8 = head
548 .metadata
549 .get(extract::JobIndex::METADATA_KEY)
550 .and_then(|v| {
551 let val: u64 = v.try_into().ok()?;
552 u8::try_from(val).ok()
553 })
554 .unwrap_or(0);
555
556 blueprint_core::debug!(
557 target: "tangle-aggregating-consumer",
558 result = ?item,
559 job_index = job_index,
560 "Received job result, handling..."
561 );
562
563 let call_id: u64 = call_id_raw
564 .try_into()
565 .map_err(|_| AggregatingConsumerError::InvalidMetadata("call_id"))?;
566 let service_id: u64 = service_id_raw
567 .try_into()
568 .map_err(|_| AggregatingConsumerError::InvalidMetadata("service_id"))?;
569
570 self.get_mut()
571 .buffer
572 .lock()
573 .unwrap()
574 .push_back(PendingJobResult {
575 service_id,
576 call_id,
577 job_index,
578 output: Bytes::copy_from_slice(body),
579 });
580 Ok(())
581 }
582
583 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
584 let consumer = self.get_mut();
585 let mut state = consumer.state.lock().unwrap();
586
587 {
588 let buffer = consumer.buffer.lock().unwrap();
589 if buffer.is_empty() && state.is_waiting() {
590 return Poll::Ready(Ok(()));
591 }
592 }
593
594 loop {
595 match &mut *state {
596 State::WaitingForResult => {
597 let result = {
598 let mut buffer = consumer.buffer.lock().unwrap();
599 buffer.pop_front()
600 };
601
602 let Some(pending) = result else {
603 return Poll::Ready(Ok(()));
604 };
605
606 let client = Arc::clone(&consumer.client);
607 let cache = Arc::clone(&consumer.cache);
608
609 #[cfg(feature = "aggregation")]
610 let agg_config = consumer.aggregation_config.clone();
611
612 let fut = Box::pin(async move {
613 #[cfg(feature = "aggregation")]
614 {
615 submit_job_result(
616 cache,
617 client,
618 pending.service_id,
619 pending.call_id,
620 pending.job_index,
621 pending.output,
622 agg_config,
623 )
624 .await
625 }
626 #[cfg(not(feature = "aggregation"))]
627 {
628 submit_job_result(
629 cache,
630 client,
631 pending.service_id,
632 pending.call_id,
633 pending.job_index,
634 pending.output,
635 )
636 .await
637 }
638 });
639
640 *state = State::ProcessingSubmission(fut);
641 }
642 State::ProcessingSubmission(future) => match future.as_mut().poll(cx) {
643 Poll::Ready(Ok(())) => {
644 *state = State::WaitingForResult;
645 }
646 Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
647 Poll::Pending => return Poll::Pending,
648 },
649 }
650 }
651 }
652
653 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
654 let buffer = self.buffer.lock().unwrap();
655 if buffer.is_empty() {
656 Poll::Ready(Ok(()))
657 } else {
658 Poll::Pending
659 }
660 }
661}
662
/// Submits one job result, choosing between direct submission and the
/// aggregation flow based on the job's aggregation configuration.
///
/// # Errors
///
/// Returns [`AggregatingConsumerError::AggregationNotConfigured`] when the
/// job requires aggregation but `agg_config` is `None`, and otherwise any
/// error of the configuration lookup or the chosen submission path.
#[cfg(feature = "aggregation")]
async fn submit_job_result(
    cache: crate::cache::SharedServiceConfigCache,
    client: Arc<TangleClient>,
    service_id: u64,
    call_id: u64,
    job_index: u8,
    output: Bytes,
    agg_config: Option<AggregationServiceConfig>,
) -> Result<(), AggregatingConsumerError> {
    let config =
        AggregatingConsumer::get_aggregation_config(&cache, &client, service_id, job_index).await?;

    // Jobs without an aggregation requirement go straight to the chain.
    if !config.required {
        return submit_direct_result(client, service_id, call_id, output).await;
    }

    blueprint_core::info!(
        target: "tangle-aggregating-consumer",
        "Job {} for service {} requires aggregation (threshold: {}bps, type: {:?})",
        call_id,
        service_id,
        config.threshold_bps,
        config.threshold_type
    );

    let Some(agg) = agg_config else {
        return Err(AggregatingConsumerError::AggregationNotConfigured);
    };

    submit_aggregated_result(
        cache, client, service_id, call_id, job_index, output, config, agg,
    )
    .await
}
700
/// Parameters used to initialise an aggregation task on a service.
#[cfg(feature = "aggregation")]
struct AggregationTaskInit {
    /// Number of operators registered for the service.
    operator_count: u32,
    /// Threshold the aggregation service has to enforce.
    threshold: ThresholdConfig,
}
706
/// Collects the operator count and threshold configuration needed to
/// initialise an aggregation task for a job.
///
/// Count-based jobs get a `ThresholdConfig::Count`. Stake-weighted jobs get a
/// `ThresholdConfig::StakeWeighted` built from the cached operator weights,
/// falling back to the count-based threshold (with a warning) when no weights
/// are available or every operator's weight is zero.
///
/// # Errors
///
/// Returns [`AggregatingConsumerError::Client`] when a cache lookup fails or
/// the service has no operators.
#[cfg(feature = "aggregation")]
async fn prepare_aggregation_task(
    cache: &crate::cache::SharedServiceConfigCache,
    client: &TangleClient,
    service_id: u64,
    job_index: u8,
    config: &AggregationConfig,
) -> Result<AggregationTaskInit, AggregatingConsumerError> {
    /// Count-based threshold for `operator_total` operators.
    fn count_threshold(operator_total: usize, threshold_bps: u16) -> ThresholdConfig {
        let required = integration::calculate_required_signers(
            operator_total,
            threshold_bps,
            ThresholdType::CountBased,
            None,
        );
        ThresholdConfig::Count {
            required_signers: required as u32,
        }
    }

    let operators = cache
        .get_service_operators(client, service_id)
        .await
        .map_err(|e| AggregatingConsumerError::Client(e.to_string()))?;

    if operators.is_empty() {
        return Err(AggregatingConsumerError::Client(format!(
            "No operators registered for service {service_id}"
        )));
    }

    let operator_count = operators.len() as u32;

    let threshold = match config.threshold_type {
        ThresholdType::CountBased => count_threshold(operators.len(), config.threshold_bps),
        ThresholdType::StakeWeighted => {
            let weights = cache
                .get_operator_weights(client, service_id)
                .await
                .map_err(|e| AggregatingConsumerError::Client(e.to_string()))?;

            if weights.is_empty() {
                blueprint_core::warn!(
                    target: "tangle-aggregating-consumer",
                    service_id,
                    job_index,
                    "No operator weights found for service {}; falling back to count-based threshold",
                    service_id
                );
                count_threshold(operators.len(), config.threshold_bps)
            } else {
                // Operators without a recorded weight count as zero stake.
                let numeric_stakes: Vec<u64> = operators
                    .iter()
                    .map(|operator| u64::from(*weights.weights.get(operator).unwrap_or(&0)))
                    .collect();

                if numeric_stakes.iter().any(|stake| *stake != 0) {
                    blueprint_core::trace!(
                        target: "tangle-aggregating-consumer",
                        service_id,
                        job_index,
                        stakes = ?numeric_stakes,
                        "Prepared stake-weighted threshold"
                    );

                    // The operator index is the position in the operator list.
                    let operator_stakes = numeric_stakes
                        .iter()
                        .enumerate()
                        .map(|(idx, &stake)| OperatorStake {
                            operator_index: idx as u32,
                            stake,
                        })
                        .collect();

                    ThresholdConfig::StakeWeighted {
                        threshold_bps: u32::from(config.threshold_bps),
                        operator_stakes,
                    }
                } else {
                    blueprint_core::warn!(
                        target: "tangle-aggregating-consumer",
                        service_id,
                        job_index,
                        "Operator weights for service {} are zero; falling back to count-based threshold",
                        service_id
                    );
                    count_threshold(operators.len(), config.threshold_bps)
                }
            }
        }
    };

    Ok(AggregationTaskInit {
        operator_count,
        threshold,
    })
}
815
816#[cfg(not(feature = "aggregation"))]
818async fn submit_job_result(
819 cache: crate::cache::SharedServiceConfigCache,
820 client: Arc<TangleClient>,
821 service_id: u64,
822 call_id: u64,
823 job_index: u8,
824 output: Bytes,
825) -> Result<(), AggregatingConsumerError> {
826 let config =
828 AggregatingConsumer::get_aggregation_config(&cache, &client, service_id, job_index).await?;
829
830 if config.required {
831 blueprint_core::warn!(
832 target: "tangle-aggregating-consumer",
833 "Job {} for service {} requires aggregation but 'aggregation' feature not enabled. \
834 Enable the feature and configure the aggregation service.",
835 call_id,
836 service_id,
837 );
838 Ok(())
839 } else {
840 submit_direct_result(client, service_id, call_id, output).await
841 }
842}
843
/// Signs the job output, submits the signature to every configured
/// aggregation service and, depending on the configuration, relays the
/// aggregated result to the chain.
///
/// Flow:
/// 1. Build the task parameters via `prepare_aggregation_task`.
/// 2. Sign the message from `create_signing_message` with the BLS secret.
/// 3. For each service, initialise the task (result ignored) and submit the
///    signature; failures are logged and the next service is tried.
/// 4. If `submit_to_chain` is set: submit the aggregated result right away
///    when any service reported the threshold as met, otherwise (only with
///    `wait_for_threshold`) poll until it is available and submit then.
///
/// Failures of the final on-chain submission are only logged.
///
/// # Errors
///
/// Returns an error when task preparation or signing fails, when no service
/// accepted the signature, or when waiting for the threshold times out.
#[cfg(feature = "aggregation")]
async fn submit_aggregated_result(
    cache: crate::cache::SharedServiceConfigCache,
    client: Arc<TangleClient>,
    service_id: u64,
    call_id: u64,
    job_index: u8,
    output: Bytes,
    config: AggregationConfig,
    agg: AggregationServiceConfig,
) -> Result<(), AggregatingConsumerError> {
    use blueprint_crypto_bn254::ArkBlsBn254;
    use blueprint_crypto_core::{BytesEncoding, KeyType};
    use blueprint_tangle_aggregation_svc::{SubmitSignatureRequest, create_signing_message};

    let task_init =
        prepare_aggregation_task(&cache, &client, service_id, job_index, &config).await?;

    blueprint_core::debug!(
        target: "tangle-aggregating-consumer",
        service_id,
        call_id,
        job_index,
        operator_count = task_init.operator_count,
        threshold = ?task_init.threshold,
        "Prepared aggregation task initialization payload"
    );

    blueprint_core::debug!(
        target: "tangle-aggregating-consumer",
        "Submitting signature to {} aggregation service(s) for service {} call {}",
        agg.clients.len(),
        service_id,
        call_id
    );

    let message = create_signing_message(service_id, call_id, &output);

    // Signing takes the secret by `&mut`, so work on a private copy of the
    // shared key.
    let mut secret_clone = (*agg.bls_secret).clone();
    let signature = ArkBlsBn254::sign_with_secret(&mut secret_clone, &message)
        .map_err(|e| AggregatingConsumerError::Bls(e.to_string()))?;

    let submit_request = SubmitSignatureRequest {
        service_id,
        call_id,
        operator_index: agg.operator_index,
        output: output.to_vec(),
        signature: signature.to_bytes(),
        public_key: agg.bls_public.to_bytes(),
    };

    let mut any_success = false;
    // True as soon as ANY service reports the threshold as met; looking only
    // at the last response would miss a threshold reached on an earlier one.
    let mut threshold_met = false;

    for (idx, service_client) in agg.clients.iter().enumerate() {
        // Best effort: the outcome of task initialisation is deliberately
        // ignored (presumably the task may already exist on the service —
        // TODO confirm against the aggregation service API).
        let _ = service_client
            .init_task(
                service_id,
                call_id,
                output.as_ref(),
                task_init.operator_count,
                task_init.threshold.clone(),
            )
            .await;

        match service_client
            .submit_signature(submit_request.clone())
            .await
        {
            Ok(response) => {
                blueprint_core::info!(
                    target: "tangle-aggregating-consumer",
                    "Submitted signature to aggregation service {}: {}/{} signatures (threshold met: {})",
                    idx,
                    response.signatures_collected,
                    response.threshold_required,
                    response.threshold_met
                );
                any_success = true;
                threshold_met |= response.threshold_met;
            }
            Err(e) => {
                blueprint_core::warn!(
                    target: "tangle-aggregating-consumer",
                    "Failed to submit to aggregation service {}: {}",
                    idx,
                    e
                );
            }
        }
    }

    if !any_success {
        return Err(AggregatingConsumerError::Client(
            "Failed to submit to any aggregation service".to_string(),
        ));
    }

    if !agg.submit_to_chain {
        blueprint_core::debug!(
            target: "tangle-aggregating-consumer",
            "submit_to_chain is disabled, not submitting to chain"
        );
        return Ok(());
    }

    if threshold_met {
        if let Err(e) = try_submit_aggregated_to_chain(client, &agg, service_id, call_id).await {
            blueprint_core::debug!(
                target: "tangle-aggregating-consumer",
                "Failed to submit aggregated result (likely already submitted): {}",
                e
            );
        }
    } else if agg.wait_for_threshold {
        blueprint_core::debug!(
            target: "tangle-aggregating-consumer",
            "Waiting for threshold to be met..."
        );

        let result = wait_for_threshold_any_service(&agg, service_id, call_id).await?;

        if let Err(e) =
            submit_aggregated_to_chain_with_result(client, &agg, service_id, call_id, result).await
        {
            blueprint_core::debug!(
                target: "tangle-aggregating-consumer",
                "Failed to submit aggregated result (likely already submitted by another operator): {}",
                e
            );
        }
    }

    Ok(())
}
1006
/// Polls every configured aggregation service in rounds until one of them
/// returns an aggregated result or `threshold_timeout` has elapsed.
///
/// Polling errors are logged at trace level and treated like "not ready".
/// Each round is followed by a sleep of `poll_interval`.
///
/// # Errors
///
/// Returns [`AggregatingConsumerError::Client`] when the timeout elapses
/// without any service returning a result.
#[cfg(feature = "aggregation")]
async fn wait_for_threshold_any_service(
    agg: &AggregationServiceConfig,
    service_id: u64,
    call_id: u64,
) -> Result<blueprint_tangle_aggregation_svc::AggregatedResultResponse, AggregatingConsumerError> {
    use blueprint_std::time::Instant;

    let started = Instant::now();

    while started.elapsed() < agg.threshold_timeout {
        for service in &agg.clients {
            let polled = service.get_aggregated(service_id, call_id).await;
            match polled {
                Ok(Some(aggregated)) => return Ok(aggregated),
                // Not aggregated yet on this service; try the next one.
                Ok(None) => continue,
                Err(e) => {
                    blueprint_core::trace!(
                        target: "tangle-aggregating-consumer",
                        "Error polling aggregation service: {}",
                        e
                    );
                }
            }
        }

        tokio::time::sleep(agg.poll_interval).await;
    }

    Err(AggregatingConsumerError::Client(
        "Timeout waiting for aggregation threshold".to_string(),
    ))
}
1047
/// Fetches the aggregated result from the first service that has one and
/// submits it to the chain.
///
/// Services that fail or have no result yet are skipped silently.
///
/// # Errors
///
/// Returns [`AggregatingConsumerError::Client`] when no service has the
/// aggregated result, or the error of the on-chain submission.
#[cfg(feature = "aggregation")]
async fn try_submit_aggregated_to_chain(
    client: Arc<TangleClient>,
    agg: &AggregationServiceConfig,
    service_id: u64,
    call_id: u64,
) -> Result<(), AggregatingConsumerError> {
    let mut available = None;
    for service_client in &agg.clients {
        if let Ok(Some(result)) = service_client.get_aggregated(service_id, call_id).await {
            available = Some(result);
            break;
        }
    }

    match available {
        Some(result) => {
            submit_aggregated_to_chain_with_result(client, agg, service_id, call_id, result).await
        }
        None => Err(AggregatingConsumerError::Client(
            "Aggregated result not available from any service".to_string(),
        )),
    }
}
1070
/// Submits an aggregated result to the chain and then tells every
/// aggregation service that it has been submitted.
///
/// In dry-run mode nothing is submitted and `Ok(())` is returned. Failures
/// of the "mark submitted" notifications are ignored.
///
/// # Errors
///
/// Returns [`AggregatingConsumerError::Bls`] when the aggregated signature or
/// public key cannot be decoded, and the converted submission error when the
/// on-chain submission fails.
#[cfg(feature = "aggregation")]
async fn submit_aggregated_to_chain_with_result(
    client: Arc<TangleClient>,
    agg: &AggregationServiceConfig,
    service_id: u64,
    call_id: u64,
    result: blueprint_tangle_aggregation_svc::AggregatedResultResponse,
) -> Result<(), AggregatingConsumerError> {
    use crate::aggregation::{AggregatedResult, G1Point, G2Point, SignerBitmap};

    if client.config.dry_run {
        blueprint_core::info!(
            target: "tangle-aggregating-consumer",
            "Dry run enabled; skipping aggregated result submission for service {} call {}",
            service_id,
            call_id
        );
        return Ok(());
    }

    blueprint_core::info!(
        target: "tangle-aggregating-consumer",
        "Submitting aggregated result to chain for service {} call {}",
        service_id,
        call_id
    );

    let signature = G1Point::from_bytes(&result.aggregated_signature)
        .ok_or_else(|| AggregatingConsumerError::Bls("Invalid aggregated signature".to_string()))?;
    let pubkey = G2Point::from_bytes(&result.aggregated_pubkey)
        .ok_or_else(|| AggregatingConsumerError::Bls("Invalid aggregated pubkey".to_string()))?;

    let aggregated = AggregatedResult::new(
        service_id,
        call_id,
        Bytes::from(result.output),
        SignerBitmap(result.signer_bitmap),
        signature,
        pubkey,
    );

    // Reuse the shared client handle instead of deep-cloning the whole
    // `TangleClient` into a fresh `Arc` for a single call.
    aggregated.submit(&client).await?;

    // Best effort: a service that cannot be notified does not undo the
    // successful on-chain submission.
    for service_client in &agg.clients {
        let _ = service_client.mark_submitted(service_id, call_id).await;
    }

    blueprint_core::info!(
        target: "tangle-aggregating-consumer",
        "Successfully submitted aggregated result for service {} call {}",
        service_id,
        call_id
    );

    Ok(())
}
1133
1134async fn submit_direct_result(
1136 client: Arc<TangleClient>,
1137 service_id: u64,
1138 call_id: u64,
1139 output: Bytes,
1140) -> Result<(), AggregatingConsumerError> {
1141 blueprint_core::debug!(
1142 target: "tangle-aggregating-consumer",
1143 "Submitting direct result for service {} call {}",
1144 service_id,
1145 call_id
1146 );
1147
1148 if client.config.dry_run {
1149 blueprint_core::info!(
1150 target: "tangle-aggregating-consumer",
1151 "Dry run enabled; skipping direct result submission for service {} call {}",
1152 service_id,
1153 call_id
1154 );
1155 return Ok(());
1156 }
1157
1158 let result = client
1159 .submit_result(service_id, call_id, output)
1160 .await
1161 .map_err(|e| {
1162 AggregatingConsumerError::Transaction(format!("Failed to submit result: {e}"))
1163 })?;
1164
1165 if result.success {
1166 blueprint_core::info!(
1167 target: "tangle-aggregating-consumer",
1168 "Successfully submitted direct result for service {} call {}: tx_hash={:?}",
1169 service_id,
1170 call_id,
1171 result.tx_hash
1172 );
1173 } else {
1174 return Err(AggregatingConsumerError::Transaction(format!(
1175 "Transaction reverted for service {} call {}: tx_hash={:?}",
1176 service_id, call_id, result.tx_hash
1177 )));
1178 }
1179
1180 Ok(())
1181}
1182
1183pub mod integration {
1188 use super::*;
1189
1190 pub fn create_signing_message(service_id: u64, call_id: u64, output: &[u8]) -> Vec<u8> {
1194 use alloy_primitives::keccak256;
1195
1196 let output_hash = keccak256(output);
1197 let mut message = Vec::with_capacity(8 + 8 + 32);
1198 message.extend_from_slice(&service_id.to_be_bytes());
1199 message.extend_from_slice(&call_id.to_be_bytes());
1200 message.extend_from_slice(output_hash.as_slice());
1201 message
1202 }
1203
1204 pub fn calculate_required_signers(
1206 total_operators: usize,
1207 threshold_bps: u16,
1208 threshold_type: ThresholdType,
1209 operator_stakes: Option<&[u64]>,
1210 ) -> usize {
1211 fn count_based(total: usize, threshold_bps: u16) -> usize {
1212 if total == 0 {
1213 return 1;
1214 }
1215 let mut required = (total as u64 * threshold_bps as u64) / 10000;
1216 if required == 0 {
1217 required = 1;
1218 }
1219 let required = required as usize;
1220 required.min(total).max(1)
1221 }
1222
1223 match threshold_type {
1224 ThresholdType::CountBased => count_based(total_operators, threshold_bps),
1225 ThresholdType::StakeWeighted => {
1226 if let Some(stakes) = operator_stakes {
1227 if stakes.is_empty() || stakes.iter().all(|stake| *stake == 0) {
1228 return count_based(total_operators, threshold_bps);
1229 }
1230
1231 let total_stake: u128 = stakes.iter().map(|s| *s as u128).sum();
1232 if total_stake == 0 {
1233 return count_based(total_operators, threshold_bps);
1234 }
1235
1236 let mut required_stake = (total_stake * threshold_bps as u128) / 10000u128;
1237 if required_stake == 0 {
1238 required_stake = 1;
1239 }
1240
1241 let mut sorted: Vec<u64> = stakes.to_vec();
1242 sorted.sort_by(|a, b| b.cmp(a));
1243
1244 let mut accumulated: u128 = 0;
1245 let mut required_signers = 0usize;
1246
1247 for stake in sorted {
1248 required_signers += 1;
1249 accumulated += stake as u128;
1250 if accumulated >= required_stake {
1251 break;
1252 }
1253 }
1254
1255 required_signers.min(total_operators.max(1)).max(1)
1256 } else {
1257 count_based(total_operators, threshold_bps)
1258 }
1259 }
1260 }
1261 }
1262}
1263
1264#[cfg(test)]
1265mod tests {
1266 use super::integration::*;
1267 use blueprint_client_tangle::ThresholdType;
1268
1269 #[test]
1274 fn test_create_signing_message_format() {
1275 let service_id = 1u64;
1276 let call_id = 42u64;
1277 let output = b"test output";
1278
1279 let message = create_signing_message(service_id, call_id, output);
1280
1281 assert_eq!(message.len(), 48);
1283
1284 assert_eq!(&message[0..8], &service_id.to_be_bytes());
1286
1287 assert_eq!(&message[8..16], &call_id.to_be_bytes());
1289
1290 use alloy_primitives::keccak256;
1292 let expected_hash = keccak256(output);
1293 assert_eq!(&message[16..48], expected_hash.as_slice());
1294 }
1295
1296 #[test]
1297 fn test_create_signing_message_deterministic() {
1298 let msg1 = create_signing_message(1, 2, b"hello");
1299 let msg2 = create_signing_message(1, 2, b"hello");
1300 assert_eq!(msg1, msg2);
1301 }
1302
1303 #[test]
1304 fn test_create_signing_message_different_outputs() {
1305 let msg1 = create_signing_message(1, 2, b"hello");
1306 let msg2 = create_signing_message(1, 2, b"world");
1307 assert_ne!(msg1, msg2);
1309 assert_eq!(&msg1[0..16], &msg2[0..16]);
1311 }
1312
1313 #[test]
1314 fn test_create_signing_message_empty_output() {
1315 let msg = create_signing_message(1, 2, b"");
1316 assert_eq!(msg.len(), 48);
1317 }
1318
1319 #[test]
1324 fn test_calculate_required_signers_count_based_67_percent() {
1325 let required = calculate_required_signers(3, 6700, ThresholdType::CountBased, None);
1327 assert_eq!(required, 2);
1328 }
1329
1330 #[test]
1331 fn test_calculate_required_signers_count_based_50_percent() {
1332 let required = calculate_required_signers(4, 5000, ThresholdType::CountBased, None);
1334 assert_eq!(required, 2);
1335 }
1336
1337 #[test]
1338 fn test_calculate_required_signers_count_based_100_percent() {
1339 let required = calculate_required_signers(5, 10000, ThresholdType::CountBased, None);
1341 assert_eq!(required, 5);
1342 }
1343
1344 #[test]
1345 fn test_calculate_required_signers_count_based_minimum_one() {
1346 let required = calculate_required_signers(10, 100, ThresholdType::CountBased, None); assert_eq!(required, 1);
1349 }
1350
1351 #[test]
1352 fn test_calculate_required_signers_count_based_single_operator() {
1353 let required = calculate_required_signers(1, 6700, ThresholdType::CountBased, None);
1355 assert_eq!(required, 1);
1356 }
1357
1358 #[test]
1359 fn test_calculate_required_signers_count_based_large_set() {
1360 let required = calculate_required_signers(100, 6700, ThresholdType::CountBased, None);
1362 assert_eq!(required, 67);
1363 }
1364
1365 #[test]
1370 fn test_calculate_required_signers_stake_weighted_no_stakes() {
1371 let required = calculate_required_signers(3, 6700, ThresholdType::StakeWeighted, None);
1373 assert_eq!(required, 2);
1374 }
1375
1376 #[test]
1377 fn test_calculate_required_signers_stake_weighted_equal_stakes() {
1378 let stakes = [10u64, 10, 10];
1381 let required =
1382 calculate_required_signers(3, 6700, ThresholdType::StakeWeighted, Some(&stakes));
1383 assert_eq!(required, 2);
1384 }
1385
1386 #[test]
1387 fn test_calculate_required_signers_stake_weighted_unequal_stakes() {
1388 let stakes = [5u64, 3, 2];
1392 let required =
1393 calculate_required_signers(3, 6700, ThresholdType::StakeWeighted, Some(&stakes));
1394 assert_eq!(required, 2);
1395 }
1396
1397 #[test]
1398 fn test_calculate_required_signers_stake_weighted_minimum_one() {
1399 let stakes = [100u64, 100, 100];
1401 let required = calculate_required_signers(
1402 3,
1403 100, ThresholdType::StakeWeighted,
1405 Some(&stakes),
1406 );
1407 assert_eq!(required, 1);
1408 }
1409
1410 fn convert_rpc_to_aggregation_url(rpc: &str, aggregation_path: &str) -> Option<String> {
1417 if rpc.is_empty() {
1418 return None;
1419 }
1420
1421 if aggregation_path.starts_with(':') {
1422 if let Some(host_end) = rpc.rfind(':') {
1424 let before_port = &rpc[..host_end];
1425 if before_port.contains("://") {
1426 return Some(format!("{}{}", before_port, aggregation_path));
1427 }
1428 }
1429 Some(format!("{}{}", rpc, aggregation_path))
1431 } else {
1432 let base = rpc.trim_end_matches('/');
1434 Some(format!("{}{}", base, aggregation_path))
1435 }
1436 }
1437
1438 #[test]
1439 fn test_url_conversion_port_replacement() {
1440 let url = convert_rpc_to_aggregation_url("http://localhost:8545", ":9090");
1442 assert_eq!(url, Some("http://localhost:9090".to_string()));
1443
1444 let url = convert_rpc_to_aggregation_url("https://operator.example.com:8545", ":9090");
1445 assert_eq!(url, Some("https://operator.example.com:9090".to_string()));
1446 }
1447
1448 #[test]
1449 fn test_url_conversion_port_append() {
1450 let url = convert_rpc_to_aggregation_url("http://localhost", ":9090");
1452 assert_eq!(url, Some("http://localhost:9090".to_string()));
1453 }
1454
1455 #[test]
1456 fn test_url_conversion_path_append() {
1457 let url = convert_rpc_to_aggregation_url("http://localhost:8545", "/aggregation");
1459 assert_eq!(url, Some("http://localhost:8545/aggregation".to_string()));
1460
1461 let url = convert_rpc_to_aggregation_url("http://localhost:8545/", "/aggregation");
1462 assert_eq!(url, Some("http://localhost:8545/aggregation".to_string()));
1463 }
1464
1465 #[test]
1466 fn test_url_conversion_empty() {
1467 let url = convert_rpc_to_aggregation_url("", ":9090");
1468 assert_eq!(url, None);
1469 }
1470
1471 #[test]
1472 fn test_url_conversion_complex_urls() {
1473 let url = convert_rpc_to_aggregation_url("http://[::1]:8545", ":9090");
1475 assert_eq!(url, Some("http://[::1]:9090".to_string()));
1476
1477 let url = convert_rpc_to_aggregation_url("http://localhost:8545/rpc", "/v1/aggregate");
1479 assert_eq!(
1480 url,
1481 Some("http://localhost:8545/rpc/v1/aggregate".to_string())
1482 );
1483 }
1484
1485 #[cfg(feature = "aggregation")]
1490 mod aggregation_config_tests {
1491 use crate::AggregationServiceConfig;
1492 use blueprint_crypto_bn254::ArkBlsBn254;
1493 use blueprint_crypto_core::KeyType;
1494 use std::time::Duration;
1495
1496 fn test_bls_secret() -> blueprint_crypto_bn254::ArkBlsBn254Secret {
1497 let seed = [1u8; 32];
1499 ArkBlsBn254::generate_with_seed(Some(&seed)).unwrap()
1500 }
1501
1502 #[test]
1503 fn test_config_single_service() {
1504 let config =
1505 AggregationServiceConfig::new("http://localhost:8080", test_bls_secret(), 0);
1506 assert_eq!(config.clients.len(), 1);
1507 assert_eq!(config.operator_index, 0);
1508 assert!(config.submit_to_chain);
1509 }
1510
1511 #[test]
1512 fn test_config_multiple_services() {
1513 let config = AggregationServiceConfig::with_multiple_services(
1514 vec![
1515 "http://service1:8080",
1516 "http://service2:8080",
1517 "http://service3:8080",
1518 ],
1519 test_bls_secret(),
1520 1,
1521 );
1522 assert_eq!(config.clients.len(), 3);
1523 assert_eq!(config.operator_index, 1);
1524 }
1525
1526 #[test]
1527 fn test_config_add_service() {
1528 let config =
1529 AggregationServiceConfig::new("http://localhost:8080", test_bls_secret(), 0)
1530 .add_service("http://backup:8080")
1531 .add_service("http://fallback:8080");
1532
1533 assert_eq!(config.clients.len(), 3);
1534 }
1535
1536 #[test]
1537 fn test_config_with_submit_to_chain() {
1538 let config =
1539 AggregationServiceConfig::new("http://localhost:8080", test_bls_secret(), 0);
1540 assert!(config.submit_to_chain);
1542
1543 let config = config.with_submit_to_chain(false);
1544 assert!(!config.submit_to_chain);
1545 }
1546
1547 #[test]
1548 fn test_config_with_wait_for_threshold() {
1549 let config =
1550 AggregationServiceConfig::new("http://localhost:8080", test_bls_secret(), 0);
1551 assert!(!config.wait_for_threshold);
1553
1554 let config = config.with_wait_for_threshold(true);
1555 assert!(config.wait_for_threshold);
1556 }
1557
1558 #[test]
1559 fn test_config_with_threshold_timeout() {
1560 let config =
1561 AggregationServiceConfig::new("http://localhost:8080", test_bls_secret(), 0)
1562 .with_threshold_timeout(Duration::from_secs(120));
1563
1564 assert_eq!(config.threshold_timeout, Duration::from_secs(120));
1565 }
1566
1567 #[test]
1568 fn test_config_client_accessor() {
1569 let config = AggregationServiceConfig::with_multiple_services(
1570 vec!["http://service1:8080", "http://service2:8080"],
1571 test_bls_secret(),
1572 0,
1573 );
1574 let _client = config.client();
1576 }
1578
1579 #[test]
1580 fn test_config_empty_services() {
1581 let config = AggregationServiceConfig::with_multiple_services(
1583 Vec::<String>::new(),
1584 test_bls_secret(),
1585 0,
1586 );
1587 assert_eq!(config.clients.len(), 0);
1588 }
1589 }
1590}