// blueprint_tangle_extra/strategy.rs

//! Aggregation Strategy for BLS Signature Aggregation
//!
//! This module provides a strategy pattern for choosing between different
//! signature aggregation methods:
//! - HTTP Service: Uses a centralized aggregation service (simpler deployment)
//! - P2P Gossip: Uses a peer-to-peer gossip protocol (fully decentralized)
//!
//! ## Threshold Types
//!
//! The aggregation system supports two threshold types matching the on-chain configuration:
//! - `CountBased`: Threshold is based on the number of operators (e.g., 67% of operators must sign)
//! - `StakeWeighted`: Threshold is based on operator stake exposure (e.g., 67% of total stake must sign)
//!
//! ## Usage
//!
//! ```rust,ignore
//! use blueprint_tangle_extra::strategy::{AggregationStrategy, HttpServiceConfig};
//! #[cfg(feature = "p2p-aggregation")]
//! use blueprint_tangle_extra::strategy::P2PGossipConfig;
//!
//! // Use HTTP service (recommended for most cases)
//! let strategy = AggregationStrategy::HttpService(HttpServiceConfig::new(
//!     "http://localhost:8080",
//!     bls_secret,
//!     operator_index,
//! ));
//!
//! // Or use P2P gossip (for fully decentralized setups)
//! #[cfg(feature = "p2p-aggregation")]
//! let strategy = AggregationStrategy::P2PGossip(P2PGossipConfig::new(
//!     network_handle,
//!     participant_public_keys,
//! ));
//! ```
34
35use alloy_primitives::Bytes;
36use blueprint_std::string::String;
37#[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
38use blueprint_std::time::Duration;
39use blueprint_std::vec::Vec;
40
/// How the signing threshold for BLS aggregation is measured.
///
/// Mirrors the on-chain `Types.ThresholdType` enum.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ThresholdType {
    /// The threshold is a percentage of the operator count; every operator
    /// carries a weight of 1.
    #[default]
    CountBased,
    /// The threshold is a percentage of the total stake/exposure; operators
    /// are weighted by their stake exposure.
    StakeWeighted,
}

impl From<u8> for ThresholdType {
    /// Decodes the raw discriminant: `1` selects [`ThresholdType::StakeWeighted`],
    /// and every other value falls back to [`ThresholdType::CountBased`].
    fn from(value: u8) -> Self {
        if value == 1 {
            Self::StakeWeighted
        } else {
            Self::CountBased
        }
    }
}
59
60#[cfg(feature = "aggregation")]
61use blueprint_std::sync::Arc;
62
63#[cfg(feature = "aggregation")]
64use blueprint_crypto_bn254::{ArkBlsBn254, ArkBlsBn254Public, ArkBlsBn254Secret};
65
66#[cfg(all(feature = "p2p-aggregation", not(feature = "aggregation")))]
67use blueprint_crypto_bn254::{ArkBlsBn254, ArkBlsBn254Public};
68
69#[cfg(feature = "p2p-aggregation")]
70use blueprint_std::collections::HashMap;
71
/// Selects the mechanism used to aggregate BLS signatures.
///
/// Which variants exist depends on the enabled cargo features:
/// - `HttpService` (feature `aggregation`): centralized aggregation service (recommended)
/// - `P2PGossip` (feature `p2p-aggregation`): peer-to-peer gossip protocol (advanced)
#[derive(Clone)]
pub enum AggregationStrategy {
    /// Aggregate through an HTTP aggregation service (recommended).
    ///
    /// Simpler to deploy and more reliable. Because any operator can run the
    /// aggregation service, this setup is semi-decentralized.
    #[cfg(feature = "aggregation")]
    HttpService(HttpServiceConfig),

