1use alloy_primitives::Address;
26use blueprint_client_tangle::{AggregationConfig, OperatorMetadata, TangleClient};
27use blueprint_std::collections::HashMap;
28use blueprint_std::format;
29use blueprint_std::string::{String, ToString};
30use blueprint_std::sync::{Arc, RwLock};
31use blueprint_std::time::{Duration, Instant};
32use blueprint_std::vec::Vec;
33use core::fmt;
34use core::sync::atomic::{AtomicU64, Ordering};
35
36pub const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(300);
38
39#[derive(Debug, thiserror::Error)]
41pub enum CacheError {
42 #[error("Failed to fetch from chain: {0}")]
44 FetchError(String),
45 #[error("Cache lock poisoned")]
47 LockPoisoned,
48}
49
50#[derive(Clone, Debug)]
52struct CacheEntry<T> {
53 value: T,
54 cached_at: Instant,
55}
56
57impl<T> CacheEntry<T> {
58 fn new(value: T) -> Self {
59 Self {
60 value,
61 cached_at: Instant::now(),
62 }
63 }
64
65 fn is_expired(&self, ttl: Duration) -> bool {
66 self.cached_at.elapsed() > ttl
67 }
68}
69
70#[derive(Clone, Debug)]
72pub struct OperatorWeights {
73 pub weights: HashMap<Address, u16>,
75 pub total_exposure: u64,
77}
78
79impl OperatorWeights {
80 pub fn get(&self, operator: &Address) -> Option<u16> {
82 self.weights.get(operator).copied()
83 }
84
85 pub fn contains(&self, operator: &Address) -> bool {
87 self.weights.contains_key(operator)
88 }
89
90 pub fn len(&self) -> usize {
92 self.weights.len()
93 }
94
95 pub fn is_empty(&self) -> bool {
97 self.weights.is_empty()
98 }
99
100 pub fn iter(&self) -> impl Iterator<Item = (&Address, &u16)> {
102 self.weights.iter()
103 }
104
105 pub fn calculate_threshold_signers(&self, threshold_bps: u16) -> usize {
110 if self.weights.is_empty() {
111 return 0;
112 }
113
114 let required_weight = (self.total_exposure * threshold_bps as u64) / 10000;
115
116 let mut sorted: Vec<_> = self.weights.iter().collect();
118 sorted.sort_by(|a, b| b.1.cmp(a.1));
119
120 let mut accumulated: u64 = 0;
121 let mut count = 0;
122
123 for (_, &weight) in sorted {
124 accumulated += weight as u64;
125 count += 1;
126 if accumulated >= required_weight {
127 break;
128 }
129 }
130
131 count
132 }
133}
134
135#[derive(Clone, Debug)]
137pub struct ServiceOperators {
138 pub operators: Vec<Address>,
140 pub index_map: HashMap<Address, usize>,
142}
143
144impl ServiceOperators {
145 pub fn new(operators: Vec<Address>) -> Self {
147 let index_map = operators
148 .iter()
149 .enumerate()
150 .map(|(i, addr)| (*addr, i))
151 .collect();
152 Self {
153 operators,
154 index_map,
155 }
156 }
157
158 pub fn index_of(&self, operator: &Address) -> Option<usize> {
160 self.index_map.get(operator).copied()
161 }
162
163 pub fn len(&self) -> usize {
165 self.operators.len()
166 }
167
168 pub fn is_empty(&self) -> bool {
170 self.operators.is_empty()
171 }
172
173 pub fn iter(&self) -> impl Iterator<Item = &Address> {
175 self.operators.iter()
176 }
177}
178
179pub struct ServiceConfigCache {
184 ttl: Duration,
186 aggregation_configs: RwLock<HashMap<(u64, u8), CacheEntry<AggregationConfig>>>,
188 operator_weights: RwLock<HashMap<u64, CacheEntry<OperatorWeights>>>,
190 service_operators: RwLock<HashMap<u64, CacheEntry<ServiceOperators>>>,
192 operator_metadata: RwLock<HashMap<(u64, Address), CacheEntry<OperatorMetadata>>>,
194}
195
196impl ServiceConfigCache {
197 pub fn new(ttl: Duration) -> Self {
199 Self {
200 ttl,
201 aggregation_configs: RwLock::new(HashMap::new()),
202 operator_weights: RwLock::new(HashMap::new()),
203 service_operators: RwLock::new(HashMap::new()),
204 operator_metadata: RwLock::new(HashMap::new()),
205 }
206 }
207
208 pub fn with_default_ttl() -> Self {
210 Self::new(DEFAULT_CACHE_TTL)
211 }
212
213 pub fn ttl(&self) -> Duration {
215 self.ttl
216 }
217
218 pub fn set_ttl(&mut self, ttl: Duration) {
220 self.ttl = ttl;
221 }
222
223 pub async fn get_aggregation_config(
229 &self,
230 client: &TangleClient,
231 service_id: u64,
232 job_index: u8,
233 ) -> Result<AggregationConfig, CacheError> {
234 let key = (service_id, job_index);
235
236 {
238 let cache = self
239 .aggregation_configs
240 .read()
241 .map_err(|_| CacheError::LockPoisoned)?;
242 if let Some(entry) = cache.get(&key) {
243 if !entry.is_expired(self.ttl) {
244 blueprint_core::trace!(
245 target: "service-config-cache",
246 "Cache hit for aggregation config: service={}, job={}",
247 service_id,
248 job_index
249 );
250 return Ok(entry.value.clone());
251 }
252 }
253 }
254
255 blueprint_core::debug!(
257 target: "service-config-cache",
258 "Cache miss for aggregation config: service={}, job={}, fetching from chain",
259 service_id,
260 job_index
261 );
262
263 let config = client
264 .get_aggregation_config(service_id, job_index)
265 .await
266 .map_err(|e| CacheError::FetchError(e.to_string()))?;
267
268 {
270 let mut cache = self
271 .aggregation_configs
272 .write()
273 .map_err(|_| CacheError::LockPoisoned)?;
274 cache.insert(key, CacheEntry::new(config.clone()));
275 }
276
277 Ok(config)
278 }
279
280 pub fn set_aggregation_config(
282 &self,
283 service_id: u64,
284 job_index: u8,
285 config: AggregationConfig,
286 ) -> Result<(), CacheError> {
287 let mut cache = self
288 .aggregation_configs
289 .write()
290 .map_err(|_| CacheError::LockPoisoned)?;
291 cache.insert((service_id, job_index), CacheEntry::new(config));
292 Ok(())
293 }
294
295 pub async fn get_operator_weights(
301 &self,
302 client: &TangleClient,
303 service_id: u64,
304 ) -> Result<OperatorWeights, CacheError> {
305 {
307 let cache = self
308 .operator_weights
309 .read()
310 .map_err(|_| CacheError::LockPoisoned)?;
311 if let Some(entry) = cache.get(&service_id) {
312 if !entry.is_expired(self.ttl) {
313 blueprint_core::trace!(
314 target: "service-config-cache",
315 "Cache hit for operator weights: service={}",
316 service_id
317 );
318 return Ok(entry.value.clone());
319 }
320 }
321 }
322
323 blueprint_core::debug!(
325 target: "service-config-cache",
326 "Cache miss for operator weights: service={}, fetching from chain",
327 service_id
328 );
329
330 let weights = self.fetch_operator_weights(client, service_id).await?;
331
332 {
334 let mut cache = self
335 .operator_weights
336 .write()
337 .map_err(|_| CacheError::LockPoisoned)?;
338 cache.insert(service_id, CacheEntry::new(weights.clone()));
339 }
340
341 Ok(weights)
342 }
343
344 async fn fetch_operator_weights(
346 &self,
347 client: &TangleClient,
348 service_id: u64,
349 ) -> Result<OperatorWeights, CacheError> {
350 let operators = client
352 .get_service_operators(service_id)
353 .await
354 .map_err(|e| CacheError::FetchError(format!("Failed to get operators: {}", e)))?;
355
356 let mut weights = HashMap::new();
358 let mut total_exposure: u64 = 0;
359
360 for operator in operators {
361 match client.get_service_operator(service_id, operator).await {
362 Ok(op_info) => {
363 if op_info.active {
364 weights.insert(operator, op_info.exposureBps);
365 total_exposure += op_info.exposureBps as u64;
366 }
367 }
368 Err(e) => {
369 blueprint_core::warn!(
370 target: "service-config-cache",
371 "Failed to get operator info for {}: {}",
372 operator,
373 e
374 );
375 }
376 }
377 }
378
379 Ok(OperatorWeights {
380 weights,
381 total_exposure,
382 })
383 }
384
385 pub fn set_operator_weights(
387 &self,
388 service_id: u64,
389 weights: OperatorWeights,
390 ) -> Result<(), CacheError> {
391 let mut cache = self
392 .operator_weights
393 .write()
394 .map_err(|_| CacheError::LockPoisoned)?;
395 cache.insert(service_id, CacheEntry::new(weights));
396 Ok(())
397 }
398
399 pub async fn get_service_operators(
405 &self,
406 client: &TangleClient,
407 service_id: u64,
408 ) -> Result<ServiceOperators, CacheError> {
409 {
411 let cache = self
412 .service_operators
413 .read()
414 .map_err(|_| CacheError::LockPoisoned)?;
415 if let Some(entry) = cache.get(&service_id) {
416 if !entry.is_expired(self.ttl) {
417 blueprint_core::trace!(
418 target: "service-config-cache",
419 "Cache hit for service operators: service={}",
420 service_id
421 );
422 return Ok(entry.value.clone());
423 }
424 }
425 }
426
427 blueprint_core::debug!(
429 target: "service-config-cache",
430 "Cache miss for service operators: service={}, fetching from chain",
431 service_id
432 );
433
434 let operators_list = client
435 .get_service_operators(service_id)
436 .await
437 .map_err(|e| CacheError::FetchError(e.to_string()))?;
438
439 let operators = ServiceOperators::new(operators_list);
440
441 {
443 let mut cache = self
444 .service_operators
445 .write()
446 .map_err(|_| CacheError::LockPoisoned)?;
447 cache.insert(service_id, CacheEntry::new(operators.clone()));
448 }
449
450 Ok(operators)
451 }
452
453 pub async fn get_operator_metadata(
455 &self,
456 client: &TangleClient,
457 blueprint_id: u64,
458 operator: Address,
459 ) -> Result<OperatorMetadata, CacheError> {
460 let key = (blueprint_id, operator);
461 if let Some(entry) = self
462 .operator_metadata
463 .read()
464 .map_err(|_| CacheError::LockPoisoned)?
465 .get(&key)
466 .cloned()
467 {
468 if !entry.is_expired(self.ttl) {
469 return Ok(entry.value);
470 }
471 }
472
473 let metadata = client
474 .get_operator_metadata(blueprint_id, operator)
475 .await
476 .map_err(|e| CacheError::FetchError(e.to_string()))?;
477 let mut guard = self
478 .operator_metadata
479 .write()
480 .map_err(|_| CacheError::LockPoisoned)?;
481 guard.insert(key, CacheEntry::new(metadata.clone()));
482 Ok(metadata)
483 }
484
485 pub async fn get_service_operator_metadata(
487 &self,
488 client: &TangleClient,
489 blueprint_id: u64,
490 service_id: u64,
491 ) -> Result<HashMap<Address, OperatorMetadata>, CacheError> {
492 let operators = self.get_service_operators(client, service_id).await?;
493 let mut result = HashMap::with_capacity(operators.len());
494 for operator in operators.iter() {
495 let metadata = self
496 .get_operator_metadata(client, blueprint_id, *operator)
497 .await?;
498 result.insert(*operator, metadata);
499 }
500 Ok(result)
501 }
502
503 pub fn invalidate_service(&self, service_id: u64) {
509 blueprint_core::debug!(
510 target: "service-config-cache",
511 "Invalidating cache for service {}",
512 service_id
513 );
514
515 if let Ok(mut cache) = self.aggregation_configs.write() {
517 cache.retain(|(sid, _), _| *sid != service_id);
518 }
519
520 if let Ok(mut cache) = self.operator_weights.write() {
522 cache.remove(&service_id);
523 }
524
525 if let Ok(mut cache) = self.service_operators.write() {
527 cache.remove(&service_id);
528 }
529 }
530
531 pub fn invalidate_aggregation_config(&self, service_id: u64, job_index: u8) {
533 if let Ok(mut cache) = self.aggregation_configs.write() {
534 cache.remove(&(service_id, job_index));
535 }
536 }
537
538 pub fn clear(&self) {
540 blueprint_core::debug!(
541 target: "service-config-cache",
542 "Clearing all cached service configs"
543 );
544
545 if let Ok(mut cache) = self.aggregation_configs.write() {
546 cache.clear();
547 }
548 if let Ok(mut cache) = self.operator_weights.write() {
549 cache.clear();
550 }
551 if let Ok(mut cache) = self.service_operators.write() {
552 cache.clear();
553 }
554 }
555
556 pub fn cleanup_expired(&self) {
558 let ttl = self.ttl;
559
560 if let Ok(mut cache) = self.aggregation_configs.write() {
561 cache.retain(|_, entry| !entry.is_expired(ttl));
562 }
563 if let Ok(mut cache) = self.operator_weights.write() {
564 cache.retain(|_, entry| !entry.is_expired(ttl));
565 }
566 if let Ok(mut cache) = self.service_operators.write() {
567 cache.retain(|_, entry| !entry.is_expired(ttl));
568 }
569 }
570
571 pub fn stats(&self) -> CacheStats {
573 let aggregation_count = self
574 .aggregation_configs
575 .read()
576 .map(|c| c.len())
577 .unwrap_or(0);
578 let weights_count = self.operator_weights.read().map(|c| c.len()).unwrap_or(0);
579 let operators_count = self.service_operators.read().map(|c| c.len()).unwrap_or(0);
580
581 CacheStats {
582 aggregation_configs: aggregation_count,
583 operator_weights: weights_count,
584 service_operators: operators_count,
585 ttl: self.ttl,
586 }
587 }
588}
589
590impl Default for ServiceConfigCache {
591 fn default() -> Self {
592 Self::with_default_ttl()
593 }
594}
595
596#[derive(Clone, Debug)]
598pub struct CacheStats {
599 pub aggregation_configs: usize,
601 pub operator_weights: usize,
603 pub service_operators: usize,
605 pub ttl: Duration,
607}
608
609impl fmt::Display for CacheStats {
610 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
611 write!(
612 f,
613 "ServiceConfigCache {{ aggregation_configs: {}, operator_weights: {}, service_operators: {}, ttl: {:?} }}",
614 self.aggregation_configs, self.operator_weights, self.service_operators, self.ttl
615 )
616 }
617}
618
619pub type SharedServiceConfigCache = Arc<ServiceConfigCache>;
621
622pub fn shared_cache() -> SharedServiceConfigCache {
624 Arc::new(ServiceConfigCache::with_default_ttl())
625}
626
627pub fn shared_cache_with_ttl(ttl: Duration) -> SharedServiceConfigCache {
629 Arc::new(ServiceConfigCache::new(ttl))
630}
631
632#[derive(Debug, Clone)]
638pub enum CacheInvalidationEvent {
639 OperatorJoined { service_id: u64, operator: Address },
641 OperatorLeft { service_id: u64, operator: Address },
643 ServiceTerminated { service_id: u64 },
645 ServiceActivated { service_id: u64 },
647}
648
649impl fmt::Display for CacheInvalidationEvent {
650 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
651 match self {
652 Self::OperatorJoined {
653 service_id,
654 operator,
655 } => {
656 write!(
657 f,
658 "OperatorJoined(service={}, operator={})",
659 service_id, operator
660 )
661 }
662 Self::OperatorLeft {
663 service_id,
664 operator,
665 } => {
666 write!(
667 f,
668 "OperatorLeft(service={}, operator={})",
669 service_id, operator
670 )
671 }
672 Self::ServiceTerminated { service_id } => {
673 write!(f, "ServiceTerminated(service={})", service_id)
674 }
675 Self::ServiceActivated { service_id } => {
676 write!(f, "ServiceActivated(service={})", service_id)
677 }
678 }
679 }
680}
681
682impl ServiceConfigCache {
683 pub fn handle_event(&self, event: &CacheInvalidationEvent) {
688 blueprint_core::info!(
689 target: "service-config-cache",
690 "⚡ Cache invalidation triggered by event: {}",
691 event
692 );
693
694 match event {
695 CacheInvalidationEvent::OperatorJoined {
696 service_id,
697 operator,
698 } => {
699 blueprint_core::info!(
700 target: "service-config-cache",
701 "🔄 Invalidating cache: operator {} joined service {}",
702 operator,
703 service_id
704 );
705 self.invalidate_operator_data(*service_id);
706 }
707 CacheInvalidationEvent::OperatorLeft {
708 service_id,
709 operator,
710 } => {
711 blueprint_core::info!(
712 target: "service-config-cache",
713 "🔄 Invalidating cache: operator {} left service {}",
714 operator,
715 service_id
716 );
717 self.invalidate_operator_data(*service_id);
718 }
719 CacheInvalidationEvent::ServiceTerminated { service_id } => {
720 blueprint_core::info!(
721 target: "service-config-cache",
722 "🗑️ Clearing all cache for terminated service {}",
723 service_id
724 );
725 self.invalidate_service(*service_id);
726 }
727 CacheInvalidationEvent::ServiceActivated { service_id } => {
728 blueprint_core::info!(
729 target: "service-config-cache",
730 "✨ Service {} activated (cache will be populated on first access)",
731 service_id
732 );
733 }
735 }
736 }
737
738 fn invalidate_operator_data(&self, service_id: u64) {
740 if let Ok(mut cache) = self.operator_weights.write() {
741 cache.remove(&service_id);
742 }
743 if let Ok(mut cache) = self.service_operators.write() {
744 cache.remove(&service_id);
745 }
746 }
747}
748
749pub struct CacheSyncService {
771 client: Arc<TangleClient>,
772 cache: SharedServiceConfigCache,
773 watched_services: Option<Vec<u64>>,
775 last_block: AtomicU64,
777}
778
779impl CacheSyncService {
780 pub fn new(client: Arc<TangleClient>, cache: SharedServiceConfigCache) -> Self {
782 Self {
783 client,
784 cache,
785 watched_services: None,
786 last_block: AtomicU64::new(0),
787 }
788 }
789
790 pub fn with_services(mut self, services: Vec<u64>) -> Self {
792 self.watched_services = Some(services);
793 self
794 }
795
796 pub fn from_block(self, block: u64) -> Self {
798 self.last_block.store(block, Ordering::Relaxed);
799 self
800 }
801
802 fn should_watch(&self, service_id: u64) -> bool {
804 self.watched_services
805 .as_ref()
806 .map(|s| s.contains(&service_id))
807 .unwrap_or(true)
808 }
809
810 pub async fn poll_and_sync(&self) -> Result<usize, CacheError> {
814 use alloy_rpc_types::Filter;
815 use blueprint_client_tangle::contracts::ITangle;
816
817 let from_block = self.last_block.load(Ordering::Relaxed);
818 let tangle_address = self.client.config.settings.tangle_contract;
819
820 let filter = Filter::new()
822 .address(tangle_address)
823 .from_block(from_block)
824 .events([
825 <ITangle::OperatorJoinedService as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
826 <ITangle::OperatorLeftService as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
827 <ITangle::ServiceTerminated as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
828 <ITangle::ServiceActivated as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
829 ]);
830
831 let logs = self
832 .client
833 .get_logs(&filter)
834 .await
835 .map_err(|e| CacheError::FetchError(format!("Failed to fetch logs: {}", e)))?;
836
837 let count = self.process_logs(&logs);
838
839 if let Some(last_log) = logs.last() {
841 if let Some(block_num) = last_log.block_number {
842 self.last_block.store(block_num + 1, Ordering::Relaxed);
843 }
844 }
845
846 Ok(count)
847 }
848
849 pub fn process_logs(&self, logs: &[alloy_rpc_types::Log]) -> usize {
853 let mut count = 0;
854 for log in logs {
855 if let Some(event) = self.parse_log(log) {
856 let service_id = match &event {
857 CacheInvalidationEvent::OperatorJoined { service_id, .. } => *service_id,
858 CacheInvalidationEvent::OperatorLeft { service_id, .. } => *service_id,
859 CacheInvalidationEvent::ServiceTerminated { service_id } => *service_id,
860 CacheInvalidationEvent::ServiceActivated { service_id } => *service_id,
861 };
862 if self.should_watch(service_id) {
863 self.cache.handle_event(&event);
864 count += 1;
865 }
866 }
867 }
868 count
869 }
870
871 pub fn parse_log(&self, log: &alloy_rpc_types::Log) -> Option<CacheInvalidationEvent> {
873 use blueprint_client_tangle::contracts::ITangle;
874
875 if let Ok(event) = log.log_decode::<ITangle::OperatorJoinedService>() {
877 return Some(CacheInvalidationEvent::OperatorJoined {
878 service_id: event.inner.serviceId,
879 operator: event.inner.operator,
880 });
881 }
882
883 if let Ok(event) = log.log_decode::<ITangle::OperatorLeftService>() {
884 return Some(CacheInvalidationEvent::OperatorLeft {
885 service_id: event.inner.serviceId,
886 operator: event.inner.operator,
887 });
888 }
889
890 if let Ok(event) = log.log_decode::<ITangle::ServiceTerminated>() {
891 return Some(CacheInvalidationEvent::ServiceTerminated {
892 service_id: event.inner.serviceId,
893 });
894 }
895
896 if let Ok(event) = log.log_decode::<ITangle::ServiceActivated>() {
897 return Some(CacheInvalidationEvent::ServiceActivated {
898 service_id: event.inner.serviceId,
899 });
900 }
901
902 None
903 }
904
905 pub fn process_event(&self, event: CacheInvalidationEvent) {
907 self.cache.handle_event(&event);
908 }
909
910 pub fn last_block(&self) -> u64 {
912 self.last_block.load(Ordering::Relaxed)
913 }
914}
915
916#[cfg(test)]
917mod tests {
918 use super::*;
919
920 #[test]
921 fn test_cache_entry_expiration() {
922 let entry = CacheEntry::new(42);
923
924 assert!(!entry.is_expired(Duration::from_secs(1)));
926
927 assert!(entry.is_expired(Duration::ZERO));
929 }
930
931 #[test]
932 fn test_operator_weights_threshold_calculation() {
933 let mut weights = HashMap::new();
934 weights.insert(Address::ZERO, 5000);
936 weights.insert(Address::repeat_byte(1), 3000);
937 weights.insert(Address::repeat_byte(2), 2000);
938
939 let op_weights = OperatorWeights {
940 weights,
941 total_exposure: 10000,
942 };
943
944 assert_eq!(op_weights.calculate_threshold_signers(5000), 1);
948
949 assert_eq!(op_weights.calculate_threshold_signers(6700), 2);
952
953 assert_eq!(op_weights.calculate_threshold_signers(10000), 3);
956 }
957
958 #[test]
959 fn test_service_operators_index() {
960 let ops = vec![
961 Address::repeat_byte(1),
962 Address::repeat_byte(2),
963 Address::repeat_byte(3),
964 ];
965 let service_ops = ServiceOperators::new(ops);
966
967 assert_eq!(service_ops.index_of(&Address::repeat_byte(1)), Some(0));
968 assert_eq!(service_ops.index_of(&Address::repeat_byte(2)), Some(1));
969 assert_eq!(service_ops.index_of(&Address::repeat_byte(3)), Some(2));
970 assert_eq!(service_ops.index_of(&Address::repeat_byte(4)), None);
971 }
972
973 #[test]
974 fn test_cache_stats() {
975 let cache = ServiceConfigCache::with_default_ttl();
976 let stats = cache.stats();
977
978 assert_eq!(stats.aggregation_configs, 0);
979 assert_eq!(stats.operator_weights, 0);
980 assert_eq!(stats.service_operators, 0);
981 assert_eq!(stats.ttl, DEFAULT_CACHE_TTL);
982 }
983
984 #[test]
985 fn test_cache_invalidation_event_display() {
986 let event = CacheInvalidationEvent::OperatorJoined {
987 service_id: 1,
988 operator: Address::repeat_byte(0xAB),
989 };
990 assert!(event.to_string().contains("OperatorJoined"));
991 assert!(event.to_string().contains("service=1"));
992
993 let event = CacheInvalidationEvent::OperatorLeft {
994 service_id: 2,
995 operator: Address::repeat_byte(0xCD),
996 };
997 assert!(event.to_string().contains("OperatorLeft"));
998
999 let event = CacheInvalidationEvent::ServiceTerminated { service_id: 3 };
1000 assert!(event.to_string().contains("ServiceTerminated"));
1001
1002 let event = CacheInvalidationEvent::ServiceActivated { service_id: 4 };
1003 assert!(event.to_string().contains("ServiceActivated"));
1004 }
1005
1006 #[test]
1007 fn test_handle_operator_joined_invalidates_cache() {
1008 let cache = ServiceConfigCache::with_default_ttl();
1009
1010 let mut weights = HashMap::new();
1012 weights.insert(Address::ZERO, 5000u16);
1013 cache
1014 .set_operator_weights(
1015 1,
1016 OperatorWeights {
1017 weights,
1018 total_exposure: 5000,
1019 },
1020 )
1021 .unwrap();
1022
1023 assert_eq!(cache.stats().operator_weights, 1);
1025
1026 cache.handle_event(&CacheInvalidationEvent::OperatorJoined {
1028 service_id: 1,
1029 operator: Address::repeat_byte(1),
1030 });
1031
1032 assert_eq!(cache.stats().operator_weights, 0);
1034 }
1035
1036 #[test]
1037 fn test_handle_operator_left_invalidates_cache() {
1038 let cache = ServiceConfigCache::with_default_ttl();
1039
1040 let mut weights = HashMap::new();
1042 weights.insert(Address::ZERO, 5000u16);
1043 cache
1044 .set_operator_weights(
1045 1,
1046 OperatorWeights {
1047 weights,
1048 total_exposure: 5000,
1049 },
1050 )
1051 .unwrap();
1052
1053 assert_eq!(cache.stats().operator_weights, 1);
1054
1055 cache.handle_event(&CacheInvalidationEvent::OperatorLeft {
1057 service_id: 1,
1058 operator: Address::ZERO,
1059 });
1060
1061 assert_eq!(cache.stats().operator_weights, 0);
1063 }
1064
1065 #[test]
1066 fn test_handle_service_terminated_clears_all() {
1067 let cache = ServiceConfigCache::with_default_ttl();
1068
1069 let mut weights = HashMap::new();
1071 weights.insert(Address::ZERO, 5000u16);
1072 cache
1073 .set_operator_weights(
1074 1,
1075 OperatorWeights {
1076 weights: weights.clone(),
1077 total_exposure: 5000,
1078 },
1079 )
1080 .unwrap();
1081
1082 cache
1084 .set_operator_weights(
1085 2,
1086 OperatorWeights {
1087 weights,
1088 total_exposure: 5000,
1089 },
1090 )
1091 .unwrap();
1092
1093 assert_eq!(cache.stats().operator_weights, 2);
1094
1095 cache.handle_event(&CacheInvalidationEvent::ServiceTerminated { service_id: 1 });
1097
1098 assert_eq!(cache.stats().operator_weights, 1);
1100 }
1101
1102 #[test]
1103 fn test_handle_service_activated_no_invalidation() {
1104 let cache = ServiceConfigCache::with_default_ttl();
1105
1106 let mut weights = HashMap::new();
1108 weights.insert(Address::ZERO, 5000u16);
1109 cache
1110 .set_operator_weights(
1111 1,
1112 OperatorWeights {
1113 weights,
1114 total_exposure: 5000,
1115 },
1116 )
1117 .unwrap();
1118
1119 assert_eq!(cache.stats().operator_weights, 1);
1120
1121 cache.handle_event(&CacheInvalidationEvent::ServiceActivated { service_id: 1 });
1123
1124 assert_eq!(cache.stats().operator_weights, 1);
1126 }
1127
1128 #[test]
1129 fn test_invalidation_only_affects_target_service() {
1130 let cache = ServiceConfigCache::with_default_ttl();
1131
1132 for service_id in 1..=3 {
1134 let mut weights = HashMap::new();
1135 weights.insert(Address::repeat_byte(service_id as u8), 5000u16);
1136 cache
1137 .set_operator_weights(
1138 service_id,
1139 OperatorWeights {
1140 weights,
1141 total_exposure: 5000,
1142 },
1143 )
1144 .unwrap();
1145 }
1146
1147 assert_eq!(cache.stats().operator_weights, 3);
1148
1149 cache.handle_event(&CacheInvalidationEvent::OperatorJoined {
1151 service_id: 2,
1152 operator: Address::repeat_byte(0xFF),
1153 });
1154
1155 assert_eq!(cache.stats().operator_weights, 2);
1157 }
1158}