Skip to main content

blueprint_tangle_extra/
aggregating_consumer.rs

1//! Aggregation-Aware Job Consumer
2//!
3//! This consumer automatically detects whether a job requires BLS aggregation
4//! and routes to the appropriate submission path:
5//! - Jobs NOT requiring aggregation: Submit directly via `submitResult`
6//! - Jobs requiring aggregation: Coordinate with other operators via the
7//!   aggregation service, and submit via `submitAggregatedResult`
8//!
9//! ## Usage
10//!
11//! ```rust,ignore
12//! use blueprint_tangle_extra::AggregatingConsumer;
13//!
14//! // Create the consumer with aggregation support
15//! let consumer = AggregatingConsumer::new(client)
16//!     .with_aggregation_service("http://localhost:8080", bls_keypair, operator_index);
17//!
18//! // Use it just like TangleConsumer - it automatically handles aggregation!
19//! consumer.send(job_result).await?;
20//! ```
21
22use crate::aggregation::AggregationError;
23use crate::extract;
24use alloy_primitives::{Address, Bytes};
25use blueprint_client_tangle::{AggregationConfig, OperatorMetadata, TangleClient, ThresholdType};
26use blueprint_core::JobResult;
27use blueprint_core::error::BoxError;
28use blueprint_std::boxed::Box;
29use blueprint_std::collections::{HashMap, VecDeque};
30use blueprint_std::format;
31use blueprint_std::string::{String, ToString};
32use blueprint_std::sync::{Arc, Mutex};
33#[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
34use blueprint_std::time::Duration;
35use blueprint_std::vec::Vec;
36#[cfg(feature = "aggregation")]
37use blueprint_tangle_aggregation_svc::{OperatorStake, ThresholdConfig};
38use core::pin::Pin;
39use core::task::{Context, Poll};
40use futures_util::Sink;
41
42/// Error type for the aggregating consumer
43#[derive(Debug, thiserror::Error)]
44pub enum AggregatingConsumerError {
45    /// Client error
46    #[error("Client error: {0}")]
47    Client(String),
48    /// Missing metadata
49    #[error("Missing metadata: {0}")]
50    MissingMetadata(&'static str),
51    /// Invalid metadata
52    #[error("Invalid metadata: {0}")]
53    InvalidMetadata(&'static str),
54    /// Transaction error
55    #[error("Transaction error: {0}")]
56    Transaction(String),
57    /// Aggregation error
58    #[error("Aggregation error: {0}")]
59    Aggregation(#[from] AggregationError),
60    /// Aggregation service error
61    #[cfg(feature = "aggregation")]
62    #[error("Aggregation service error: {0}")]
63    AggregationService(#[from] blueprint_tangle_aggregation_svc::ClientError),
64    /// BLS crypto error
65    #[cfg(feature = "aggregation")]
66    #[error("BLS error: {0}")]
67    Bls(String),
68    /// Aggregation not configured
69    #[error("Aggregation required but not configured. Call with_aggregation_service() first.")]
70    AggregationNotConfigured,
71}
72
73/// Job result with parsed metadata for submission
74struct PendingJobResult {
75    service_id: u64,
76    call_id: u64,
77    job_index: u8,
78    output: Bytes,
79}
80
81enum State {
82    WaitingForResult,
83    ProcessingSubmission(
84        Pin<Box<dyn core::future::Future<Output = Result<(), AggregatingConsumerError>> + Send>>,
85    ),
86}
87
88impl State {
89    fn is_waiting(&self) -> bool {
90        matches!(self, State::WaitingForResult)
91    }
92}
93
/// Settings for talking to one or more HTTP aggregation services.
#[cfg(feature = "aggregation")]
#[derive(Clone)]
pub struct AggregationServiceConfig {
    /// One client per aggregation service; several may be configured for redundancy.
    pub clients: Vec<blueprint_tangle_aggregation_svc::AggregationServiceClient>,
    /// The operator's BLS secret key, used for signing.
    pub bls_secret: Arc<blueprint_crypto_bn254::ArkBlsBn254Secret>,
    /// The BLS public key derived from `bls_secret`.
    pub bls_public: Arc<blueprint_crypto_bn254::ArkBlsBn254Public>,
    /// This operator's index within the service.
    pub operator_index: u32,
    /// Whether to wait until the threshold is met before returning
    /// (the constructors set this to `false`).
    pub wait_for_threshold: bool,
    /// Upper bound on how long to wait for the threshold (constructors use 60s).
    pub threshold_timeout: Duration,
    /// Interval between polls while waiting for the threshold (constructors use 1s).
    pub poll_interval: Duration,
    /// Whether this operator should try to submit the aggregated result on chain
    /// (constructors use `true`). When enabled, all operators race to submit and
    /// the first valid submission wins.
    pub submit_to_chain: bool,
}
116
#[cfg(feature = "aggregation")]
impl AggregationServiceConfig {
    /// Shared constructor: derives the public key from `bls_secret` and fills in
    /// the default settings (no threshold wait, 60s timeout, 1s poll interval,
    /// submit to chain).
    fn from_clients(
        clients: Vec<blueprint_tangle_aggregation_svc::AggregationServiceClient>,
        bls_secret: blueprint_crypto_bn254::ArkBlsBn254Secret,
        operator_index: u32,
    ) -> Self {
        use blueprint_crypto_bn254::ArkBlsBn254;
        use blueprint_crypto_core::KeyType;

        let bls_public = ArkBlsBn254::public_from_secret(&bls_secret);
        Self {
            clients,
            bls_secret: Arc::new(bls_secret),
            bls_public: Arc::new(bls_public),
            operator_index,
            wait_for_threshold: false,
            threshold_timeout: Duration::from_secs(60),
            poll_interval: Duration::from_secs(1),
            submit_to_chain: true, // Everyone tries to submit by default
        }
    }

    /// Create a new aggregation service config with a single service URL
    pub fn new(
        service_url: impl Into<String>,
        bls_secret: blueprint_crypto_bn254::ArkBlsBn254Secret,
        operator_index: u32,
    ) -> Self {
        Self::from_clients(
            vec![
                blueprint_tangle_aggregation_svc::AggregationServiceClient::new(service_url),
            ],
            bls_secret,
            operator_index,
        )
    }

    /// Create a new aggregation service config with multiple service URLs
    ///
    /// This allows submitting to multiple aggregation services for redundancy.
    /// Signatures will be sent to ALL services, and threshold polling will
    /// try each service until one succeeds.
    ///
    /// Note that an empty `service_urls` yields a config without any client;
    /// [`Self::client`] panics on such a config.
    pub fn with_multiple_services(
        service_urls: impl IntoIterator<Item = impl Into<String>>,
        bls_secret: blueprint_crypto_bn254::ArkBlsBn254Secret,
        operator_index: u32,
    ) -> Self {
        let clients = service_urls
            .into_iter()
            .map(|url| blueprint_tangle_aggregation_svc::AggregationServiceClient::new(url))
            .collect();

        Self::from_clients(clients, bls_secret, operator_index)
    }

    /// Add an additional aggregation service URL
    pub fn add_service(mut self, service_url: impl Into<String>) -> Self {
        self.clients
            .push(blueprint_tangle_aggregation_svc::AggregationServiceClient::new(service_url));
        self
    }

    /// Set whether to wait for threshold to be met
    pub fn with_wait_for_threshold(mut self, wait: bool) -> Self {
        self.wait_for_threshold = wait;
        self
    }

    /// Set the timeout for waiting for threshold
    pub fn with_threshold_timeout(mut self, timeout: Duration) -> Self {
        self.threshold_timeout = timeout;
        self
    }

    /// Set the poll interval used while waiting for threshold
    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval;
        self
    }

    /// Set whether to submit the aggregated result to chain
    ///
    /// When true (default), this operator will attempt to submit the aggregated
    /// result to chain once threshold is met. Multiple operators can race to submit;
    /// the contract ensures only the first valid submission succeeds.
    pub fn with_submit_to_chain(mut self, submit: bool) -> Self {
        self.submit_to_chain = submit;
        self
    }

    /// Get the first client (for backwards compatibility)
    ///
    /// # Panics
    ///
    /// Panics if no client is configured, which can only happen when the config
    /// was built from an empty URL list or `clients` was cleared manually.
    pub fn client(&self) -> &blueprint_tangle_aggregation_svc::AggregationServiceClient {
        self.clients
            .first()
            .expect("At least one client must be configured")
    }

    /// Turn an operator RPC endpoint into an aggregation service URL.
    ///
    /// * If `aggregation_path` starts with `:` it is treated as a port: the
    ///   result is the scheme, optional userinfo and host of `rpc` followed by
    ///   `aggregation_path`. Any existing port, path, query or fragment of `rpc`
    ///   is dropped.
    /// * Otherwise `aggregation_path` is appended to `rpc` with trailing `/`
    ///   characters removed from `rpc` first.
    fn aggregation_url(rpc: &str, aggregation_path: &str) -> String {
        if !aggregation_path.starts_with(':') {
            let base = rpc.trim_end_matches('/');
            return format!("{}{}", base, aggregation_path);
        }

        // Locate the authority component: everything after "scheme://" (or the
        // start of the string when there is no scheme) up to the first path,
        // query or fragment delimiter.
        let authority_start = rpc.find("://").map_or(0, |idx| idx + 3);
        let authority_end = rpc[authority_start..]
            .find(|c: char| matches!(c, '/' | '?' | '#'))
            .map_or(rpc.len(), |idx| authority_start + idx);
        let authority = &rpc[authority_start..authority_end];

        // Skip "user:pass@" so a colon in the userinfo is not mistaken for the
        // port separator.
        let host_start = authority.rfind('@').map_or(0, |idx| idx + 1);
        let host_port = &authority[host_start..];

        // Bracketed IPv6 literals contain colons of their own; the host ends at
        // the closing bracket. Otherwise the last colon separates host and port.
        let host_len = if host_port.starts_with('[') {
            host_port.find(']').map_or(host_port.len(), |idx| idx + 1)
        } else {
            host_port.rfind(':').unwrap_or(host_port.len())
        };

        let host_end = authority_start + host_start + host_len;
        format!("{}{}", &rpc[..host_end], aggregation_path)
    }

    /// Discover aggregation service URLs by reading operator metadata on chain.
    ///
    /// This queries the Tangle contract for registered operators, fetches their
    /// metadata (ECDSA key + RPC endpoint), and converts those RPC endpoints into
    /// aggregation service URLs using the provided path suffix. Operators with an
    /// empty RPC endpoint are skipped.
    ///
    /// # Arguments
    /// * `client` - Tangle EVM client used to query on-chain metadata
    /// * `blueprint_id` - The blueprint ID to query operators for
    /// * `service_id` - Service whose operator set should be scanned
    /// * `aggregation_path` - Path suffix for aggregation service (e.g., "/aggregation" or ":8080")
    ///
    /// # Errors
    ///
    /// Returns [`AggregatingConsumerError::Client`] if listing the service
    /// operators or fetching any operator's metadata fails.
    ///
    /// # Example
    /// ```rust,ignore
    /// // Discover operators and add their aggregation services
    /// let urls = AggregationServiceConfig::discover_operator_services(
    ///     &client,
    ///     blueprint_id,
    ///     service_id,
    ///     ":9090",  // Aggregation service port
    /// ).await?;
    ///
    /// let config = AggregationServiceConfig::with_multiple_services(
    ///     urls,
    ///     bls_secret,
    ///     operator_index,
    /// );
    /// ```
    pub async fn discover_operator_services(
        client: &TangleClient,
        blueprint_id: u64,
        service_id: u64,
        aggregation_path: &str,
    ) -> Result<Vec<String>, AggregatingConsumerError> {
        let operators = client
            .get_service_operators(service_id)
            .await
            .map_err(|e| AggregatingConsumerError::Client(e.to_string()))?;

        let mut urls: Vec<String> = Vec::with_capacity(operators.len());
        for operator in operators {
            let metadata = client
                .get_operator_metadata(blueprint_id, operator)
                .await
                .map_err(|e| AggregatingConsumerError::Client(e.to_string()))?;
            if metadata.rpc_endpoint.is_empty() {
                continue;
            }
            urls.push(Self::aggregation_url(
                &metadata.rpc_endpoint,
                aggregation_path,
            ));
        }

        blueprint_core::info!(
            target: "tangle-aggregating-consumer",
            "Discovered {} operator aggregation services for blueprint {}",
            urls.len(),
            blueprint_id
        );

        Ok(urls)
    }
}
298
299/// An aggregation-aware consumer that automatically routes jobs to either
300/// direct submission or aggregated submission based on BSM configuration.
301///
302/// For jobs that require aggregation, this consumer:
303/// 1. Queries the BSM to check aggregation requirements
304/// 2. Signs the job output with the operator's BLS key
305/// 3. Coordinates aggregation via the configured strategy (HTTP or P2P)
306/// 4. Submits the aggregated result to the contract
307///
308/// For jobs that don't require aggregation, it behaves identically to `TangleConsumer`.
309///
310/// ## Aggregation Strategies
311///
312/// The consumer supports two aggregation strategies:
313///
314/// - **HTTP Service** (recommended): Uses a centralized aggregation service
315/// - **P2P Gossip**: Uses peer-to-peer gossip protocol for fully decentralized aggregation
316///
317/// ## Example
318///
319/// ```rust,ignore
320/// use blueprint_tangle_extra::{AggregatingConsumer, AggregationStrategy, HttpServiceConfig};
321///
322/// // Create consumer with HTTP aggregation strategy
323/// let consumer = AggregatingConsumer::new(client)
324///     .with_aggregation_strategy(AggregationStrategy::HttpService(
325///         HttpServiceConfig::new("http://localhost:8080", bls_secret, operator_index)
326///     ));
327/// ```
328pub struct AggregatingConsumer {
329    client: Arc<TangleClient>,
330    buffer: Mutex<VecDeque<PendingJobResult>>,
331    state: Mutex<State>,
332    /// Shared cache for service configs (aggregation configs, operator weights)
333    cache: crate::cache::SharedServiceConfigCache,
334    /// Aggregation service configuration (legacy, when feature enabled)
335    #[cfg(feature = "aggregation")]
336    aggregation_config: Option<AggregationServiceConfig>,
337    /// Aggregation strategy (new unified approach)
338    #[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
339    aggregation_strategy: Option<crate::strategy::AggregationStrategy>,
340}
341
342impl AggregatingConsumer {
343    /// Create a new aggregating consumer with default cache (5 minute TTL)
344    pub fn new(client: TangleClient) -> Self {
345        Self {
346            client: Arc::new(client),
347            buffer: Mutex::new(VecDeque::new()),
348            state: Mutex::new(State::WaitingForResult),
349            cache: crate::cache::shared_cache(),
350            #[cfg(feature = "aggregation")]
351            aggregation_config: None,
352            #[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
353            aggregation_strategy: None,
354        }
355    }
356
357    /// Create a new aggregating consumer with a custom cache
358    ///
359    /// This allows sharing a cache across multiple consumers or
360    /// configuring a custom TTL.
361    ///
362    /// ## Example
363    ///
364    /// ```rust,ignore
365    /// use blueprint_tangle_extra::{AggregatingConsumer, shared_cache_with_ttl};
366    /// use blueprint_std::time::Duration;
367    ///
368    /// // Create a shared cache with 10 minute TTL
369    /// let cache = shared_cache_with_ttl(Duration::from_secs(600));
370    ///
371    /// // Multiple consumers can share the same cache
372    /// let consumer1 = AggregatingConsumer::with_cache(client1, cache.clone());
373    /// let consumer2 = AggregatingConsumer::with_cache(client2, cache.clone());
374    /// ```
375    pub fn with_cache(client: TangleClient, cache: crate::cache::SharedServiceConfigCache) -> Self {
376        Self {
377            client: Arc::new(client),
378            buffer: Mutex::new(VecDeque::new()),
379            state: Mutex::new(State::WaitingForResult),
380            cache,
381            #[cfg(feature = "aggregation")]
382            aggregation_config: None,
383            #[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
384            aggregation_strategy: None,
385        }
386    }
387
388    /// Configure the aggregation strategy
389    ///
390    /// This sets the strategy to use for BLS signature aggregation.
391    /// Choose between HTTP service (simpler) or P2P gossip (decentralized).
392    ///
393    /// ## Example
394    ///
395    /// ```rust,ignore
396    /// // HTTP service strategy (recommended)
397    /// let consumer = AggregatingConsumer::new(client)
398    ///     .with_aggregation_strategy(AggregationStrategy::HttpService(
399    ///         HttpServiceConfig::new("http://localhost:8080", bls_secret, 0)
400    ///     ));
401    ///
402    /// // P2P gossip strategy (advanced)
403    /// let consumer = AggregatingConsumer::new(client)
404    ///     .with_aggregation_strategy(AggregationStrategy::P2PGossip(
405    ///         P2PGossipConfig::new(network_handle, participant_keys)
406    ///     ));
407    /// ```
408    #[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
409    pub fn with_aggregation_strategy(
410        mut self,
411        strategy: crate::strategy::AggregationStrategy,
412    ) -> Self {
413        self.aggregation_strategy = Some(strategy);
414        self
415    }
416
417    /// Get the configured aggregation strategy
418    #[cfg(any(feature = "aggregation", feature = "p2p-aggregation"))]
419    pub fn aggregation_strategy(&self) -> Option<&crate::strategy::AggregationStrategy> {
420        self.aggregation_strategy.as_ref()
421    }
422
423    /// Configure the aggregation service for BLS signature aggregation
424    ///
425    /// This enables automatic signing and submission to the aggregation service
426    /// when jobs require BLS aggregation.
427    #[cfg(feature = "aggregation")]
428    pub fn with_aggregation_service(
429        mut self,
430        service_url: impl Into<String>,
431        bls_secret: blueprint_crypto_bn254::ArkBlsBn254Secret,
432        operator_index: u32,
433    ) -> Self {
434        self.aggregation_config = Some(AggregationServiceConfig::new(
435            service_url,
436            bls_secret,
437            operator_index,
438        ));
439        self
440    }
441
442    /// Configure aggregation with full config options
443    #[cfg(feature = "aggregation")]
444    pub fn with_aggregation_config(mut self, config: AggregationServiceConfig) -> Self {
445        self.aggregation_config = Some(config);
446        self
447    }
448
449    /// Get the underlying client
450    #[must_use]
451    pub fn client(&self) -> &TangleClient {
452        &self.client
453    }
454
455    /// Get the service config cache
456    ///
457    /// This can be used to:
458    /// - Pre-populate the cache with known values
459    /// - Invalidate cached entries when configs change
460    /// - Share the cache with other components
461    #[must_use]
462    pub fn cache(&self) -> &crate::cache::SharedServiceConfigCache {
463        &self.cache
464    }
465
466    /// Invalidate all cached data for a service
467    ///
468    /// Call this when you know a service's configuration has changed
469    /// (e.g., operator joined/left, threshold changed).
470    pub fn invalidate_service_cache(&self, service_id: u64) {
471        self.cache.invalidate_service(service_id);
472    }
473
474    /// Get aggregation config, using cache when available
475    async fn get_aggregation_config(
476        cache: &crate::cache::SharedServiceConfigCache,
477        client: &TangleClient,
478        service_id: u64,
479        job_index: u8,
480    ) -> Result<AggregationConfig, AggregatingConsumerError> {
481        cache
482            .get_aggregation_config(client, service_id, job_index)
483            .await
484            .map_err(|e| AggregatingConsumerError::Client(e.to_string()))
485    }
486
487    /// Get operator weights for a service, using cache when available
488    pub async fn get_operator_weights(
489        &self,
490        service_id: u64,
491    ) -> Result<crate::cache::OperatorWeights, AggregatingConsumerError> {
492        self.cache
493            .get_operator_weights(&self.client, service_id)
494            .await
495            .map_err(|e| AggregatingConsumerError::Client(e.to_string()))
496    }
497
498    /// Get service operators list, using cache when available
499    pub async fn get_service_operators(
500        &self,
501        service_id: u64,
502    ) -> Result<crate::cache::ServiceOperators, AggregatingConsumerError> {
503        self.cache
504            .get_service_operators(&self.client, service_id)
505            .await
506            .map_err(|e| AggregatingConsumerError::Client(e.to_string()))
507    }
508
509    /// Get operator metadata for all operators in a service.
510    pub async fn get_service_operator_metadata(
511        &self,
512        service_id: u64,
513    ) -> Result<HashMap<Address, OperatorMetadata>, AggregatingConsumerError> {
514        self.cache
515            .get_service_operator_metadata(
516                &self.client,
517                self.client.config.settings.blueprint_id,
518                service_id,
519            )
520            .await
521            .map_err(|e| AggregatingConsumerError::Client(e.to_string()))
522    }
523}
524
525impl Sink<JobResult> for AggregatingConsumer {
526    type Error = BoxError;
527
528    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
529        Poll::Ready(Ok(()))
530    }
531
532    fn start_send(self: Pin<&mut Self>, item: JobResult) -> Result<(), Self::Error> {
533        let JobResult::Ok { head, body } = &item else {
534            blueprint_core::trace!(target: "tangle-aggregating-consumer", "Discarding job result with error");
535            return Ok(());
536        };
537
538        let (Some(call_id_raw), Some(service_id_raw)) = (
539            head.metadata.get(extract::CallId::METADATA_KEY),
540            head.metadata.get(extract::ServiceId::METADATA_KEY),
541        ) else {
542            blueprint_core::trace!(target: "tangle-aggregating-consumer", "Discarding job result with missing metadata");
543            return Ok(());
544        };
545
546        // Get job index from metadata (defaults to 0 if not present)
547        let job_index: u8 = head
548            .metadata
549            .get(extract::JobIndex::METADATA_KEY)
550            .and_then(|v| {
551                let val: u64 = v.try_into().ok()?;
552                u8::try_from(val).ok()
553            })
554            .unwrap_or(0);
555
556        blueprint_core::debug!(
557            target: "tangle-aggregating-consumer",
558            result = ?item,
559            job_index = job_index,
560            "Received job result, handling..."
561        );
562
563        let call_id: u64 = call_id_raw
564            .try_into()
565            .map_err(|_| AggregatingConsumerError::InvalidMetadata("call_id"))?;
566        let service_id: u64 = service_id_raw
567            .try_into()
568            .map_err(|_| AggregatingConsumerError::InvalidMetadata("service_id"))?;
569
570        self.get_mut()
571            .buffer
572            .lock()
573            .unwrap()
574            .push_back(PendingJobResult {
575                service_id,
576                call_id,
577                job_index,
578                output: Bytes::copy_from_slice(body),
579            });
580        Ok(())
581    }
582
583    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
584        let consumer = self.get_mut();
585        let mut state = consumer.state.lock().unwrap();
586
587        {
588            let buffer = consumer.buffer.lock().unwrap();
589            if buffer.is_empty() && state.is_waiting() {
590                return Poll::Ready(Ok(()));
591            }
592        }
593
594        loop {
595            match &mut *state {
596                State::WaitingForResult => {
597                    let result = {
598                        let mut buffer = consumer.buffer.lock().unwrap();
599                        buffer.pop_front()
600                    };
601
602                    let Some(pending) = result else {
603                        return Poll::Ready(Ok(()));
604                    };
605
606                    let client = Arc::clone(&consumer.client);
607                    let cache = Arc::clone(&consumer.cache);
608
609                    #[cfg(feature = "aggregation")]
610                    let agg_config = consumer.aggregation_config.clone();
611
612                    let fut = Box::pin(async move {
613                        #[cfg(feature = "aggregation")]
614                        {
615                            submit_job_result(
616                                cache,
617                                client,
618                                pending.service_id,
619                                pending.call_id,
620                                pending.job_index,
621                                pending.output,
622                                agg_config,
623                            )
624                            .await
625                        }
626                        #[cfg(not(feature = "aggregation"))]
627                        {
628                            submit_job_result(
629                                cache,
630                                client,
631                                pending.service_id,
632                                pending.call_id,
633                                pending.job_index,
634                                pending.output,
635                            )
636                            .await
637                        }
638                    });
639
640                    *state = State::ProcessingSubmission(fut);
641                }
642                State::ProcessingSubmission(future) => match future.as_mut().poll(cx) {
643                    Poll::Ready(Ok(())) => {
644                        *state = State::WaitingForResult;
645                    }
646                    Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
647                    Poll::Pending => return Poll::Pending,
648                },
649            }
650        }
651    }
652
653    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
654        let buffer = self.buffer.lock().unwrap();
655        if buffer.is_empty() {
656            Poll::Ready(Ok(()))
657        } else {
658            Poll::Pending
659        }
660    }
661}
662
/// Route one job result to either the direct or the aggregated submission path.
///
/// The job's aggregation config is looked up through `cache`. If aggregation
/// is not required the output is submitted directly; otherwise `agg_config`
/// must be present and the aggregated path is taken.
///
/// # Errors
///
/// * [`AggregatingConsumerError::Client`] if the aggregation config lookup fails
/// * [`AggregatingConsumerError::AggregationNotConfigured`] if aggregation is
///   required but `agg_config` is `None`
/// * whatever the chosen submission path returns
#[cfg(feature = "aggregation")]
async fn submit_job_result(
    cache: crate::cache::SharedServiceConfigCache,
    client: Arc<TangleClient>,
    service_id: u64,
    call_id: u64,
    job_index: u8,
    output: Bytes,
    agg_config: Option<AggregationServiceConfig>,
) -> Result<(), AggregatingConsumerError> {
    // Cached lookup of the job's aggregation requirements.
    let config =
        AggregatingConsumer::get_aggregation_config(&cache, &client, service_id, job_index).await?;

    if !config.required {
        // Plain submission, no signatures to collect.
        return submit_direct_result(client, service_id, call_id, output).await;
    }

    blueprint_core::info!(
        target: "tangle-aggregating-consumer",
        "Job {} for service {} requires aggregation (threshold: {}bps, type: {:?})",
        call_id,
        service_id,
        config.threshold_bps,
        config.threshold_type
    );

    // Aggregation is mandatory from here on; bail out if it was never set up.
    let Some(agg) = agg_config else {
        return Err(AggregatingConsumerError::AggregationNotConfigured);
    };

    submit_aggregated_result(
        cache, client, service_id, call_id, job_index, output, config, agg,
    )
    .await
}
700
/// Parameters computed by `prepare_aggregation_task` for setting up an
/// aggregation task.
#[cfg(feature = "aggregation")]
struct AggregationTaskInit {
    /// Number of operators in the service.
    operator_count: u32,
    /// Threshold the aggregation has to reach (count-based or stake-weighted).
    threshold: ThresholdConfig,
}
706
/// Build a count-based threshold for `operator_count` operators from the
/// threshold basis points in `config`.
#[cfg(feature = "aggregation")]
fn count_based_threshold(operator_count: usize, config: &AggregationConfig) -> ThresholdConfig {
    let required = integration::calculate_required_signers(
        operator_count,
        config.threshold_bps,
        ThresholdType::CountBased,
        None,
    );
    ThresholdConfig::Count {
        required_signers: required as u32,
    }
}