    /// Aggregate through the peer-to-peer gossip protocol.
    ///
    /// Fully decentralized, but operators need P2P connectivity to each other,
    /// which makes it more complex to set up and debug.
    #[cfg(feature = "p2p-aggregation")]
    P2PGossip(P2PGossipConfig),
}
93
/// Settings for aggregating through the HTTP aggregation service.
#[cfg(feature = "aggregation")]
#[derive(Clone)]
pub struct HttpServiceConfig {
    /// Client used to talk to the aggregation service.
    pub client: blueprint_tangle_aggregation_svc::AggregationServiceClient,
    /// BLS secret key used to sign job outputs.
    pub bls_secret: Arc<ArkBlsBn254Secret>,
    /// BLS public key matching `bls_secret`.
    pub bls_public: Arc<ArkBlsBn254Public>,
    /// Index of this operator within the service.
    pub operator_index: u32,
    /// When `true`, aggregation waits until the threshold is met before returning.
    pub wait_for_threshold: bool,
    /// Maximum time to wait for the threshold.
    pub threshold_timeout: Duration,
    /// Delay between polls while waiting for the threshold.
    pub poll_interval: Duration,
}
113
#[cfg(feature = "aggregation")]
impl HttpServiceConfig {
    /// Builds a config for the aggregation service at `service_url`.
    ///
    /// The public key is derived from `bls_secret`. Defaults: no waiting for
    /// the threshold, a 60 second threshold timeout and a 1 second poll interval.
    pub fn new(
        service_url: impl Into<String>,
        bls_secret: ArkBlsBn254Secret,
        operator_index: u32,
    ) -> Self {
        use blueprint_crypto_core::KeyType;
        use blueprint_tangle_aggregation_svc::AggregationServiceClient;

        let derived_public = ArkBlsBn254::public_from_secret(&bls_secret);
        let client = AggregationServiceClient::new(service_url);

        Self {
            client,
            bls_secret: Arc::new(bls_secret),
            bls_public: Arc::new(derived_public),
            operator_index,
            wait_for_threshold: false,
            threshold_timeout: Duration::from_secs(60),
            poll_interval: Duration::from_secs(1),
        }
    }

    /// Returns the config with `wait_for_threshold` replaced by `wait`.
    #[must_use]
    pub fn with_wait_for_threshold(self, wait: bool) -> Self {
        Self {
            wait_for_threshold: wait,
            ..self
        }
    }

    /// Returns the config with the threshold wait timeout replaced by `timeout`.
    #[must_use]
    pub fn with_threshold_timeout(self, timeout: Duration) -> Self {
        Self {
            threshold_timeout: timeout,
            ..self
        }
    }

    /// Returns the config with the threshold poll interval replaced by `interval`.
    #[must_use]
    pub fn with_poll_interval(self, interval: Duration) -> Self {
        Self {
            poll_interval: interval,
            ..self
        }
    }
}
157
/// Settings for aggregating through the P2P gossip protocol.
#[cfg(feature = "p2p-aggregation")]
#[derive(Clone)]
pub struct P2PGossipConfig {
    /// Handle to the network service used for P2P communication.
    pub network_handle: blueprint_networking::service_handle::NetworkServiceHandle<ArkBlsBn254>,
    /// How many aggregators to select.
    pub num_aggregators: u16,
    /// Timeout applied to the aggregation protocol.
    pub timeout: Duration,
    /// Threshold in basis points (e.g., 6700 for 67%), the same format as the
    /// on-chain `thresholdBps`.
    pub threshold_bps: u16,
    /// Whether the threshold is `CountBased` or `StakeWeighted`.
    pub threshold_type: ThresholdType,
    /// Public key of every participant, keyed by peer ID.
    pub participant_public_keys: HashMap<libp2p::PeerId, ArkBlsBn254Public>,
    /// Stake weight (exposure) of every participant, keyed by peer ID.
    ///
    /// Values are in basis points (10000 = 100%) and are only consulted when
    /// `threshold_type` is `StakeWeighted`.
    pub operator_weights: HashMap<libp2p::PeerId, u64>,
}
180
#[cfg(feature = "p2p-aggregation")]
impl P2PGossipConfig {
    /// Builds a config with the defaults: 2 aggregators, a 30 second timeout,
    /// a count-based threshold of 6700 bps (67%) and no operator weights.
    pub fn new(
        network_handle: blueprint_networking::service_handle::NetworkServiceHandle<ArkBlsBn254>,
        participant_public_keys: HashMap<libp2p::PeerId, ArkBlsBn254Public>,
    ) -> Self {
        let default_timeout = Duration::from_secs(30);
        // 67% expressed in basis points.
        let default_threshold_bps = 6700;

        Self {
            network_handle,
            num_aggregators: 2,
            timeout: default_timeout,
            threshold_bps: default_threshold_bps,
            threshold_type: ThresholdType::CountBased,
            participant_public_keys,
            operator_weights: HashMap::new(),
        }
    }

