1use alloy_primitives::Bytes;
36use blueprint_std::string::String;
37#[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
38use blueprint_std::time::Duration;
39use blueprint_std::vec::Vec;
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
43pub enum ThresholdType {
44 #[default]
46 CountBased,
47 StakeWeighted,
49}
50
51impl From<u8> for ThresholdType {
52 fn from(value: u8) -> Self {
53 match value {
54 1 => ThresholdType::StakeWeighted,
55 _ => ThresholdType::CountBased,
56 }
57 }
58}
59
60#[cfg(feature = "aggregation")]
61use blueprint_std::sync::Arc;
62
63#[cfg(feature = "aggregation")]
64use blueprint_crypto_bn254::{ArkBlsBn254, ArkBlsBn254Public, ArkBlsBn254Secret};
65
66#[cfg(all(feature = "p2p-aggregation", not(feature = "aggregation")))]
67use blueprint_crypto_bn254::{ArkBlsBn254, ArkBlsBn254Public};
68
69#[cfg(feature = "p2p-aggregation")]
70use blueprint_std::collections::HashMap;
71
72#[derive(Clone)]
78pub enum AggregationStrategy {
79 #[cfg(feature = "aggregation")]
84 HttpService(HttpServiceConfig),
85
86 #[cfg(feature = "p2p-aggregation")]
91 P2PGossip(P2PGossipConfig),
92}
93
94#[cfg(feature = "aggregation")]
96#[derive(Clone)]
97pub struct HttpServiceConfig {
98 pub client: blueprint_tangle_aggregation_svc::AggregationServiceClient,
100 pub bls_secret: Arc<ArkBlsBn254Secret>,
102 pub bls_public: Arc<ArkBlsBn254Public>,
104 pub operator_index: u32,
106 pub wait_for_threshold: bool,
108 pub threshold_timeout: Duration,
110 pub poll_interval: Duration,
112}
113
114#[cfg(feature = "aggregation")]
115impl HttpServiceConfig {
116 pub fn new(
118 service_url: impl Into<String>,
119 bls_secret: ArkBlsBn254Secret,
120 operator_index: u32,
121 ) -> Self {
122 use blueprint_crypto_core::KeyType;
123
124 let bls_public = ArkBlsBn254::public_from_secret(&bls_secret);
125 Self {
126 client: blueprint_tangle_aggregation_svc::AggregationServiceClient::new(service_url),
127 bls_secret: Arc::new(bls_secret),
128 bls_public: Arc::new(bls_public),
129 operator_index,
130 wait_for_threshold: false,
131 threshold_timeout: Duration::from_secs(60),
132 poll_interval: Duration::from_secs(1),
133 }
134 }
135
136 #[must_use]
138 pub fn with_wait_for_threshold(mut self, wait: bool) -> Self {
139 self.wait_for_threshold = wait;
140 self
141 }
142
143 #[must_use]
145 pub fn with_threshold_timeout(mut self, timeout: Duration) -> Self {
146 self.threshold_timeout = timeout;
147 self
148 }
149
150 #[must_use]
152 pub fn with_poll_interval(mut self, interval: Duration) -> Self {
153 self.poll_interval = interval;
154 self
155 }
156}
157
158#[cfg(feature = "p2p-aggregation")]
160#[derive(Clone)]
161pub struct P2PGossipConfig {
162 pub network_handle: blueprint_networking::service_handle::NetworkServiceHandle<ArkBlsBn254>,
164 pub num_aggregators: u16,
166 pub timeout: Duration,
168 pub threshold_bps: u16,
171 pub threshold_type: ThresholdType,
173 pub participant_public_keys: HashMap<libp2p::PeerId, ArkBlsBn254Public>,
175 pub operator_weights: HashMap<libp2p::PeerId, u64>,
179}
180
181#[cfg(feature = "p2p-aggregation")]
182impl P2PGossipConfig {
183 pub fn new(
185 network_handle: blueprint_networking::service_handle::NetworkServiceHandle<ArkBlsBn254>,
186 participant_public_keys: HashMap<libp2p::PeerId, ArkBlsBn254Public>,
187 ) -> Self {
188 Self {
189 network_handle,
190 num_aggregators: 2,
191 timeout: Duration::from_secs(30),
192 threshold_bps: 6700, threshold_type: ThresholdType::CountBased,
194 participant_public_keys,
195 operator_weights: HashMap::new(),
196 }
197 }
198
199 #[must_use]
201 pub fn with_num_aggregators(mut self, num: u16) -> Self {
202 self.num_aggregators = num;
203 self
204 }
205
206 #[must_use]
208 pub fn with_timeout(mut self, timeout: Duration) -> Self {
209 self.timeout = timeout;
210 self
211 }
212
213 #[must_use]
215 pub fn with_threshold_percentage(mut self, percentage: u8) -> Self {
216 self.threshold_bps = u16::from(percentage) * 100;
217 self
218 }
219
220 #[must_use]
223 pub fn with_threshold_bps(mut self, bps: u16) -> Self {
224 self.threshold_bps = bps;
225 self
226 }
227
228 #[must_use]
230 pub fn with_threshold_type(mut self, threshold_type: ThresholdType) -> Self {
231 self.threshold_type = threshold_type;
232 self
233 }
234
235 #[must_use]
251 pub fn with_operator_weights(mut self, weights: HashMap<libp2p::PeerId, u64>) -> Self {
252 self.operator_weights = weights;
253 self
254 }
255
256 #[must_use]
260 pub fn with_stake_weighted_threshold(
261 mut self,
262 threshold_bps: u16,
263 weights: HashMap<libp2p::PeerId, u64>,
264 ) -> Self {
265 self.threshold_bps = threshold_bps;
266 self.threshold_type = ThresholdType::StakeWeighted;
267 self.operator_weights = weights;
268 self
269 }
270}
271
272#[derive(Debug, Clone)]
274pub struct AggregatedSignatureResult {
275 pub service_id: u64,
277 pub call_id: u64,
279 pub output: Bytes,
281 pub aggregated_signature: Vec<u8>,
283 pub aggregated_pubkey: Vec<u8>,
285 pub signer_bitmap: alloy_primitives::U256,
287 pub non_signer_indices: Vec<u32>,
289}
290
291#[derive(Debug, thiserror::Error)]
293pub enum StrategyError {
294 #[cfg(feature = "aggregation")]
296 #[error("HTTP service error: {0}")]
297 HttpService(#[from] blueprint_tangle_aggregation_svc::ClientError),
298
299 #[cfg(feature = "p2p-aggregation")]
301 #[error("P2P protocol error: {0}")]
302 P2PProtocol(String),
303
304 #[error("BLS error: {0}")]
306 Bls(String),
307
308 #[error(
310 "No aggregation strategy configured - enable 'aggregation' or 'p2p-aggregation' feature"
311 )]
312 NoAggregationStrategy,
313
314 #[error("Threshold not met: got {got}, need {need}")]
316 ThresholdNotMet { got: usize, need: usize },
317
318 #[error("Aggregation timed out")]
320 Timeout,
321
322 #[error("Serialization error: {0}")]
324 Serialization(String),
325}
326
327#[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
328impl AggregationStrategy {
329 pub async fn aggregate(
342 &self,
343 service_id: u64,
344 call_id: u64,
345 output: Bytes,
346 total_operators: u32,
347 threshold: u32,
348 ) -> Result<AggregatedSignatureResult, StrategyError> {
349 match self {
350 #[cfg(feature = "aggregation")]
351 AggregationStrategy::HttpService(config) => {
352 aggregate_via_http(
353 config,
354 service_id,
355 call_id,
356 output,
357 total_operators,
358 threshold,
359 )
360 .await
361 }
362 #[cfg(feature = "p2p-aggregation")]
363 AggregationStrategy::P2PGossip(config) => {
364 aggregate_via_p2p(
365 config.clone(),
366 service_id,
367 call_id,
368 output,
369 total_operators,
370 threshold,
371 )
372 .await
373 }
374 #[allow(unreachable_patterns)]
375 _ => Err(StrategyError::NoAggregationStrategy),
376 }
377 }
378}
379
380#[cfg(not(any(feature = "aggregation", feature = "p2p-aggregation")))]
381impl AggregationStrategy {
382 pub async fn aggregate(
383 &self,
384 _service_id: u64,
385 _call_id: u64,
386 _output: Bytes,
387 _total_operators: u32,
388 _threshold: u32,
389 ) -> Result<AggregatedSignatureResult, StrategyError> {
390 Err(StrategyError::NoAggregationStrategy)
391 }
392}
393
394#[cfg(feature = "aggregation")]
396async fn aggregate_via_http(
397 config: &HttpServiceConfig,
398 service_id: u64,
399 call_id: u64,
400 output: Bytes,
401 total_operators: u32,
402 threshold: u32,
403) -> Result<AggregatedSignatureResult, StrategyError> {
404 use blueprint_crypto_core::{BytesEncoding, KeyType};
405 use blueprint_tangle_aggregation_svc::{
406 SubmitSignatureRequest, ThresholdConfig, create_signing_message,
407 };
408
409 blueprint_core::debug!(
410 target: "aggregation-strategy",
411 "Aggregating via HTTP service for service {} call {}",
412 service_id, call_id
413 );
414
415 let message = create_signing_message(service_id, call_id, &output);
417
418 let mut secret_clone = (*config.bls_secret).clone();
420 let signature = ArkBlsBn254::sign_with_secret(&mut secret_clone, &message)
421 .map_err(|e| StrategyError::Bls(e.to_string()))?;
422
423 let pubkey_bytes = config.bls_public.to_bytes();
425 let sig_bytes = signature.to_bytes();
426
427 let threshold_config = ThresholdConfig::Count {
429 required_signers: threshold.max(1),
430 };
431 let _ = config
432 .client
433 .init_task(
434 service_id,
435 call_id,
436 &output,
437 total_operators,
438 threshold_config,
439 )
440 .await;
441
442 let submit_request = SubmitSignatureRequest {
444 service_id,
445 call_id,
446 operator_index: config.operator_index,
447 output: output.to_vec(),
448 signature: sig_bytes,
449 public_key: pubkey_bytes,
450 };
451
452 let response = config.client.submit_signature(submit_request).await?;
453
454 blueprint_core::info!(
455 target: "aggregation-strategy",
456 "Submitted signature: {}/{} (threshold met: {})",
457 response.signatures_collected,
458 response.threshold_required,
459 response.threshold_met
460 );
461
462 let result = if config.wait_for_threshold {
464 if response.threshold_met {
465 config
466 .client
467 .get_aggregated(service_id, call_id)
468 .await?
469 .ok_or_else(|| StrategyError::Bls("Aggregated result not available".into()))?
470 } else {
471 config
472 .client
473 .wait_for_threshold(
474 service_id,
475 call_id,
476 config.poll_interval,
477 config.threshold_timeout,
478 )
479 .await?
480 }
481 } else if response.threshold_met {
482 config
483 .client
484 .get_aggregated(service_id, call_id)
485 .await?
486 .ok_or_else(|| StrategyError::Bls("Aggregated result not available".into()))?
487 } else {
488 return Err(StrategyError::ThresholdNotMet {
490 got: response.signatures_collected,
491 need: response.threshold_required,
492 });
493 };
494
495 Ok(AggregatedSignatureResult {
496 service_id: result.service_id,
497 call_id: result.call_id,
498 output: Bytes::from(result.output),
499 aggregated_signature: result.aggregated_signature,
500 aggregated_pubkey: result.aggregated_pubkey,
501 signer_bitmap: result.signer_bitmap,
502 non_signer_indices: result.non_signer_indices,
503 })
504}
505
506#[cfg(feature = "p2p-aggregation")]
508async fn aggregate_via_p2p(
509 config: P2PGossipConfig,
510 service_id: u64,
511 call_id: u64,
512 output: Bytes,
513 _total_operators: u32,
514 _threshold: u32,
515) -> Result<AggregatedSignatureResult, StrategyError> {
516 use blueprint_crypto::hashing::blake3_256;
517 use blueprint_crypto_core::BytesEncoding;
518 use blueprint_networking_agg_sig_gossip_extension::{
519 DynamicWeight, ProtocolConfig, SignatureAggregationProtocol,
520 };
521
522 blueprint_core::debug!(
523 target: "aggregation-strategy",
524 "Aggregating via P2P gossip for service {} call {} (threshold_bps: {}, type: {:?})",
525 service_id, call_id, config.threshold_bps, config.threshold_type
526 );
527
528 let message = crate::aggregating_consumer::integration::create_signing_message(
530 service_id, call_id, &output,
531 );
532
533 let message_hash = blake3_256(&message);
535
536 let protocol_config = ProtocolConfig::new(
538 config.network_handle.clone(),
539 config.num_aggregators,
540 config.timeout,
541 );
542
543 let weight_scheme = match config.threshold_type {
546 ThresholdType::CountBased => {
547 let num_participants = config.participant_public_keys.len();
549 let threshold_percentage = (config.threshold_bps / 100) as u8;
550 blueprint_core::debug!(
551 target: "aggregation-strategy",
552 "Using CountBased threshold: {}% of {} participants",
553 threshold_percentage, num_participants
554 );
555 DynamicWeight::equal(num_participants, threshold_percentage)
556 }
557 ThresholdType::StakeWeighted => {
558 let total_weight: u64 = config.operator_weights.values().sum();
561 let threshold_weight = (total_weight * u64::from(config.threshold_bps)) / 10000;
562
563 blueprint_core::debug!(
564 target: "aggregation-strategy",
565 "Using StakeWeighted threshold: {} / {} total weight ({}bps)",
566 threshold_weight, total_weight, config.threshold_bps
567 );
568
569 if config.operator_weights.is_empty() {
570 blueprint_core::warn!(
571 target: "aggregation-strategy",
572 "StakeWeighted threshold requested but no operator weights provided, falling back to EqualWeight"
573 );
574 let num_participants = config.participant_public_keys.len();
575 let threshold_percentage = (config.threshold_bps / 100) as u8;
576 DynamicWeight::equal(num_participants, threshold_percentage)
577 } else {
578 DynamicWeight::custom(config.operator_weights.clone(), threshold_weight)
579 }
580 }
581 };
582
583 let mut protocol = SignatureAggregationProtocol::new(
585 protocol_config,
586 weight_scheme,
587 config.participant_public_keys.clone(),
588 );
589
590 let result = protocol
591 .run(&message_hash)
592 .await
593 .map_err(|e| StrategyError::P2PProtocol(format!("{:?}", e)))?;
594
595 blueprint_core::info!(
596 target: "aggregation-strategy",
597 "P2P aggregation complete: {} contributors",
598 result.contributors.len()
599 );
600
601 let sig_bytes = result.signature.to_bytes();
604
605 let mut signer_bitmap = alloy_primitives::U256::ZERO;
607 let mut non_signer_indices = Vec::new();
608
609 let sorted_peers: Vec<_> = config.participant_public_keys.keys().cloned().collect();
612 for (idx, peer_id) in sorted_peers.iter().enumerate() {
613 if result.contributors.contains(peer_id) {
614 signer_bitmap |= alloy_primitives::U256::from(1u64) << idx;
615 } else {
616 non_signer_indices.push(idx as u32);
617 }
618 }
619
620 let signer_pubkeys: Vec<_> = sorted_peers
622 .iter()
623 .filter(|p| result.contributors.contains(p))
624 .filter_map(|p| config.participant_public_keys.get(p).cloned())
625 .collect();
626
627 let aggregated_pubkey = if signer_pubkeys.len() == 1 {
628 signer_pubkeys[0].to_bytes()
629 } else {
630 use blueprint_crypto::aggregation::AggregatableSignature;
632 let dummy_sigs: Vec<_> = (0..signer_pubkeys.len())
633 .map(|_| result.signature.clone())
634 .collect();
635 let (_, agg_pk) = ArkBlsBn254::aggregate(&dummy_sigs, &signer_pubkeys)
636 .map_err(|e| StrategyError::Bls(format!("Failed to aggregate pubkeys: {:?}", e)))?;
637 agg_pk.to_bytes()
638 };
639
640 Ok(AggregatedSignatureResult {
641 service_id,
642 call_id,
643 output,
644 aggregated_signature: sig_bytes,
645 aggregated_pubkey,
646 signer_bitmap,
647 non_signer_indices,
648 })
649}
650
651#[cfg(test)]
652mod tests {
653 use super::*;
654
655 #[test]
656 fn test_aggregated_signature_result_debug() {
657 let result = AggregatedSignatureResult {
658 service_id: 1,
659 call_id: 42,
660 output: Bytes::from(vec![1, 2, 3]),
661 aggregated_signature: vec![4, 5, 6],
662 aggregated_pubkey: vec![7, 8, 9],
663 signer_bitmap: alloy_primitives::U256::from(7u64),
664 non_signer_indices: vec![3],
665 };
666
667 let debug_str = format!("{:?}", result);
669 assert!(debug_str.contains("service_id: 1"));
670 assert!(debug_str.contains("call_id: 42"));
671 }
672}