/// Work out the operator count and threshold for an aggregation task.
///
/// The service's operators are read through `cache`. For a count-based
/// config the threshold is a required-signer count. For a stake-weighted
/// config the operator weights are read as well and turned into per-operator
/// stakes (operators without a weight entry get stake 0); if no weights exist,
/// or all of them are zero, the function logs a warning and falls back to a
/// count-based threshold.
///
/// `job_index` is only used in log output.
///
/// # Errors
///
/// Returns [`AggregatingConsumerError::Client`] if the operator or weight
/// lookup fails, or if the service has no operators.
#[cfg(feature = "aggregation")]
async fn prepare_aggregation_task(
    cache: &crate::cache::SharedServiceConfigCache,
    client: &TangleClient,
    service_id: u64,
    job_index: u8,
    config: &AggregationConfig,
) -> Result<AggregationTaskInit, AggregatingConsumerError> {
    let operators = cache
        .get_service_operators(client, service_id)
        .await
        .map_err(|e| AggregatingConsumerError::Client(e.to_string()))?;

    if operators.is_empty() {
        return Err(AggregatingConsumerError::Client(format!(
            "No operators registered for service {service_id}"
        )));
    }

    let operator_count = operators.len() as u32;

    let threshold = match config.threshold_type {
        ThresholdType::CountBased => count_based_threshold(operators.len(), config),
        ThresholdType::StakeWeighted => {
            let weights = cache
                .get_operator_weights(client, service_id)
                .await
                .map_err(|e| AggregatingConsumerError::Client(e.to_string()))?;

            if weights.is_empty() {
                blueprint_core::warn!(
                    target: "tangle-aggregating-consumer",
                    service_id,
                    job_index,
                    "No operator weights found for service {}; falling back to count-based threshold",
                    service_id
                );
                count_based_threshold(operators.len(), config)
            } else {
                // The operator's position in the service operator list is used
                // as its operator index.
                let mut stakes = Vec::with_capacity(operators.len());
                let mut numeric_stakes = Vec::with_capacity(operators.len());
                for (idx, operator) in operators.iter().enumerate() {
                    let weight = u64::from(*weights.weights.get(operator).unwrap_or(&0));
                    stakes.push(OperatorStake {
                        operator_index: idx as u32,
                        stake: weight,
                    });
                    numeric_stakes.push(weight);
                }

                if numeric_stakes.iter().all(|stake| *stake == 0) {
                    blueprint_core::warn!(
                        target: "tangle-aggregating-consumer",
                        service_id,
                        job_index,
                        "Operator weights for service {} are zero; falling back to count-based threshold",
                        service_id
                    );
                    count_based_threshold(operators.len(), config)
                } else {
                    blueprint_core::trace!(
                        target: "tangle-aggregating-consumer",
                        service_id,
                        job_index,
                        stakes = ?numeric_stakes,
                        "Prepared stake-weighted threshold"
                    );

                    ThresholdConfig::StakeWeighted {
                        threshold_bps: u32::from(config.threshold_bps),
                        operator_stakes: stakes,
                    }
                }
            }
        }
    };

    Ok(AggregationTaskInit {
        operator_count,
        threshold,
    })
}
815
816/// Submit a job result without aggregation feature
817#[cfg(not(feature = "aggregation"))]
818async fn submit_job_result(
819    cache: crate::cache::SharedServiceConfigCache,
820    client: Arc<TangleClient>,
821    service_id: u64,
822    call_id: u64,
823    job_index: u8,
824    output: Bytes,
825) -> Result<(), AggregatingConsumerError> {
826    // Check if aggregation is required (uses cache)
827    let config =
828        AggregatingConsumer::get_aggregation_config(&cache, &client, service_id, job_index).await?;
829
830    if config.required {
831        blueprint_core::warn!(
832            target: "tangle-aggregating-consumer",
833            "Job {} for service {} requires aggregation but 'aggregation' feature not enabled. \
834             Enable the feature and configure the aggregation service.",
835            call_id,
836            service_id,
837        );
838        Ok(())
839    } else {
840        submit_direct_result(client, service_id, call_id, output).await
841    }
842}
843
/// Submit using the aggregation service(s)
///
/// This function:
/// 1. Signs the output with the operator's BLS key
/// 2. Sends the signature to ALL configured aggregation services (for redundancy)
/// 3. Waits for threshold if configured
/// 4. Submits the aggregated result to chain if enabled (all operators can race to submit)
///
/// The threshold is considered met as soon as ANY service that accepted the
/// signature reports it, not just the last one contacted.
///
/// # Errors
///
/// - Propagates failures from `prepare_aggregation_task`.
/// - `AggregatingConsumerError::Bls` if signing the message fails.
/// - `AggregatingConsumerError::Client` if no aggregation service accepted the
///   signature.
/// - Propagates the timeout error from `wait_for_threshold_any_service` when
///   `wait_for_threshold` is enabled.
///
/// Failures of the final on-chain submission are only logged at debug level,
/// because another operator may already have submitted the same result.
#[cfg(feature = "aggregation")]
async fn submit_aggregated_result(
    cache: crate::cache::SharedServiceConfigCache,
    client: Arc<TangleClient>,
    service_id: u64,
    call_id: u64,
    job_index: u8,
    output: Bytes,
    config: AggregationConfig,
    agg: AggregationServiceConfig,
) -> Result<(), AggregatingConsumerError> {
    use blueprint_crypto_bn254::ArkBlsBn254;
    use blueprint_crypto_core::{BytesEncoding, KeyType};
    use blueprint_tangle_aggregation_svc::{SubmitSignatureRequest, create_signing_message};

    let task_init =
        prepare_aggregation_task(&cache, &client, service_id, job_index, &config).await?;

    blueprint_core::debug!(
        target: "tangle-aggregating-consumer",
        service_id,
        call_id,
        job_index,
        operator_count = task_init.operator_count,
        threshold = ?task_init.threshold,
        "Prepared aggregation task initialization payload"
    );

    blueprint_core::debug!(
        target: "tangle-aggregating-consumer",
        "Submitting signature to {} aggregation service(s) for service {} call {}",
        agg.clients.len(),
        service_id,
        call_id
    );

    // Create the message to sign
    let message = create_signing_message(service_id, call_id, &output);

    // Sign with BLS key - we need a mutable clone since sign_with_secret takes &mut
    let mut secret_clone = (*agg.bls_secret).clone();
    let signature = ArkBlsBn254::sign_with_secret(&mut secret_clone, &message)
        .map_err(|e| AggregatingConsumerError::Bls(e.to_string()))?;

    // Build the request once (same for all services). The encoded key and
    // signature are moved straight into it; no intermediate copies are needed.
    let submit_request = SubmitSignatureRequest {
        service_id,
        call_id,
        operator_index: agg.operator_index,
        output: output.to_vec(),
        signature: signature.to_bytes(),
        public_key: agg.bls_public.to_bytes(),
    };

    // Submit to ALL aggregation services, remembering whether any of them
    // accepted the signature and whether any of them already met the threshold.
    let mut any_success = false;
    let mut threshold_met = false;

    for (idx, service_client) in agg.clients.iter().enumerate() {
        // Try to initialize the task. The outcome is deliberately ignored: the
        // task may already exist because another operator created it first.
        let _ = service_client
            .init_task(
                service_id,
                call_id,
                output.as_ref(),
                task_init.operator_count,
                task_init.threshold.clone(),
            )
            .await;

        // Submit our signature
        match service_client
            .submit_signature(submit_request.clone())
            .await
        {
            Ok(response) => {
                blueprint_core::info!(
                    target: "tangle-aggregating-consumer",
                    "Submitted signature to aggregation service {}: {}/{} signatures (threshold met: {})",
                    idx,
                    response.signatures_collected,
                    response.threshold_required,
                    response.threshold_met
                );
                any_success = true;
                threshold_met |= response.threshold_met;
            }
            Err(e) => {
                blueprint_core::warn!(
                    target: "tangle-aggregating-consumer",
                    "Failed to submit to aggregation service {}: {}",
                    idx,
                    e
                );
            }
        }
    }

    if !any_success {
        return Err(AggregatingConsumerError::Client(
            "Failed to submit to any aggregation service".to_string(),
        ));
    }

    // Check if we should submit to chain
    if !agg.submit_to_chain {
        blueprint_core::debug!(
            target: "tangle-aggregating-consumer",
            "submit_to_chain is disabled, not submitting to chain"
        );
        return Ok(());
    }

    if threshold_met {
        // Threshold already met on at least one service, try to submit immediately
        if let Err(e) = try_submit_aggregated_to_chain(client, &agg, service_id, call_id).await {
            blueprint_core::debug!(
                target: "tangle-aggregating-consumer",
                "Failed to submit aggregated result (likely already submitted): {}",
                e
            );
        }
    } else if agg.wait_for_threshold {
        // Wait for threshold to be met, then submit
        blueprint_core::debug!(
            target: "tangle-aggregating-consumer",
            "Waiting for threshold to be met..."
        );

        // Try to get result from any service
        let result = wait_for_threshold_any_service(&agg, service_id, call_id).await?;

        // Try to submit to chain (race with other operators)
        if let Err(e) =
            submit_aggregated_to_chain_with_result(client, &agg, service_id, call_id, result).await
        {
            blueprint_core::debug!(
                target: "tangle-aggregating-consumer",
                "Failed to submit aggregated result (likely already submitted by another operator): {}",
                e
            );
        }
    }

    Ok(())
}
1006
/// Poll every configured aggregation service until one of them returns an
/// aggregated result, or until `agg.threshold_timeout` has elapsed.
///
/// Each round queries the services in order and returns the first available
/// result. Polling errors are logged at trace level and otherwise ignored.
/// Between rounds the task sleeps for `agg.poll_interval`.
///
/// # Errors
///
/// Returns `AggregatingConsumerError::Client` once the timeout has elapsed
/// without any service producing a result.
#[cfg(feature = "aggregation")]
async fn wait_for_threshold_any_service(
    agg: &AggregationServiceConfig,
    service_id: u64,
    call_id: u64,
) -> Result<blueprint_tangle_aggregation_svc::AggregatedResultResponse, AggregatingConsumerError> {
    use blueprint_std::time::Instant;

    let started = Instant::now();

    loop {
        // The deadline is checked before each polling round, so a round that
        // has started always runs to completion.
        if started.elapsed() >= agg.threshold_timeout {
            return Err(AggregatingConsumerError::Client(
                "Timeout waiting for aggregation threshold".to_string(),
            ));
        }

        for service_client in &agg.clients {
            match service_client.get_aggregated(service_id, call_id).await {
                Ok(Some(aggregated)) => return Ok(aggregated),
                // Threshold not yet met on this service; try the next one.
                Ok(None) => {}
                Err(e) => {
                    blueprint_core::trace!(
                        target: "tangle-aggregating-consumer",
                        "Error polling aggregation service: {}",
                        e
                    );
                }
            }
        }

        tokio::time::sleep(agg.poll_interval).await;
    }
}
1047
/// Fetch the aggregated result from the first service that has one and submit
/// it to chain.
///
/// Services are queried in configuration order. A service that errors or has
/// no result yet is skipped silently.
///
/// # Errors
///
/// Returns `AggregatingConsumerError::Client` when no service could provide an
/// aggregated result; otherwise propagates the outcome of
/// `submit_aggregated_to_chain_with_result`.
#[cfg(feature = "aggregation")]
async fn try_submit_aggregated_to_chain(
    client: Arc<TangleClient>,
    agg: &AggregationServiceConfig,
    service_id: u64,
    call_id: u64,
) -> Result<(), AggregatingConsumerError> {
    // Step 1: find the first service that already holds an aggregated result.
    let mut fetched = None;
    for service_client in &agg.clients {
        if let Ok(Some(result)) = service_client.get_aggregated(service_id, call_id).await {
            fetched = Some(result);
            break;
        }
    }

    // Step 2: submit it, or report that nothing was available.
    match fetched {
        Some(result) => {
            submit_aggregated_to_chain_with_result(client, agg, service_id, call_id, result).await
        }
        None => Err(AggregatingConsumerError::Client(
            "Aggregated result not available from any service".to_string(),
        )),
    }
}
1070
/// Submit the aggregated result to the blockchain with a pre-fetched result
///
/// When `client.config.dry_run` is set, nothing is parsed or submitted and
/// `Ok(())` is returned after logging. Otherwise the aggregated signature and
/// public key are decoded from `result`, wrapped in an `AggregatedResult`, and
/// submitted through `client`. After a successful submission every configured
/// aggregation service is asked to mark the task as submitted; failures of
/// that notification are ignored (best effort).
///
/// # Errors
///
/// - `AggregatingConsumerError::Bls` if the aggregated signature or public key
///   bytes cannot be decoded.
/// - Propagates the error returned by `AggregatedResult::submit`.
#[cfg(feature = "aggregation")]
async fn submit_aggregated_to_chain_with_result(
    client: Arc<TangleClient>,
    agg: &AggregationServiceConfig,
    service_id: u64,
    call_id: u64,
    result: blueprint_tangle_aggregation_svc::AggregatedResultResponse,
) -> Result<(), AggregatingConsumerError> {
    use crate::aggregation::{AggregatedResult, G1Point, G2Point, SignerBitmap};

    if client.config.dry_run {
        blueprint_core::info!(
            target: "tangle-aggregating-consumer",
            "Dry run enabled; skipping aggregated result submission for service {} call {}",
            service_id,
            call_id
        );
        return Ok(());
    }

    blueprint_core::info!(
        target: "tangle-aggregating-consumer",
        "Submitting aggregated result to chain for service {} call {}",
        service_id,
        call_id
    );

    // Parse the signature and pubkey from the response
    let signature = G1Point::from_bytes(&result.aggregated_signature)
        .ok_or_else(|| AggregatingConsumerError::Bls("Invalid aggregated signature".to_string()))?;
    let pubkey = G2Point::from_bytes(&result.aggregated_pubkey)
        .ok_or_else(|| AggregatingConsumerError::Bls("Invalid aggregated pubkey".to_string()))?;

    let aggregated = AggregatedResult::new(
        service_id,
        call_id,
        Bytes::from(result.output),
        SignerBitmap(result.signer_bitmap),
        signature,
        pubkey,
    );

    // Submit to the contract. The shared client handle is passed as-is; there
    // is no need to deep-clone the client into a fresh `Arc`.
    aggregated.submit(&client).await?;

    // Mark as submitted in all aggregation services (best effort: a failed
    // notification must not turn a successful submission into an error).
    for service_client in &agg.clients {
        let _ = service_client.mark_submitted(service_id, call_id).await;
    }

    blueprint_core::info!(
        target: "tangle-aggregating-consumer",
        "Successfully submitted aggregated result for service {} call {}",
        service_id,
        call_id
    );

    Ok(())
}
1133
1134/// Submit a result directly without aggregation
1135async fn submit_direct_result(
1136    client: Arc<TangleClient>,
1137    service_id: u64,
1138    call_id: u64,
1139    output: Bytes,
1140) -> Result<(), AggregatingConsumerError> {
1141    blueprint_core::debug!(
1142        target: "tangle-aggregating-consumer",
1143        "Submitting direct result for service {} call {}",
1144        service_id,
1145        call_id
1146    );
1147
1148    if client.config.dry_run {
1149        blueprint_core::info!(
1150            target: "tangle-aggregating-consumer",
1151            "Dry run enabled; skipping direct result submission for service {} call {}",
1152            service_id,
1153            call_id
1154        );
1155        return Ok(());
1156    }
1157
1158    let result = client
1159        .submit_result(service_id, call_id, output)
1160        .await
1161        .map_err(|e| {
1162            AggregatingConsumerError::Transaction(format!("Failed to submit result: {e}"))
1163        })?;
1164
1165    if result.success {
1166        blueprint_core::info!(
1167            target: "tangle-aggregating-consumer",
1168            "Successfully submitted direct result for service {} call {}: tx_hash={:?}",
1169            service_id,
1170            call_id,
1171            result.tx_hash
1172        );
1173    } else {
1174        return Err(AggregatingConsumerError::Transaction(format!(
1175            "Transaction reverted for service {} call {}: tx_hash={:?}",
1176            service_id, call_id, result.tx_hash
1177        )));
1178    }
1179
1180    Ok(())
1181}
1182
1183/// Helper to integrate with the P2P aggregation protocol
1184///
1185/// This would be used by blueprint developers who want full control
1186/// over the aggregation process.
1187pub mod integration {
1188    use super::*;
1189
1190    /// Create the message hash that needs to be signed for BLS aggregation
1191    ///
1192    /// This matches the contract's verification: keccak256(abi.encodePacked(serviceId, callId, keccak256(output)))
1193    pub fn create_signing_message(service_id: u64, call_id: u64, output: &[u8]) -> Vec<u8> {
1194        use alloy_primitives::keccak256;
1195
1196        let output_hash = keccak256(output);
1197        let mut message = Vec::with_capacity(8 + 8 + 32);
1198        message.extend_from_slice(&service_id.to_be_bytes());
1199        message.extend_from_slice(&call_id.to_be_bytes());
1200        message.extend_from_slice(output_hash.as_slice());
1201        message
1202    }
1203
1204    /// Calculate required signer count based on threshold config
1205    pub fn calculate_required_signers(
1206        total_operators: usize,
1207        threshold_bps: u16,
1208        threshold_type: ThresholdType,
1209        operator_stakes: Option<&[u64]>,
1210    ) -> usize {
1211        fn count_based(total: usize, threshold_bps: u16) -> usize {
1212            if total == 0 {
1213                return 1;
1214            }
1215            let mut required = (total as u64 * threshold_bps as u64) / 10000;
1216            if required == 0 {
1217                required = 1;
1218            }
1219            let required = required as usize;
1220            required.min(total).max(1)
1221        }
1222
1223        match threshold_type {
1224            ThresholdType::CountBased => count_based(total_operators, threshold_bps),
1225            ThresholdType::StakeWeighted => {
1226                if let Some(stakes) = operator_stakes {
1227                    if stakes.is_empty() || stakes.iter().all(|stake| *stake == 0) {
1228                        return count_based(total_operators, threshold_bps);
1229                    }
1230
1231                    let total_stake: u128 = stakes.iter().map(|s| *s as u128).sum();
1232                    if total_stake == 0 {
1233                        return count_based(total_operators, threshold_bps);
1234                    }
1235
1236                    let mut required_stake = (total_stake * threshold_bps as u128) / 10000u128;
1237                    if required_stake == 0 {
1238                        required_stake = 1;
1239                    }
1240
1241                    let mut sorted: Vec<u64> = stakes.to_vec();
1242                    sorted.sort_by(|a, b| b.cmp(a));
1243
1244                    let mut accumulated: u128 = 0;
1245                    let mut required_signers = 0usize;
1246
1247                    for stake in sorted {
1248                        required_signers += 1;
1249                        accumulated += stake as u128;
1250                        if accumulated >= required_stake {
1251                            break;
1252                        }
1253                    }
1254
1255                    required_signers.min(total_operators.max(1)).max(1)
1256                } else {
1257                    count_based(total_operators, threshold_bps)
1258                }
1259            }
1260        }
1261    }
1262}
1263
/// Unit tests for the `integration` helpers, the RPC-to-aggregation URL
/// conversion rules, and (behind the `aggregation` feature) the
/// `AggregationServiceConfig` builder methods.
#[cfg(test)]
mod tests {
    use super::integration::*;
    use blueprint_client_tangle::ThresholdType;

