Skip to main content

blueprint_tangle_extra/
cache.rs

1//! Service Configuration Cache
2//!
3//! Provides TTL-based caching for on-chain configuration data to reduce RPC calls.
4//! Caches aggregation configs, operator weights, and service operator lists.
5//!
6//! ## Usage
7//!
8//! ```rust,ignore
9//! use blueprint_tangle_extra::cache::ServiceConfigCache;
10//! use blueprint_std::time::Duration;
11//!
12//! // Create cache with 5 minute TTL
13//! let cache = ServiceConfigCache::new(Duration::from_secs(300));
14//!
15//! // Get aggregation config (fetches from chain if not cached or expired)
16//! let config = cache.get_aggregation_config(&client, service_id, job_index).await?;
17//!
18//! // Get operator weights for a service
19//! let weights = cache.get_operator_weights(&client, service_id).await?;
20//!
21//! // Force refresh a specific service's data
22//! cache.invalidate_service(service_id);
23//! ```
24
25use alloy_primitives::Address;
26use blueprint_client_tangle::{AggregationConfig, OperatorMetadata, TangleClient};
27use blueprint_std::collections::HashMap;
28use blueprint_std::format;
29use blueprint_std::string::{String, ToString};
30use blueprint_std::sync::{Arc, RwLock};
31use blueprint_std::time::{Duration, Instant};
32use blueprint_std::vec::Vec;
33use core::fmt;
34use core::sync::atomic::{AtomicU64, Ordering};
35
36/// Default cache TTL (5 minutes)
37pub const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(300);
38
39/// Error type for cache operations
40#[derive(Debug, thiserror::Error)]
41pub enum CacheError {
42    /// Failed to fetch from chain
43    #[error("Failed to fetch from chain: {0}")]
44    FetchError(String),
45    /// Lock poisoned
46    #[error("Cache lock poisoned")]
47    LockPoisoned,
48}
49
50/// A cached entry with timestamp
51#[derive(Clone, Debug)]
52struct CacheEntry<T> {
53    value: T,
54    cached_at: Instant,
55}
56
57impl<T> CacheEntry<T> {
58    fn new(value: T) -> Self {
59        Self {
60            value,
61            cached_at: Instant::now(),
62        }
63    }
64
65    fn is_expired(&self, ttl: Duration) -> bool {
66        self.cached_at.elapsed() > ttl
67    }
68}
69
70/// Operator weight information for a service
71#[derive(Clone, Debug)]
72pub struct OperatorWeights {
73    /// Map of operator address to their exposure in basis points
74    pub weights: HashMap<Address, u16>,
75    /// Total exposure across all operators
76    pub total_exposure: u64,
77}
78
79impl OperatorWeights {
80    /// Get weight for a specific operator
81    pub fn get(&self, operator: &Address) -> Option<u16> {
82        self.weights.get(operator).copied()
83    }
84
85    /// Check if an operator is active in this service
86    pub fn contains(&self, operator: &Address) -> bool {
87        self.weights.contains_key(operator)
88    }
89
90    /// Get the number of active operators
91    pub fn len(&self) -> usize {
92        self.weights.len()
93    }
94
95    /// Check if there are no operators
96    pub fn is_empty(&self) -> bool {
97        self.weights.is_empty()
98    }
99
100    /// Iterate over all operators and their weights
101    pub fn iter(&self) -> impl Iterator<Item = (&Address, &u16)> {
102        self.weights.iter()
103    }
104
105    /// Calculate the stake-weighted threshold count
106    ///
107    /// Given a threshold in basis points, calculates how many operators
108    /// (sorted by weight descending) are needed to meet the threshold.
109    pub fn calculate_threshold_signers(&self, threshold_bps: u16) -> usize {
110        if self.weights.is_empty() {
111            return 0;
112        }
113
114        let required_weight = (self.total_exposure * threshold_bps as u64) / 10000;
115
116        // Sort operators by weight descending
117        let mut sorted: Vec<_> = self.weights.iter().collect();
118        sorted.sort_by(|a, b| b.1.cmp(a.1));
119
120        let mut accumulated: u64 = 0;
121        let mut count = 0;
122
123        for (_, &weight) in sorted {
124            accumulated += weight as u64;
125            count += 1;
126            if accumulated >= required_weight {
127                break;
128            }
129        }
130
131        count
132    }
133}
134
135/// Service operators list
136#[derive(Clone, Debug)]
137pub struct ServiceOperators {
138    /// List of operator addresses
139    pub operators: Vec<Address>,
140    /// Map of operator address to index (for bitmap calculation)
141    pub index_map: HashMap<Address, usize>,
142}
143
144impl ServiceOperators {
145    /// Create from a list of operators
146    pub fn new(operators: Vec<Address>) -> Self {
147        let index_map = operators
148            .iter()
149            .enumerate()
150            .map(|(i, addr)| (*addr, i))
151            .collect();
152        Self {
153            operators,
154            index_map,
155        }
156    }
157
158    /// Get the index of an operator
159    pub fn index_of(&self, operator: &Address) -> Option<usize> {
160        self.index_map.get(operator).copied()
161    }
162
163    /// Get the number of operators
164    pub fn len(&self) -> usize {
165        self.operators.len()
166    }
167
168    /// Check if empty
169    pub fn is_empty(&self) -> bool {
170        self.operators.is_empty()
171    }
172
173    /// Iterate over operators
174    pub fn iter(&self) -> impl Iterator<Item = &Address> {
175        self.operators.iter()
176    }
177}
178
179/// Thread-safe cache for service configurations
180///
181/// Caches aggregation configs, operator weights, and operator lists
182/// with TTL-based expiration.
183pub struct ServiceConfigCache {
184    /// TTL for cache entries
185    ttl: Duration,
186    /// Aggregation config cache: (service_id, job_index) -> config
187    aggregation_configs: RwLock<HashMap<(u64, u8), CacheEntry<AggregationConfig>>>,
188    /// Operator weights cache: service_id -> weights
189    operator_weights: RwLock<HashMap<u64, CacheEntry<OperatorWeights>>>,
190    /// Service operators cache: service_id -> operators
191    service_operators: RwLock<HashMap<u64, CacheEntry<ServiceOperators>>>,
192    /// Operator metadata cache: (blueprint_id, operator) -> metadata
193    operator_metadata: RwLock<HashMap<(u64, Address), CacheEntry<OperatorMetadata>>>,
194}
195
196impl ServiceConfigCache {
197    /// Create a new cache with the specified TTL
198    pub fn new(ttl: Duration) -> Self {
199        Self {
200            ttl,
201            aggregation_configs: RwLock::new(HashMap::new()),
202            operator_weights: RwLock::new(HashMap::new()),
203            service_operators: RwLock::new(HashMap::new()),
204            operator_metadata: RwLock::new(HashMap::new()),
205        }
206    }
207
208    /// Create a new cache with default TTL (5 minutes)
209    pub fn with_default_ttl() -> Self {
210        Self::new(DEFAULT_CACHE_TTL)
211    }
212
213    /// Get the current TTL
214    pub fn ttl(&self) -> Duration {
215        self.ttl
216    }
217
218    /// Set a new TTL (does not affect already cached entries)
219    pub fn set_ttl(&mut self, ttl: Duration) {
220        self.ttl = ttl;
221    }
222
223    // ═══════════════════════════════════════════════════════════════════════════
224    // AGGREGATION CONFIG
225    // ═══════════════════════════════════════════════════════════════════════════
226
227    /// Get aggregation config, using cache if available and not expired
228    pub async fn get_aggregation_config(
229        &self,
230        client: &TangleClient,
231        service_id: u64,
232        job_index: u8,
233    ) -> Result<AggregationConfig, CacheError> {
234        let key = (service_id, job_index);
235
236        // Check cache first
237        {
238            let cache = self
239                .aggregation_configs
240                .read()
241                .map_err(|_| CacheError::LockPoisoned)?;
242            if let Some(entry) = cache.get(&key) {
243                if !entry.is_expired(self.ttl) {
244                    blueprint_core::trace!(
245                        target: "service-config-cache",
246                        "Cache hit for aggregation config: service={}, job={}",
247                        service_id,
248                        job_index
249                    );
250                    return Ok(entry.value.clone());
251                }
252            }
253        }
254
255        // Cache miss or expired, fetch from chain
256        blueprint_core::debug!(
257            target: "service-config-cache",
258            "Cache miss for aggregation config: service={}, job={}, fetching from chain",
259            service_id,
260            job_index
261        );
262
263        let config = client
264            .get_aggregation_config(service_id, job_index)
265            .await
266            .map_err(|e| CacheError::FetchError(e.to_string()))?;
267
268        // Store in cache
269        {
270            let mut cache = self
271                .aggregation_configs
272                .write()
273                .map_err(|_| CacheError::LockPoisoned)?;
274            cache.insert(key, CacheEntry::new(config.clone()));
275        }
276
277        Ok(config)
278    }
279
280    /// Pre-populate aggregation config cache
281    pub fn set_aggregation_config(
282        &self,
283        service_id: u64,
284        job_index: u8,
285        config: AggregationConfig,
286    ) -> Result<(), CacheError> {
287        let mut cache = self
288            .aggregation_configs
289            .write()
290            .map_err(|_| CacheError::LockPoisoned)?;
291        cache.insert((service_id, job_index), CacheEntry::new(config));
292        Ok(())
293    }
294
295    // ═══════════════════════════════════════════════════════════════════════════
296    // OPERATOR WEIGHTS
297    // ═══════════════════════════════════════════════════════════════════════════
298
299    /// Get operator weights for a service, using cache if available
300    pub async fn get_operator_weights(
301        &self,
302        client: &TangleClient,
303        service_id: u64,
304    ) -> Result<OperatorWeights, CacheError> {
305        // Check cache first
306        {
307            let cache = self
308                .operator_weights
309                .read()
310                .map_err(|_| CacheError::LockPoisoned)?;
311            if let Some(entry) = cache.get(&service_id) {
312                if !entry.is_expired(self.ttl) {
313                    blueprint_core::trace!(
314                        target: "service-config-cache",
315                        "Cache hit for operator weights: service={}",
316                        service_id
317                    );
318                    return Ok(entry.value.clone());
319                }
320            }
321        }
322
323        // Cache miss or expired, fetch from chain
324        blueprint_core::debug!(
325            target: "service-config-cache",
326            "Cache miss for operator weights: service={}, fetching from chain",
327            service_id
328        );
329
330        let weights = self.fetch_operator_weights(client, service_id).await?;
331
332        // Store in cache
333        {
334            let mut cache = self
335                .operator_weights
336                .write()
337                .map_err(|_| CacheError::LockPoisoned)?;
338            cache.insert(service_id, CacheEntry::new(weights.clone()));
339        }
340
341        Ok(weights)
342    }
343
344    /// Fetch operator weights from chain
345    async fn fetch_operator_weights(
346        &self,
347        client: &TangleClient,
348        service_id: u64,
349    ) -> Result<OperatorWeights, CacheError> {
350        // Get list of operators
351        let operators = client
352            .get_service_operators(service_id)
353            .await
354            .map_err(|e| CacheError::FetchError(format!("Failed to get operators: {}", e)))?;
355
356        // Fetch each operator's weight
357        let mut weights = HashMap::new();
358        let mut total_exposure: u64 = 0;
359
360        for operator in operators {
361            match client.get_service_operator(service_id, operator).await {
362                Ok(op_info) => {
363                    if op_info.active {
364                        weights.insert(operator, op_info.exposureBps);
365                        total_exposure += op_info.exposureBps as u64;
366                    }
367                }
368                Err(e) => {
369                    blueprint_core::warn!(
370                        target: "service-config-cache",
371                        "Failed to get operator info for {}: {}",
372                        operator,
373                        e
374                    );
375                }
376            }
377        }
378
379        Ok(OperatorWeights {
380            weights,
381            total_exposure,
382        })
383    }
384
385    /// Pre-populate operator weights cache
386    pub fn set_operator_weights(
387        &self,
388        service_id: u64,
389        weights: OperatorWeights,
390    ) -> Result<(), CacheError> {
391        let mut cache = self
392            .operator_weights
393            .write()
394            .map_err(|_| CacheError::LockPoisoned)?;
395        cache.insert(service_id, CacheEntry::new(weights));
396        Ok(())
397    }
398
399    // ═══════════════════════════════════════════════════════════════════════════
400    // SERVICE OPERATORS
401    // ═══════════════════════════════════════════════════════════════════════════
402
403    /// Get service operators list, using cache if available
404    pub async fn get_service_operators(
405        &self,
406        client: &TangleClient,
407        service_id: u64,
408    ) -> Result<ServiceOperators, CacheError> {
409        // Check cache first
410        {
411            let cache = self
412                .service_operators
413                .read()
414                .map_err(|_| CacheError::LockPoisoned)?;
415            if let Some(entry) = cache.get(&service_id) {
416                if !entry.is_expired(self.ttl) {
417                    blueprint_core::trace!(
418                        target: "service-config-cache",
419                        "Cache hit for service operators: service={}",
420                        service_id
421                    );
422                    return Ok(entry.value.clone());
423                }
424            }
425        }
426
427        // Cache miss or expired, fetch from chain
428        blueprint_core::debug!(
429            target: "service-config-cache",
430            "Cache miss for service operators: service={}, fetching from chain",
431            service_id
432        );
433
434        let operators_list = client
435            .get_service_operators(service_id)
436            .await
437            .map_err(|e| CacheError::FetchError(e.to_string()))?;
438
439        let operators = ServiceOperators::new(operators_list);
440
441        // Store in cache
442        {
443            let mut cache = self
444                .service_operators
445                .write()
446                .map_err(|_| CacheError::LockPoisoned)?;
447            cache.insert(service_id, CacheEntry::new(operators.clone()));
448        }
449
450        Ok(operators)
451    }
452
453    /// Get metadata for a specific operator (cached by blueprint + operator)
454    pub async fn get_operator_metadata(
455        &self,
456        client: &TangleClient,
457        blueprint_id: u64,
458        operator: Address,
459    ) -> Result<OperatorMetadata, CacheError> {
460        let key = (blueprint_id, operator);
461        if let Some(entry) = self
462            .operator_metadata
463            .read()
464            .map_err(|_| CacheError::LockPoisoned)?
465            .get(&key)
466            .cloned()
467        {
468            if !entry.is_expired(self.ttl) {
469                return Ok(entry.value);
470            }
471        }
472
473        let metadata = client
474            .get_operator_metadata(blueprint_id, operator)
475            .await
476            .map_err(|e| CacheError::FetchError(e.to_string()))?;
477        let mut guard = self
478            .operator_metadata
479            .write()
480            .map_err(|_| CacheError::LockPoisoned)?;
481        guard.insert(key, CacheEntry::new(metadata.clone()));
482        Ok(metadata)
483    }
484
485    /// Get metadata for all operators in a service.
486    pub async fn get_service_operator_metadata(
487        &self,
488        client: &TangleClient,
489        blueprint_id: u64,
490        service_id: u64,
491    ) -> Result<HashMap<Address, OperatorMetadata>, CacheError> {
492        let operators = self.get_service_operators(client, service_id).await?;
493        let mut result = HashMap::with_capacity(operators.len());
494        for operator in operators.iter() {
495            let metadata = self
496                .get_operator_metadata(client, blueprint_id, *operator)
497                .await?;
498            result.insert(*operator, metadata);
499        }
500        Ok(result)
501    }
502
503    // ═══════════════════════════════════════════════════════════════════════════
504    // CACHE MANAGEMENT
505    // ═══════════════════════════════════════════════════════════════════════════
506
507    /// Invalidate all cached data for a specific service
508    pub fn invalidate_service(&self, service_id: u64) {
509        blueprint_core::debug!(
510            target: "service-config-cache",
511            "Invalidating cache for service {}",
512            service_id
513        );
514
515        // Remove aggregation configs for this service
516        if let Ok(mut cache) = self.aggregation_configs.write() {
517            cache.retain(|(sid, _), _| *sid != service_id);
518        }
519
520        // Remove operator weights
521        if let Ok(mut cache) = self.operator_weights.write() {
522            cache.remove(&service_id);
523        }
524
525        // Remove service operators
526        if let Ok(mut cache) = self.service_operators.write() {
527            cache.remove(&service_id);
528        }
529    }
530
531    /// Invalidate a specific aggregation config
532    pub fn invalidate_aggregation_config(&self, service_id: u64, job_index: u8) {
533        if let Ok(mut cache) = self.aggregation_configs.write() {
534            cache.remove(&(service_id, job_index));
535        }
536    }
537
538    /// Clear all cached data
539    pub fn clear(&self) {
540        blueprint_core::debug!(
541            target: "service-config-cache",
542            "Clearing all cached service configs"
543        );
544
545        if let Ok(mut cache) = self.aggregation_configs.write() {
546            cache.clear();
547        }
548        if let Ok(mut cache) = self.operator_weights.write() {
549            cache.clear();
550        }
551        if let Ok(mut cache) = self.service_operators.write() {
552            cache.clear();
553        }
554    }
555
556    /// Remove expired entries from all caches
557    pub fn cleanup_expired(&self) {
558        let ttl = self.ttl;
559
560        if let Ok(mut cache) = self.aggregation_configs.write() {
561            cache.retain(|_, entry| !entry.is_expired(ttl));
562        }
563        if let Ok(mut cache) = self.operator_weights.write() {
564            cache.retain(|_, entry| !entry.is_expired(ttl));
565        }
566        if let Ok(mut cache) = self.service_operators.write() {
567            cache.retain(|_, entry| !entry.is_expired(ttl));
568        }
569    }
570
571    /// Get cache statistics
572    pub fn stats(&self) -> CacheStats {
573        let aggregation_count = self
574            .aggregation_configs
575            .read()
576            .map(|c| c.len())
577            .unwrap_or(0);
578        let weights_count = self.operator_weights.read().map(|c| c.len()).unwrap_or(0);
579        let operators_count = self.service_operators.read().map(|c| c.len()).unwrap_or(0);
580
581        CacheStats {
582            aggregation_configs: aggregation_count,
583            operator_weights: weights_count,
584            service_operators: operators_count,
585            ttl: self.ttl,
586        }
587    }
588}
589
590impl Default for ServiceConfigCache {
591    fn default() -> Self {
592        Self::with_default_ttl()
593    }
594}
595
596/// Cache statistics
597#[derive(Clone, Debug)]
598pub struct CacheStats {
599    /// Number of cached aggregation configs
600    pub aggregation_configs: usize,
601    /// Number of cached operator weights
602    pub operator_weights: usize,
603    /// Number of cached service operator lists
604    pub service_operators: usize,
605    /// Current TTL setting
606    pub ttl: Duration,
607}
608
609impl fmt::Display for CacheStats {
610    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
611        write!(
612            f,
613            "ServiceConfigCache {{ aggregation_configs: {}, operator_weights: {}, service_operators: {}, ttl: {:?} }}",
614            self.aggregation_configs, self.operator_weights, self.service_operators, self.ttl
615        )
616    }
617}
618
619/// A shared, thread-safe cache wrapped in Arc
620pub type SharedServiceConfigCache = Arc<ServiceConfigCache>;
621
622/// Create a new shared cache with default TTL
623pub fn shared_cache() -> SharedServiceConfigCache {
624    Arc::new(ServiceConfigCache::with_default_ttl())
625}
626
627/// Create a new shared cache with custom TTL
628pub fn shared_cache_with_ttl(ttl: Duration) -> SharedServiceConfigCache {
629    Arc::new(ServiceConfigCache::new(ttl))
630}
631
632// ═══════════════════════════════════════════════════════════════════════════════
633// EVENT-DRIVEN CACHE SYNC
634// ═══════════════════════════════════════════════════════════════════════════════
635
636/// Events that trigger cache invalidation
637#[derive(Debug, Clone)]
638pub enum CacheInvalidationEvent {
639    /// Operator joined a service - invalidates operator weights and list
640    OperatorJoined { service_id: u64, operator: Address },
641    /// Operator left a service - invalidates operator weights and list
642    OperatorLeft { service_id: u64, operator: Address },
643    /// Service was terminated - clears all service data
644    ServiceTerminated { service_id: u64 },
645    /// Service was activated - optionally pre-warm cache
646    ServiceActivated { service_id: u64 },
647}
648
649impl fmt::Display for CacheInvalidationEvent {
650    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
651        match self {
652            Self::OperatorJoined {
653                service_id,
654                operator,
655            } => {
656                write!(
657                    f,
658                    "OperatorJoined(service={}, operator={})",
659                    service_id, operator
660                )
661            }
662            Self::OperatorLeft {
663                service_id,
664                operator,
665            } => {
666                write!(
667                    f,
668                    "OperatorLeft(service={}, operator={})",
669                    service_id, operator
670                )
671            }
672            Self::ServiceTerminated { service_id } => {
673                write!(f, "ServiceTerminated(service={})", service_id)
674            }
675            Self::ServiceActivated { service_id } => {
676                write!(f, "ServiceActivated(service={})", service_id)
677            }
678        }
679    }
680}
681
682impl ServiceConfigCache {
683    /// Handle a cache invalidation event
684    ///
685    /// Call this when you receive relevant on-chain events to keep the cache in sync.
686    /// Logs clearly when invalidation occurs.
687    pub fn handle_event(&self, event: &CacheInvalidationEvent) {
688        blueprint_core::info!(
689            target: "service-config-cache",
690            "⚡ Cache invalidation triggered by event: {}",
691            event
692        );
693
694        match event {
695            CacheInvalidationEvent::OperatorJoined {
696                service_id,
697                operator,
698            } => {
699                blueprint_core::info!(
700                    target: "service-config-cache",
701                    "🔄 Invalidating cache: operator {} joined service {}",
702                    operator,
703                    service_id
704                );
705                self.invalidate_operator_data(*service_id);
706            }
707            CacheInvalidationEvent::OperatorLeft {
708                service_id,
709                operator,
710            } => {
711                blueprint_core::info!(
712                    target: "service-config-cache",
713                    "🔄 Invalidating cache: operator {} left service {}",
714                    operator,
715                    service_id
716                );
717                self.invalidate_operator_data(*service_id);
718            }
719            CacheInvalidationEvent::ServiceTerminated { service_id } => {
720                blueprint_core::info!(
721                    target: "service-config-cache",
722                    "🗑️ Clearing all cache for terminated service {}",
723                    service_id
724                );
725                self.invalidate_service(*service_id);
726            }
727            CacheInvalidationEvent::ServiceActivated { service_id } => {
728                blueprint_core::info!(
729                    target: "service-config-cache",
730                    "✨ Service {} activated (cache will be populated on first access)",
731                    service_id
732                );
733                // No invalidation needed - cache will be populated on first access
734            }
735        }
736    }
737
738    /// Invalidate only operator-related data for a service (weights and operator list)
739    fn invalidate_operator_data(&self, service_id: u64) {
740        if let Ok(mut cache) = self.operator_weights.write() {
741            cache.remove(&service_id);
742        }
743        if let Ok(mut cache) = self.service_operators.write() {
744            cache.remove(&service_id);
745        }
746    }
747}
748
749/// Service that syncs the cache with on-chain events
750///
751/// Provides both polling-based and manual event processing for cache invalidation.
752///
753/// # Example
754///
755/// ```rust,ignore
756/// use blueprint_tangle_extra::cache::{CacheSyncService, shared_cache};
757///
758/// let cache = shared_cache();
759/// let sync_service = CacheSyncService::new(client, cache.clone());
760///
761/// // Option 1: Poll for events periodically
762/// loop {
763///     let events_processed = sync_service.poll_and_sync(last_block).await?;
764///     tokio::time::sleep(Duration::from_secs(12)).await;
765/// }
766///
767/// // Option 2: Process events from your own subscription
768/// sync_service.process_logs(&logs);
769/// ```
770pub struct CacheSyncService {
771    client: Arc<TangleClient>,
772    cache: SharedServiceConfigCache,
773    /// Services to watch (None = watch all)
774    watched_services: Option<Vec<u64>>,
775    /// Last processed block
776    last_block: AtomicU64,
777}
778
779impl CacheSyncService {
780    /// Create a new cache sync service
781    pub fn new(client: Arc<TangleClient>, cache: SharedServiceConfigCache) -> Self {
782        Self {
783            client,
784            cache,
785            watched_services: None,
786            last_block: AtomicU64::new(0),
787        }
788    }
789
790    /// Only watch specific services
791    pub fn with_services(mut self, services: Vec<u64>) -> Self {
792        self.watched_services = Some(services);
793        self
794    }
795
796    /// Set the starting block for polling
797    pub fn from_block(self, block: u64) -> Self {
798        self.last_block.store(block, Ordering::Relaxed);
799        self
800    }
801
802    /// Check if a service should be watched
803    fn should_watch(&self, service_id: u64) -> bool {
804        self.watched_services
805            .as_ref()
806            .map(|s| s.contains(&service_id))
807            .unwrap_or(true)
808    }
809
810    /// Poll for new events and sync the cache
811    ///
812    /// Returns the number of events processed.
813    pub async fn poll_and_sync(&self) -> Result<usize, CacheError> {
814        use alloy_rpc_types::Filter;
815        use blueprint_client_tangle::contracts::ITangle;
816
817        let from_block = self.last_block.load(Ordering::Relaxed);
818        let tangle_address = self.client.config.settings.tangle_contract;
819
820        // Create filter for relevant events
821        let filter = Filter::new()
822            .address(tangle_address)
823            .from_block(from_block)
824            .events([
825                <ITangle::OperatorJoinedService as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
826                <ITangle::OperatorLeftService as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
827                <ITangle::ServiceTerminated as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
828                <ITangle::ServiceActivated as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
829            ]);
830
831        let logs = self
832            .client
833            .get_logs(&filter)
834            .await
835            .map_err(|e| CacheError::FetchError(format!("Failed to fetch logs: {}", e)))?;
836
837        let count = self.process_logs(&logs);
838
839        // Update last block
840        if let Some(last_log) = logs.last() {
841            if let Some(block_num) = last_log.block_number {
842                self.last_block.store(block_num + 1, Ordering::Relaxed);
843            }
844        }
845
846        Ok(count)
847    }
848
849    /// Process a batch of logs and invalidate cache as needed
850    ///
851    /// Returns the number of events processed.
852    pub fn process_logs(&self, logs: &[alloy_rpc_types::Log]) -> usize {
853        let mut count = 0;
854        for log in logs {
855            if let Some(event) = self.parse_log(log) {
856                let service_id = match &event {
857                    CacheInvalidationEvent::OperatorJoined { service_id, .. } => *service_id,
858                    CacheInvalidationEvent::OperatorLeft { service_id, .. } => *service_id,
859                    CacheInvalidationEvent::ServiceTerminated { service_id } => *service_id,
860                    CacheInvalidationEvent::ServiceActivated { service_id } => *service_id,
861                };
862                if self.should_watch(service_id) {
863                    self.cache.handle_event(&event);
864                    count += 1;
865                }
866            }
867        }
868        count
869    }
870
871    /// Parse a log into a cache invalidation event
872    pub fn parse_log(&self, log: &alloy_rpc_types::Log) -> Option<CacheInvalidationEvent> {
873        use blueprint_client_tangle::contracts::ITangle;
874
875        // Try to decode each event type
876        if let Ok(event) = log.log_decode::<ITangle::OperatorJoinedService>() {
877            return Some(CacheInvalidationEvent::OperatorJoined {
878                service_id: event.inner.serviceId,
879                operator: event.inner.operator,
880            });
881        }
882
883        if let Ok(event) = log.log_decode::<ITangle::OperatorLeftService>() {
884            return Some(CacheInvalidationEvent::OperatorLeft {
885                service_id: event.inner.serviceId,
886                operator: event.inner.operator,
887            });
888        }
889
890        if let Ok(event) = log.log_decode::<ITangle::ServiceTerminated>() {
891            return Some(CacheInvalidationEvent::ServiceTerminated {
892                service_id: event.inner.serviceId,
893            });
894        }
895
896        if let Ok(event) = log.log_decode::<ITangle::ServiceActivated>() {
897            return Some(CacheInvalidationEvent::ServiceActivated {
898                service_id: event.inner.serviceId,
899            });
900        }
901
902        None
903    }
904
905    /// Process a single event manually (useful for testing or custom event sources)
906    pub fn process_event(&self, event: CacheInvalidationEvent) {
907        self.cache.handle_event(&event);
908    }
909
910    /// Get the last processed block number
911    pub fn last_block(&self) -> u64 {
912        self.last_block.load(Ordering::Relaxed)
913    }
914}
915
916#[cfg(test)]
917mod tests {
918    use super::*;
919
920    #[test]
921    fn test_cache_entry_expiration() {
922        let entry = CacheEntry::new(42);
923
924        // Should not be expired immediately
925        assert!(!entry.is_expired(Duration::from_secs(1)));
926
927        // Should be expired with zero TTL
928        assert!(entry.is_expired(Duration::ZERO));
929    }
930
931    #[test]
932    fn test_operator_weights_threshold_calculation() {
933        let mut weights = HashMap::new();
934        // 3 operators: 5000, 3000, 2000 bps (total = 10000)
935        weights.insert(Address::ZERO, 5000);
936        weights.insert(Address::repeat_byte(1), 3000);
937        weights.insert(Address::repeat_byte(2), 2000);
938
939        let op_weights = OperatorWeights {
940            weights,
941            total_exposure: 10000,
942        };
943
944        // 50% threshold = 5000 bps needed
945        // Sorted: [5000, 3000, 2000]
946        // Just operator 0 (5000) meets 50%
947        assert_eq!(op_weights.calculate_threshold_signers(5000), 1);
948
949        // 67% threshold = 6700 bps needed
950        // Need operator 0 (5000) + operator 1 (3000) = 8000
951        assert_eq!(op_weights.calculate_threshold_signers(6700), 2);
952
953        // 100% threshold = 10000 bps needed
954        // Need all 3 operators
955        assert_eq!(op_weights.calculate_threshold_signers(10000), 3);
956    }
957
958    #[test]
959    fn test_service_operators_index() {
960        let ops = vec![
961            Address::repeat_byte(1),
962            Address::repeat_byte(2),
963            Address::repeat_byte(3),
964        ];
965        let service_ops = ServiceOperators::new(ops);
966
967        assert_eq!(service_ops.index_of(&Address::repeat_byte(1)), Some(0));
968        assert_eq!(service_ops.index_of(&Address::repeat_byte(2)), Some(1));
969        assert_eq!(service_ops.index_of(&Address::repeat_byte(3)), Some(2));
970        assert_eq!(service_ops.index_of(&Address::repeat_byte(4)), None);
971    }
972
973    #[test]
974    fn test_cache_stats() {
975        let cache = ServiceConfigCache::with_default_ttl();
976        let stats = cache.stats();
977
978        assert_eq!(stats.aggregation_configs, 0);
979        assert_eq!(stats.operator_weights, 0);
980        assert_eq!(stats.service_operators, 0);
981        assert_eq!(stats.ttl, DEFAULT_CACHE_TTL);
982    }
983
984    #[test]
985    fn test_cache_invalidation_event_display() {
986        let event = CacheInvalidationEvent::OperatorJoined {
987            service_id: 1,
988            operator: Address::repeat_byte(0xAB),
989        };
990        assert!(event.to_string().contains("OperatorJoined"));
991        assert!(event.to_string().contains("service=1"));
992
993        let event = CacheInvalidationEvent::OperatorLeft {
994            service_id: 2,
995            operator: Address::repeat_byte(0xCD),
996        };
997        assert!(event.to_string().contains("OperatorLeft"));
998
999        let event = CacheInvalidationEvent::ServiceTerminated { service_id: 3 };
1000        assert!(event.to_string().contains("ServiceTerminated"));
1001
1002        let event = CacheInvalidationEvent::ServiceActivated { service_id: 4 };
1003        assert!(event.to_string().contains("ServiceActivated"));
1004    }
1005
1006    #[test]
1007    fn test_handle_operator_joined_invalidates_cache() {
1008        let cache = ServiceConfigCache::with_default_ttl();
1009
1010        // Pre-populate cache
1011        let mut weights = HashMap::new();
1012        weights.insert(Address::ZERO, 5000u16);
1013        cache
1014            .set_operator_weights(
1015                1,
1016                OperatorWeights {
1017                    weights,
1018                    total_exposure: 5000,
1019                },
1020            )
1021            .unwrap();
1022
1023        // Verify cache is populated
1024        assert_eq!(cache.stats().operator_weights, 1);
1025
1026        // Handle operator joined event
1027        cache.handle_event(&CacheInvalidationEvent::OperatorJoined {
1028            service_id: 1,
1029            operator: Address::repeat_byte(1),
1030        });
1031
1032        // Cache should be invalidated
1033        assert_eq!(cache.stats().operator_weights, 0);
1034    }
1035
1036    #[test]
1037    fn test_handle_operator_left_invalidates_cache() {
1038        let cache = ServiceConfigCache::with_default_ttl();
1039
1040        // Pre-populate cache
1041        let mut weights = HashMap::new();
1042        weights.insert(Address::ZERO, 5000u16);
1043        cache
1044            .set_operator_weights(
1045                1,
1046                OperatorWeights {
1047                    weights,
1048                    total_exposure: 5000,
1049                },
1050            )
1051            .unwrap();
1052
1053        assert_eq!(cache.stats().operator_weights, 1);
1054
1055        // Handle operator left event
1056        cache.handle_event(&CacheInvalidationEvent::OperatorLeft {
1057            service_id: 1,
1058            operator: Address::ZERO,
1059        });
1060
1061        // Cache should be invalidated
1062        assert_eq!(cache.stats().operator_weights, 0);
1063    }
1064
1065    #[test]
1066    fn test_handle_service_terminated_clears_all() {
1067        let cache = ServiceConfigCache::with_default_ttl();
1068
1069        // Pre-populate cache for service 1
1070        let mut weights = HashMap::new();
1071        weights.insert(Address::ZERO, 5000u16);
1072        cache
1073            .set_operator_weights(
1074                1,
1075                OperatorWeights {
1076                    weights: weights.clone(),
1077                    total_exposure: 5000,
1078                },
1079            )
1080            .unwrap();
1081
1082        // Also populate service 2
1083        cache
1084            .set_operator_weights(
1085                2,
1086                OperatorWeights {
1087                    weights,
1088                    total_exposure: 5000,
1089                },
1090            )
1091            .unwrap();
1092
1093        assert_eq!(cache.stats().operator_weights, 2);
1094
1095        // Terminate service 1
1096        cache.handle_event(&CacheInvalidationEvent::ServiceTerminated { service_id: 1 });
1097
1098        // Only service 1 should be cleared
1099        assert_eq!(cache.stats().operator_weights, 1);
1100    }
1101
1102    #[test]
1103    fn test_handle_service_activated_no_invalidation() {
1104        let cache = ServiceConfigCache::with_default_ttl();
1105
1106        // Pre-populate cache
1107        let mut weights = HashMap::new();
1108        weights.insert(Address::ZERO, 5000u16);
1109        cache
1110            .set_operator_weights(
1111                1,
1112                OperatorWeights {
1113                    weights,
1114                    total_exposure: 5000,
1115                },
1116            )
1117            .unwrap();
1118
1119        assert_eq!(cache.stats().operator_weights, 1);
1120
1121        // Service activated should NOT invalidate existing cache
1122        cache.handle_event(&CacheInvalidationEvent::ServiceActivated { service_id: 1 });
1123
1124        // Cache should still be there
1125        assert_eq!(cache.stats().operator_weights, 1);
1126    }
1127
1128    #[test]
1129    fn test_invalidation_only_affects_target_service() {
1130        let cache = ServiceConfigCache::with_default_ttl();
1131
1132        // Populate cache for services 1, 2, 3
1133        for service_id in 1..=3 {
1134            let mut weights = HashMap::new();
1135            weights.insert(Address::repeat_byte(service_id as u8), 5000u16);
1136            cache
1137                .set_operator_weights(
1138                    service_id,
1139                    OperatorWeights {
1140                        weights,
1141                        total_exposure: 5000,
1142                    },
1143                )
1144                .unwrap();
1145        }
1146
1147        assert_eq!(cache.stats().operator_weights, 3);
1148
1149        // Invalidate only service 2
1150        cache.handle_event(&CacheInvalidationEvent::OperatorJoined {
1151            service_id: 2,
1152            operator: Address::repeat_byte(0xFF),
1153        });
1154
1155        // Should have 2 services left
1156        assert_eq!(cache.stats().operator_weights, 2);
1157    }
1158}