    /// Returns the config with the number of aggregators replaced by `num`.
    #[must_use]
    pub fn with_num_aggregators(self, num: u16) -> Self {
        Self {
            num_aggregators: num,
            ..self
        }
    }

    /// Returns the config with the protocol timeout replaced by `timeout`.
    #[must_use]
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Returns the config with the threshold set from a whole percentage.
    ///
    /// Convenience wrapper that stores `percentage * 100` as basis points.
    #[must_use]
    pub fn with_threshold_percentage(self, percentage: u8) -> Self {
        Self {
            threshold_bps: u16::from(percentage) * 100,
            ..self
        }
    }

    /// Returns the config with the threshold set in basis points
    /// (e.g., 6700 for 67%), i.e. the on-chain `thresholdBps` format.
    #[must_use]
    pub fn with_threshold_bps(self, bps: u16) -> Self {
        Self {
            threshold_bps: bps,
            ..self
        }
    }

    /// Returns the config with the threshold type (`CountBased` or
    /// `StakeWeighted`) replaced by `threshold_type`.
    #[must_use]
    pub fn with_threshold_type(self, threshold_type: ThresholdType) -> Self {
        Self {
            threshold_type,
            ..self
        }
    }

    /// Returns the config with the operator weights replaced by `weights`.
    ///
    /// Each weight should be the operator's exposure in basis points
    /// (from `ServiceOperator.exposureBps`). The weights are only used when
    /// `threshold_type` is `StakeWeighted`.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// // Operator 1 has 5000 bps (50%) exposure, operator 2 has 3000 bps (30%)
    /// let weights = HashMap::from([
    ///     (peer_id_1, 5000),
    ///     (peer_id_2, 3000),
    /// ]);
    /// config.with_operator_weights(weights)
    /// ```
    #[must_use]
    pub fn with_operator_weights(self, weights: HashMap<libp2p::PeerId, u64>) -> Self {
        Self {
            operator_weights: weights,
            ..self
        }
    }