    // ═══════════════════════════════════════════════════════════════════════════
    // create_signing_message tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_create_signing_message_format() {
        use alloy_primitives::keccak256;

        let service_id = 1u64;
        let call_id = 42u64;
        let output = b"test output";

        let message = create_signing_message(service_id, call_id, output);

        // Should be 8 + 8 + 32 = 48 bytes
        assert_eq!(message.len(), 48);

        // First 8 bytes should be service_id in big endian
        assert_eq!(&message[0..8], &service_id.to_be_bytes());

        // Next 8 bytes should be call_id in big endian
        assert_eq!(&message[8..16], &call_id.to_be_bytes());

        // Last 32 bytes should be keccak256(output)
        let expected_hash = keccak256(output);
        assert_eq!(&message[16..48], expected_hash.as_slice());
    }

    #[test]
    fn test_create_signing_message_deterministic() {
        let msg1 = create_signing_message(1, 2, b"hello");
        let msg2 = create_signing_message(1, 2, b"hello");
        assert_eq!(msg1, msg2);
    }

    #[test]
    fn test_create_signing_message_different_outputs() {
        let msg1 = create_signing_message(1, 2, b"hello");
        let msg2 = create_signing_message(1, 2, b"world");
        // Different outputs should produce different messages (different hash suffix)
        assert_ne!(msg1, msg2);
        // But same prefix (service_id and call_id)
        assert_eq!(&msg1[0..16], &msg2[0..16]);
    }

    #[test]
    fn test_create_signing_message_empty_output() {
        let msg = create_signing_message(1, 2, b"");
        assert_eq!(msg.len(), 48);
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // calculate_required_signers tests - Count Based
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_calculate_required_signers_count_based_67_percent() {
        // 67% of 3 operators = 2.01 -> 2
        let required = calculate_required_signers(3, 6700, ThresholdType::CountBased, None);
        assert_eq!(required, 2);
    }

    #[test]
    fn test_calculate_required_signers_count_based_50_percent() {
        // 50% of 4 operators = 2
        let required = calculate_required_signers(4, 5000, ThresholdType::CountBased, None);
        assert_eq!(required, 2);
    }

    #[test]
    fn test_calculate_required_signers_count_based_100_percent() {
        // 100% of 5 operators = 5
        let required = calculate_required_signers(5, 10000, ThresholdType::CountBased, None);
        assert_eq!(required, 5);
    }

    #[test]
    fn test_calculate_required_signers_count_based_minimum_one() {
        // Very low threshold should still require at least 1
        let required = calculate_required_signers(10, 100, ThresholdType::CountBased, None); // 1%
        assert_eq!(required, 1);
    }

    #[test]
    fn test_calculate_required_signers_count_based_single_operator() {
        // Single operator, any threshold should require 1
        let required = calculate_required_signers(1, 6700, ThresholdType::CountBased, None);
        assert_eq!(required, 1);
    }

    #[test]
    fn test_calculate_required_signers_count_based_large_set() {
        // 67% of 100 operators = 67
        let required = calculate_required_signers(100, 6700, ThresholdType::CountBased, None);
        assert_eq!(required, 67);
    }

    #[test]
    fn test_calculate_required_signers_count_based_zero_operators() {
        // An empty operator set still reports a minimum of 1 signer
        let required = calculate_required_signers(0, 6700, ThresholdType::CountBased, None);
        assert_eq!(required, 1);
    }

    #[test]
    fn test_calculate_required_signers_count_based_above_100_percent() {
        // A threshold above 100% is clamped to the operator count
        let required = calculate_required_signers(3, 20000, ThresholdType::CountBased, None);
        assert_eq!(required, 3);
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // calculate_required_signers tests - Stake Weighted
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_calculate_required_signers_stake_weighted_no_stakes() {
        // Without stakes, should fall back to count-based
        let required = calculate_required_signers(3, 6700, ThresholdType::StakeWeighted, None);
        assert_eq!(required, 2);
    }

    #[test]
    fn test_calculate_required_signers_stake_weighted_empty_stakes() {
        // An empty stake list also falls back to count-based
        let stakes: [u64; 0] = [];
        let required =
            calculate_required_signers(3, 6700, ThresholdType::StakeWeighted, Some(&stakes));
        assert_eq!(required, 2);
    }

    #[test]
    fn test_calculate_required_signers_stake_weighted_all_zero_stakes() {
        // All-zero stakes carry no weight information: fall back to count-based
        let stakes = [0u64, 0, 0];
        let required =
            calculate_required_signers(3, 6700, ThresholdType::StakeWeighted, Some(&stakes));
        assert_eq!(required, 2);
    }

    #[test]
    fn test_calculate_required_signers_stake_weighted_equal_stakes() {
        // 3 operators with equal stakes (10 each), 67% threshold
        // Total stake = 30, required stake = 20.1 -> 20 (integer division)
        // Largest-first accumulation: 10, then 20 >= 20 -> 2 signers
        let stakes = [10u64, 10, 10];
        let required =
            calculate_required_signers(3, 6700, ThresholdType::StakeWeighted, Some(&stakes));
        assert_eq!(required, 2);
    }

    #[test]
    fn test_calculate_required_signers_stake_weighted_unequal_stakes() {
        // 3 operators: 5, 3, 2 ETH stake (like in contract tests)
        // Total = 10 ETH, 67% = 6.7 -> 6 ETH required (integer division)
        // Largest-first accumulation: 5, then 8 >= 6 -> 2 signers
        let stakes = [5u64, 3, 2];
        let required =
            calculate_required_signers(3, 6700, ThresholdType::StakeWeighted, Some(&stakes));
        assert_eq!(required, 2);
    }

    #[test]
    fn test_calculate_required_signers_stake_weighted_dominant_operator() {
        // Total = 100, 67% = 67 required; the 90-stake operator alone suffices,
        // regardless of its position in the input
        let stakes = [5u64, 90, 5];
        let required =
            calculate_required_signers(3, 6700, ThresholdType::StakeWeighted, Some(&stakes));
        assert_eq!(required, 1);
    }

    #[test]
    fn test_calculate_required_signers_stake_weighted_100_percent() {
        // 100% of the stake requires every staker
        let stakes = [5u64, 3, 2];
        let required =
            calculate_required_signers(3, 10000, ThresholdType::StakeWeighted, Some(&stakes));
        assert_eq!(required, 3);
    }

    #[test]
    fn test_calculate_required_signers_stake_weighted_minimum_one() {
        // Very low threshold should still require at least 1
        let stakes = [100u64, 100, 100];
        let required = calculate_required_signers(
            3,
            100, // 1%
            ThresholdType::StakeWeighted,
            Some(&stakes),
        );
        assert_eq!(required, 1);
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // URL conversion tests (for discover_operator_services logic)
    // ═══════════════════════════════════════════════════════════════════════════

    /// Helper function to convert RPC address to aggregation URL
    /// This mirrors the logic in discover_operator_services
    ///
    /// * An `aggregation_path` starting with `:` is treated as a port: it
    ///   replaces everything after the last `:` of `rpc` when the part before
    ///   that colon still contains `://`, and is appended otherwise.
    /// * Any other `aggregation_path` is appended after trimming trailing `/`
    ///   from `rpc`.
    /// * An empty `rpc` yields `None`.
    fn convert_rpc_to_aggregation_url(rpc: &str, aggregation_path: &str) -> Option<String> {
        if rpc.is_empty() {
            return None;
        }

        if aggregation_path.starts_with(':') {
            // Replace the port in the URL
            if let Some(host_end) = rpc.rfind(':') {
                let before_port = &rpc[..host_end];
                // Guards against treating the scheme separator as a port colon
                if before_port.contains("://") {
                    return Some(format!("{}{}", before_port, aggregation_path));
                }
            }
            // No port found, just append
            Some(format!("{}{}", rpc, aggregation_path))
        } else {
            // Append as a path
            let base = rpc.trim_end_matches('/');
            Some(format!("{}{}", base, aggregation_path))
        }
    }

    #[test]
    fn test_url_conversion_port_replacement() {
        // Test replacing port
        let url = convert_rpc_to_aggregation_url("http://localhost:8545", ":9090");
        assert_eq!(url, Some("http://localhost:9090".to_string()));

        let url = convert_rpc_to_aggregation_url("https://operator.example.com:8545", ":9090");
        assert_eq!(url, Some("https://operator.example.com:9090".to_string()));
    }

    #[test]
    fn test_url_conversion_port_append() {
        // Test appending port when none exists
        let url = convert_rpc_to_aggregation_url("http://localhost", ":9090");
        assert_eq!(url, Some("http://localhost:9090".to_string()));
    }

    #[test]
    fn test_url_conversion_path_append() {
        // Test appending path
        let url = convert_rpc_to_aggregation_url("http://localhost:8545", "/aggregation");
        assert_eq!(url, Some("http://localhost:8545/aggregation".to_string()));

        let url = convert_rpc_to_aggregation_url("http://localhost:8545/", "/aggregation");
        assert_eq!(url, Some("http://localhost:8545/aggregation".to_string()));
    }

    #[test]
    fn test_url_conversion_empty() {
        let url = convert_rpc_to_aggregation_url("", ":9090");
        assert_eq!(url, None);
    }

    #[test]
    fn test_url_conversion_complex_urls() {
        // IPv6 address
        let url = convert_rpc_to_aggregation_url("http://[::1]:8545", ":9090");
        assert_eq!(url, Some("http://[::1]:9090".to_string()));

        // URL with path
        let url = convert_rpc_to_aggregation_url("http://localhost:8545/rpc", "/v1/aggregate");
        assert_eq!(
            url,
            Some("http://localhost:8545/rpc/v1/aggregate".to_string())
        );
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // AggregationServiceConfig multi-service tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[cfg(feature = "aggregation")]
    mod aggregation_config_tests {
        use crate::AggregationServiceConfig;
        use blueprint_crypto_bn254::ArkBlsBn254;
        use blueprint_crypto_core::KeyType;
        use std::time::Duration;

        fn test_bls_secret() -> blueprint_crypto_bn254::ArkBlsBn254Secret {
            // Generate a deterministic test key
            let seed = [1u8; 32];
            ArkBlsBn254::generate_with_seed(Some(&seed)).unwrap()
        }

        #[test]
        fn test_config_single_service() {
            let config =
                AggregationServiceConfig::new("http://localhost:8080", test_bls_secret(), 0);
            assert_eq!(config.clients.len(), 1);
            assert_eq!(config.operator_index, 0);
            assert!(config.submit_to_chain);
        }

        #[test]
        fn test_config_multiple_services() {
            let config = AggregationServiceConfig::with_multiple_services(
                vec![
                    "http://service1:8080",
                    "http://service2:8080",
                    "http://service3:8080",
                ],
                test_bls_secret(),
                1,
            );
            assert_eq!(config.clients.len(), 3);
            assert_eq!(config.operator_index, 1);
        }

        #[test]
        fn test_config_add_service() {
            let config =
                AggregationServiceConfig::new("http://localhost:8080", test_bls_secret(), 0)
                    .add_service("http://backup:8080")
                    .add_service("http://fallback:8080");

            assert_eq!(config.clients.len(), 3);
        }

        #[test]
        fn test_config_with_submit_to_chain() {
            let config =
                AggregationServiceConfig::new("http://localhost:8080", test_bls_secret(), 0);
            // Default is true
            assert!(config.submit_to_chain);

            let config = config.with_submit_to_chain(false);
            assert!(!config.submit_to_chain);
        }

        #[test]
        fn test_config_with_wait_for_threshold() {
            let config =
                AggregationServiceConfig::new("http://localhost:8080", test_bls_secret(), 0);
            // Default is false
            assert!(!config.wait_for_threshold);

            let config = config.with_wait_for_threshold(true);
            assert!(config.wait_for_threshold);
        }

        #[test]
        fn test_config_with_threshold_timeout() {
            let config =
                AggregationServiceConfig::new("http://localhost:8080", test_bls_secret(), 0)
                    .with_threshold_timeout(Duration::from_secs(120));

            assert_eq!(config.threshold_timeout, Duration::from_secs(120));
        }

        #[test]
        fn test_config_client_accessor() {
            let config = AggregationServiceConfig::with_multiple_services(
                vec!["http://service1:8080", "http://service2:8080"],
                test_bls_secret(),
                0,
            );
            // client() should return the first one
            let _client = config.client();
            // Just verify it doesn't panic
        }

        #[test]
        fn test_config_empty_services() {
            // with_multiple_services should handle empty iterator
            let config = AggregationServiceConfig::with_multiple_services(
                Vec::<String>::new(),
                test_bls_secret(),
                0,
            );
            assert_eq!(config.clients.len(), 0);
        }
    }
}