    /// Switches the config to a stake-weighted threshold in one step.
    ///
    /// Sets the threshold (in basis points), the `StakeWeighted` threshold
    /// type and the operator weights together.
    #[must_use]
    pub fn with_stake_weighted_threshold(
        self,
        threshold_bps: u16,
        weights: HashMap<libp2p::PeerId, u64>,
    ) -> Self {
        Self {
            threshold_bps,
            threshold_type: ThresholdType::StakeWeighted,
            operator_weights: weights,
            ..self
        }
    }
}
271
272/// Result of a successful aggregation
273#[derive(Debug, Clone)]
274pub struct AggregatedSignatureResult {
275    /// The service ID
276    pub service_id: u64,
277    /// The call ID
278    pub call_id: u64,
279    /// The job output
280    pub output: Bytes,
281    /// Aggregated BLS signature (G1 point, serialized)
282    pub aggregated_signature: Vec<u8>,
283    /// Aggregated BLS public key (G2 point, serialized)
284    pub aggregated_pubkey: Vec<u8>,
285    /// Bitmap indicating which operators signed
286    pub signer_bitmap: alloy_primitives::U256,
287    /// Indices of operators who did not sign
288    pub non_signer_indices: Vec<u32>,
289}
290
291/// Error type for aggregation strategies
292#[derive(Debug, thiserror::Error)]
293pub enum StrategyError {
294    /// HTTP service error
295    #[cfg(feature = "aggregation")]
296    #[error("HTTP service error: {0}")]
297    HttpService(#[from] blueprint_tangle_aggregation_svc::ClientError),
298
299    /// P2P protocol error
300    #[cfg(feature = "p2p-aggregation")]
301    #[error("P2P protocol error: {0}")]
302    P2PProtocol(String),
303
304    /// BLS crypto error
305    #[error("BLS error: {0}")]
306    Bls(String),
307
308    /// No aggregation strategy configured
309    #[error(
310        "No aggregation strategy configured - enable 'aggregation' or 'p2p-aggregation' feature"
311    )]
312    NoAggregationStrategy,
313
314    /// Threshold not met
315    #[error("Threshold not met: got {got}, need {need}")]
316    ThresholdNotMet { got: usize, need: usize },
317
318    /// Timeout
319    #[error("Aggregation timed out")]
320    Timeout,
321
322    /// Serialization error
323    #[error("Serialization error: {0}")]
324    Serialization(String),
325}
326
#[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
impl AggregationStrategy {
    /// Runs the configured aggregation strategy for one job result.
    ///
    /// Dispatches to the HTTP or P2P implementation, which signs the output,
    /// coordinates with the other operators and returns the aggregated result
    /// ready for on-chain submission.
    ///
    /// # Arguments
    ///
    /// * `service_id` - The service ID from the job system
    /// * `call_id` - The call ID for this specific job invocation
    /// * `output` - The job output to sign
    /// * `total_operators` - Total number of operators in the service
    /// * `threshold` - Minimum number of signatures required (from job system)
    ///
    /// # Errors
    ///
    /// Propagates the [`StrategyError`] returned by the selected implementation.
    pub async fn aggregate(
        &self,
        service_id: u64,
        call_id: u64,
        output: Bytes,
        total_operators: u32,
        threshold: u32,
    ) -> Result<AggregatedSignatureResult, StrategyError> {
        match self {
            #[cfg(feature = "aggregation")]
            Self::HttpService(http_config) => {
                let aggregation = aggregate_via_http(
                    http_config,
                    service_id,
                    call_id,
                    output,
                    total_operators,
                    threshold,
                );
                aggregation.await
            }
            #[cfg(feature = "p2p-aggregation")]
            Self::P2PGossip(gossip_config) => {
                // The P2P path takes its configuration by value.
                let owned_config = gossip_config.clone();
                let aggregation = aggregate_via_p2p(
                    owned_config,
                    service_id,
                    call_id,
                    output,
                    total_operators,
                    threshold,
                );
                aggregation.await
            }
            // Kept so the match stays well-formed for every feature combination.
            #[allow(unreachable_patterns)]
            _ => Err(StrategyError::NoAggregationStrategy),
        }
    }
}
379
380#[cfg(not(any(feature = "aggregation", feature = "p2p-aggregation")))]
381impl AggregationStrategy {
382    pub async fn aggregate(
383        &self,
384        _service_id: u64,
385        _call_id: u64,
386        _output: Bytes,
387        _total_operators: u32,
388        _threshold: u32,
389    ) -> Result<AggregatedSignatureResult, StrategyError> {
390        Err(StrategyError::NoAggregationStrategy)
391    }
392}
393
/// Aggregate via the HTTP aggregation service.
///
/// Signs `output` with the configured BLS key, makes a best-effort attempt to
/// initialize the task on the service, submits the signature and then fetches
/// the aggregated result.
///
/// # Arguments
///
/// * `config` - Client, keys and waiting behavior for the HTTP service
/// * `service_id` / `call_id` - Identify the job invocation being signed
/// * `output` - The job output to sign
/// * `total_operators` - Operator count passed to the service on task init
/// * `threshold` - Required signer count; values below 1 are raised to 1
///
/// # Errors
///
/// * [`StrategyError::Bls`] if signing fails, or if the service reports the
///   threshold as met but has no aggregated result available
/// * [`StrategyError::HttpService`] if submitting, fetching or waiting fails
/// * [`StrategyError::ThresholdNotMet`] if the threshold is not met and
///   `config.wait_for_threshold` is `false`
#[cfg(feature = "aggregation")]
async fn aggregate_via_http(
    config: &HttpServiceConfig,
    service_id: u64,
    call_id: u64,
    output: Bytes,
    total_operators: u32,
    threshold: u32,
) -> Result<AggregatedSignatureResult, StrategyError> {
    use blueprint_crypto_core::{BytesEncoding, KeyType};
    use blueprint_tangle_aggregation_svc::{
        SubmitSignatureRequest, ThresholdConfig, create_signing_message,
    };

    blueprint_core::debug!(
        target: "aggregation-strategy",
        "Aggregating via HTTP service for service {} call {}",
        service_id, call_id
    );

    // Create the message to sign
    let message = create_signing_message(service_id, call_id, &output);

    // Signing needs a mutable secret, so work on a private copy of the shared key.
    let mut secret_clone = (*config.bls_secret).clone();
    let signature = ArkBlsBn254::sign_with_secret(&mut secret_clone, &message)
        .map_err(|e| StrategyError::Bls(e.to_string()))?;

    let pubkey_bytes = config.bls_public.to_bytes();
    let sig_bytes = signature.to_bytes();

    // Best-effort task initialization: another operator may already have
    // created the task, so a failure here is not fatal. Log it instead of
    // discarding it silently so real connectivity problems remain visible.
    let threshold_config = ThresholdConfig::Count {
        required_signers: threshold.max(1),
    };
    if let Err(err) = config
        .client
        .init_task(
            service_id,
            call_id,
            &output,
            total_operators,
            threshold_config,
        )
        .await
    {
        blueprint_core::debug!(
            target: "aggregation-strategy",
            "init_task for service {} call {} did not succeed (task may already exist): {}",
            service_id, call_id, err
        );
    }

    // Submit our signature
    let submit_request = SubmitSignatureRequest {
        service_id,
        call_id,
        operator_index: config.operator_index,
        output: output.to_vec(),
        signature: sig_bytes,
        public_key: pubkey_bytes,
    };

    let response = config.client.submit_signature(submit_request).await?;

    blueprint_core::info!(
        target: "aggregation-strategy",
        "Submitted signature: {}/{} (threshold met: {})",
        response.signatures_collected,
        response.threshold_required,
        response.threshold_met
    );

    let result = if response.threshold_met {
        // Threshold already reached: fetch the result regardless of the wait setting.
        config
            .client
            .get_aggregated(service_id, call_id)
            .await?
            .ok_or_else(|| StrategyError::Bls("Aggregated result not available".into()))?
    } else if config.wait_for_threshold {
        config
            .client
            .wait_for_threshold(
                service_id,
                call_id,
                config.poll_interval,
                config.threshold_timeout,
            )
            .await?
    } else {
        // Threshold not met and the caller did not ask us to wait.
        return Err(StrategyError::ThresholdNotMet {
            got: response.signatures_collected,
            need: response.threshold_required,
        });
    };

    Ok(AggregatedSignatureResult {
        service_id: result.service_id,
        call_id: result.call_id,
        output: Bytes::from(result.output),
        aggregated_signature: result.aggregated_signature,
        aggregated_pubkey: result.aggregated_pubkey,
        signer_bitmap: result.signer_bitmap,
        non_signer_indices: result.non_signer_indices,
    })
}
505
/// Converts a basis-point threshold into a whole percentage, capped at 100.
///
/// Fractions of a percent are truncated (6799 bps -> 67).
// Only called from the P2P path, so it is unused when that feature is off.
#[cfg_attr(not(feature = "p2p-aggregation"), allow(dead_code))]
fn threshold_percentage_from_bps(threshold_bps: u16) -> u8 {
    // Clamp before narrowing: a bare `as u8` wraps for values above 255,
    // e.g. 25600 bps would silently become a 0% threshold.
    (threshold_bps / 100).min(100) as u8
}

/// Computes the stake weight required by `threshold_bps` out of `total_weight`,
/// saturating at `u64::MAX` instead of overflowing.
// Only called from the P2P path, so it is unused when that feature is off.
#[cfg_attr(not(feature = "p2p-aggregation"), allow(dead_code))]
fn stake_threshold_weight(total_weight: u64, threshold_bps: u16) -> u64 {
    // Widen to u128 so `total_weight * threshold_bps` cannot overflow.
    let required = u128::from(total_weight) * u128::from(threshold_bps) / 10_000;
    // Lossless after the clamp to the u64 range.
    required.min(u128::from(u64::MAX)) as u64
}

/// Aggregate via the P2P gossip protocol.
///
/// Runs the signature aggregation protocol over the hash of the signing
/// message, then derives the signer bitmap, the non-signer indices and the
/// aggregated public key from the set of contributors.
///
/// Operator indices are assigned by sorting the participants by peer ID, so
/// the bitmap is deterministic for a given participant set.
/// NOTE(review): confirm this ordering matches the on-chain operator indices.
///
/// # Arguments
///
/// * `config` - Network handle, participants and threshold settings
/// * `service_id` / `call_id` - Identify the job invocation being signed
/// * `output` - The job output to sign
/// * `_total_operators`, `_threshold` - Unused; the participant set and the
///   threshold are taken from `config`
///
/// # Errors
///
/// * [`StrategyError::P2PProtocol`] if the gossip protocol fails
/// * [`StrategyError::Bls`] if the signers' public keys cannot be aggregated
#[cfg(feature = "p2p-aggregation")]
async fn aggregate_via_p2p(
    config: P2PGossipConfig,
    service_id: u64,
    call_id: u64,
    output: Bytes,
    _total_operators: u32,
    _threshold: u32,
) -> Result<AggregatedSignatureResult, StrategyError> {
    use blueprint_crypto::aggregation::AggregatableSignature;
    use blueprint_crypto::hashing::blake3_256;
    use blueprint_crypto_core::BytesEncoding;
    use blueprint_networking_agg_sig_gossip_extension::{
        DynamicWeight, ProtocolConfig, SignatureAggregationProtocol,
    };

    blueprint_core::debug!(
        target: "aggregation-strategy",
        "Aggregating via P2P gossip for service {} call {} (threshold_bps: {}, type: {:?})",
        service_id, call_id, config.threshold_bps, config.threshold_type
    );

    // Create the message to sign (same format as HTTP)
    let message = crate::aggregating_consumer::integration::create_signing_message(
        service_id, call_id, &output,
    );

    // Hash the message for the protocol
    let message_hash = blake3_256(&message);

    let protocol_config = ProtocolConfig::new(
        config.network_handle.clone(),
        config.num_aggregators,
        config.timeout,
    );

    let num_participants = config.participant_public_keys.len();
    let threshold_percentage = threshold_percentage_from_bps(config.threshold_bps);

    // Create weight scheme based on threshold type
    // This matches the on-chain logic in Jobs.sol _validateSignersAndThreshold
    let weight_scheme = match config.threshold_type {
        ThresholdType::CountBased => {
            // Each operator has weight 1; threshold is a percentage of operator count.
            blueprint_core::debug!(
                target: "aggregation-strategy",
                "Using CountBased threshold: {}% of {} participants",
                threshold_percentage, num_participants
            );
            DynamicWeight::equal(num_participants, threshold_percentage)
        }
        ThresholdType::StakeWeighted if config.operator_weights.is_empty() => {
            blueprint_core::warn!(
                target: "aggregation-strategy",
                "StakeWeighted threshold requested but no operator weights provided, falling back to EqualWeight"
            );
            DynamicWeight::equal(num_participants, threshold_percentage)
        }
        ThresholdType::StakeWeighted => {
            // Operators are weighted by their stake exposure (exposureBps):
            // threshold_weight = (total_weight * threshold_bps) / 10000
            let total_weight = config
                .operator_weights
                .values()
                .fold(0u64, |acc, weight| acc.saturating_add(*weight));
            let threshold_weight = stake_threshold_weight(total_weight, config.threshold_bps);

            blueprint_core::debug!(
                target: "aggregation-strategy",
                "Using StakeWeighted threshold: {} / {} total weight ({}bps)",
                threshold_weight, total_weight, config.threshold_bps
            );

            DynamicWeight::custom(config.operator_weights.clone(), threshold_weight)
        }
    };

    // Create and run the protocol
    let mut protocol = SignatureAggregationProtocol::new(
        protocol_config,
        weight_scheme,
        config.participant_public_keys.clone(),
    );

    let result = protocol
        .run(&message_hash)
        .await
        .map_err(|e| StrategyError::P2PProtocol(format!("{e:?}")))?;

    blueprint_core::info!(
        target: "aggregation-strategy",
        "P2P aggregation complete: {} contributors",
        result.contributors.len()
    );

    // The P2P protocol returns an aggregated signature; serialize it.
    let sig_bytes = result.signature.to_bytes();

    // Map peer IDs to operator indices through their sorted order. HashMap
    // iteration order is arbitrary, so the entries must be sorted explicitly
    // for the bitmap to be stable across runs and across operators.
    let mut participants: Vec<_> = config.participant_public_keys.iter().collect();
    participants.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));

    let mut signer_bitmap = alloy_primitives::U256::ZERO;
    let mut non_signer_indices = Vec::new();
    let mut signer_pubkeys = Vec::new();
    for (idx, (peer_id, public_key)) in participants.into_iter().enumerate() {
        if result.contributors.contains(peer_id) {
            signer_bitmap |= alloy_primitives::U256::from(1u64) << idx;
            signer_pubkeys.push(public_key.clone());
        } else {
            non_signer_indices.push(idx as u32);
        }
    }

    let aggregated_pubkey = if let [only_signer] = signer_pubkeys.as_slice() {
        only_signer.to_bytes()
    } else {
        // `aggregate` requires one signature per public key; only the
        // aggregated key is used, so the signature slots are placeholders.
        let dummy_sigs: Vec<_> = (0..signer_pubkeys.len())
            .map(|_| result.signature.clone())
            .collect();
        let (_, agg_pk) = ArkBlsBn254::aggregate(&dummy_sigs, &signer_pubkeys)
            .map_err(|e| StrategyError::Bls(format!("Failed to aggregate pubkeys: {e:?}")))?;
        agg_pk.to_bytes()
    };

    Ok(AggregatedSignatureResult {
        service_id,
        call_id,
        output,
        aggregated_signature: sig_bytes,
        aggregated_pubkey,
        signer_bitmap,
        non_signer_indices,
    })
}
650
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Debug` output of `AggregatedSignatureResult` includes its IDs.
    #[test]
    fn test_aggregated_signature_result_debug() {
        let sample = AggregatedSignatureResult {
            service_id: 1,
            call_id: 42,
            output: Bytes::from(vec![1, 2, 3]),
            aggregated_signature: vec![4, 5, 6],
            aggregated_pubkey: vec![7, 8, 9],
            signer_bitmap: alloy_primitives::U256::from(7u64),
            non_signer_indices: vec![3],
        };

        let rendered = format!("{sample:?}");
        for expected in ["service_id: 1", "call_id: 42"] {
            assert!(rendered.contains(expected));
        }
    }
}