use alloy_primitives::Address;
use blueprint_client_tangle::{AggregationConfig, OperatorMetadata, TangleClient};
use blueprint_std::collections::HashMap;
use blueprint_std::format;
use blueprint_std::string::{String, ToString};
use blueprint_std::sync::{Arc, RwLock};
use blueprint_std::time::{Duration, Instant};
use blueprint_std::vec::Vec;
use core::fmt;
use core::sync::atomic::{AtomicU64, Ordering};
/// Default time-to-live for cached service configuration entries (5 minutes).
pub const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(300);
/// Errors that can occur while reading from or populating the cache.
#[derive(Debug, thiserror::Error)]
pub enum CacheError {
    /// The underlying chain query failed; the message carries the client error.
    #[error("Failed to fetch from chain: {0}")]
    FetchError(String),
    /// A cache `RwLock` was poisoned by a panicking writer.
    #[error("Cache lock poisoned")]
    LockPoisoned,
}
/// A cached value stamped with its insertion instant, so freshness can be
/// judged against the cache-wide TTL at read time.
#[derive(Clone, Debug)]
struct CacheEntry<T> {
    /// The cached payload.
    value: T,
    /// Insertion time on the monotonic clock.
    cached_at: Instant,
}

impl<T> CacheEntry<T> {
    /// Wraps `value` and stamps it with the current instant.
    fn new(value: T) -> Self {
        Self {
            value,
            cached_at: Instant::now(),
        }
    }

    /// Returns `true` once more than `ttl` has elapsed since insertion.
    fn is_expired(&self, ttl: Duration) -> bool {
        ttl < self.cached_at.elapsed()
    }
}
/// Per-operator exposure weights (in basis points) for a service, plus the
/// sum of exposures accumulated when the weights were fetched.
#[derive(Clone, Debug)]
pub struct OperatorWeights {
    /// Exposure in basis points, keyed by operator address.
    pub weights: HashMap<Address, u16>,
    /// Sum of all `weights` values at fetch time.
    pub total_exposure: u64,
}
impl OperatorWeights {
    /// Returns the weight (in basis points) recorded for `operator`, if any.
    pub fn get(&self, operator: &Address) -> Option<u16> {
        self.weights.get(operator).copied()
    }

    /// Returns `true` if `operator` has a recorded weight.
    pub fn contains(&self, operator: &Address) -> bool {
        self.weights.contains_key(operator)
    }

    /// Number of operators with recorded weights.
    pub fn len(&self) -> usize {
        self.weights.len()
    }

    /// Returns `true` when no weights are recorded.
    pub fn is_empty(&self) -> bool {
        self.weights.is_empty()
    }

    /// Iterates over `(operator, weight)` pairs in arbitrary map order.
    pub fn iter(&self) -> impl Iterator<Item = (&Address, &u16)> {
        self.weights.iter()
    }

    /// Computes the minimum number of signers needed to accumulate
    /// `threshold_bps` basis points of `total_exposure`, assuming the
    /// heaviest operators sign first.
    ///
    /// Returns 0 for an empty weight set. If the recorded weights never
    /// reach the required amount, every operator is counted.
    pub fn calculate_threshold_signers(&self, threshold_bps: u16) -> usize {
        if self.weights.is_empty() {
            return 0;
        }
        // Widen to u128 so `total_exposure * threshold_bps` cannot overflow
        // u64 for very large exposures.
        let required_weight = (u128::from(self.total_exposure) * u128::from(threshold_bps)) / 10000;
        let mut sorted: Vec<_> = self.weights.iter().collect();
        // Heaviest first. Unstable sort avoids an allocation; tie order does
        // not affect the resulting count.
        sorted.sort_unstable_by(|a, b| b.1.cmp(a.1));
        let mut accumulated: u128 = 0;
        let mut count = 0;
        for (_, &weight) in sorted {
            accumulated += u128::from(weight);
            count += 1;
            if accumulated >= required_weight {
                break;
            }
        }
        count
    }
}
/// The ordered operator set of a service plus a reverse index for
/// O(1) address-to-position lookups.
#[derive(Clone, Debug)]
pub struct ServiceOperators {
    /// Operators in the order returned by the chain.
    pub operators: Vec<Address>,
    /// Maps each operator address to its index in `operators`.
    pub index_map: HashMap<Address, usize>,
}
impl ServiceOperators {
pub fn new(operators: Vec<Address>) -> Self {
let index_map = operators
.iter()
.enumerate()
.map(|(i, addr)| (*addr, i))
.collect();
Self {
operators,
index_map,
}
}
pub fn index_of(&self, operator: &Address) -> Option<usize> {
self.index_map.get(operator).copied()
}
pub fn len(&self) -> usize {
self.operators.len()
}
pub fn is_empty(&self) -> bool {
self.operators.is_empty()
}
pub fn iter(&self) -> impl Iterator<Item = &Address> {
self.operators.iter()
}
}
/// TTL-based read-through cache for on-chain service configuration.
///
/// Each category of data lives behind its own `RwLock`ed map so readers of
/// different categories never contend with each other.
pub struct ServiceConfigCache {
    /// How long entries stay fresh before being re-fetched on access.
    ttl: Duration,
    /// Aggregation configs keyed by `(service_id, job_index)`.
    aggregation_configs: RwLock<HashMap<(u64, u8), CacheEntry<AggregationConfig>>>,
    /// Operator weight sets keyed by service id.
    operator_weights: RwLock<HashMap<u64, CacheEntry<OperatorWeights>>>,
    /// Ordered operator sets keyed by service id.
    service_operators: RwLock<HashMap<u64, CacheEntry<ServiceOperators>>>,
    /// Operator metadata keyed by `(blueprint_id, operator)` — note this is
    /// keyed by blueprint, not service.
    operator_metadata: RwLock<HashMap<(u64, Address), CacheEntry<OperatorMetadata>>>,
}
impl ServiceConfigCache {
    /// Creates an empty cache whose entries expire `ttl` after insertion.
    pub fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            aggregation_configs: RwLock::new(HashMap::new()),
            operator_weights: RwLock::new(HashMap::new()),
            service_operators: RwLock::new(HashMap::new()),
            operator_metadata: RwLock::new(HashMap::new()),
        }
    }

    /// Creates a cache with [`DEFAULT_CACHE_TTL`].
    pub fn with_default_ttl() -> Self {
        Self::new(DEFAULT_CACHE_TTL)
    }

    /// The configured time-to-live for cache entries.
    pub fn ttl(&self) -> Duration {
        self.ttl
    }

    /// Changes the TTL used for subsequent freshness checks. Existing
    /// entries are re-evaluated against the new TTL on access.
    pub fn set_ttl(&mut self, ttl: Duration) {
        self.ttl = ttl;
    }

    /// Returns the aggregation config for `(service_id, job_index)`,
    /// fetching from chain and caching on a miss or expired entry.
    ///
    /// # Errors
    /// [`CacheError::LockPoisoned`] if a cache lock is poisoned, or
    /// [`CacheError::FetchError`] if the chain query fails.
    pub async fn get_aggregation_config(
        &self,
        client: &TangleClient,
        service_id: u64,
        job_index: u8,
    ) -> Result<AggregationConfig, CacheError> {
        let key = (service_id, job_index);
        // Fast path: serve a fresh entry under the read lock only.
        {
            let cache = self
                .aggregation_configs
                .read()
                .map_err(|_| CacheError::LockPoisoned)?;
            if let Some(entry) = cache.get(&key) {
                if !entry.is_expired(self.ttl) {
                    blueprint_core::trace!(
                        target: "service-config-cache",
                        "Cache hit for aggregation config: service={}, job={}",
                        service_id,
                        job_index
                    );
                    return Ok(entry.value.clone());
                }
            }
        }
        blueprint_core::debug!(
            target: "service-config-cache",
            "Cache miss for aggregation config: service={}, job={}, fetching from chain",
            service_id,
            job_index
        );
        // Slow path: fetch outside any lock, then publish. Concurrent misses
        // may each fetch; the last writer wins, which is harmless here.
        let config = client
            .get_aggregation_config(service_id, job_index)
            .await
            .map_err(|e| CacheError::FetchError(e.to_string()))?;
        {
            let mut cache = self
                .aggregation_configs
                .write()
                .map_err(|_| CacheError::LockPoisoned)?;
            cache.insert(key, CacheEntry::new(config.clone()));
        }
        Ok(config)
    }

    /// Stores an aggregation config directly (e.g. a push update or a test
    /// fixture), resetting its TTL.
    ///
    /// # Errors
    /// [`CacheError::LockPoisoned`] if the cache lock is poisoned.
    pub fn set_aggregation_config(
        &self,
        service_id: u64,
        job_index: u8,
        config: AggregationConfig,
    ) -> Result<(), CacheError> {
        let mut cache = self
            .aggregation_configs
            .write()
            .map_err(|_| CacheError::LockPoisoned)?;
        cache.insert((service_id, job_index), CacheEntry::new(config));
        Ok(())
    }

    /// Returns the operator weight set for `service_id`, fetching from chain
    /// and caching on a miss or expired entry.
    ///
    /// # Errors
    /// [`CacheError::LockPoisoned`] if a cache lock is poisoned, or
    /// [`CacheError::FetchError`] if the chain query fails.
    pub async fn get_operator_weights(
        &self,
        client: &TangleClient,
        service_id: u64,
    ) -> Result<OperatorWeights, CacheError> {
        // Fast path under the read lock.
        {
            let cache = self
                .operator_weights
                .read()
                .map_err(|_| CacheError::LockPoisoned)?;
            if let Some(entry) = cache.get(&service_id) {
                if !entry.is_expired(self.ttl) {
                    blueprint_core::trace!(
                        target: "service-config-cache",
                        "Cache hit for operator weights: service={}",
                        service_id
                    );
                    return Ok(entry.value.clone());
                }
            }
        }
        blueprint_core::debug!(
            target: "service-config-cache",
            "Cache miss for operator weights: service={}, fetching from chain",
            service_id
        );
        let weights = self.fetch_operator_weights(client, service_id).await?;
        {
            let mut cache = self
                .operator_weights
                .write()
                .map_err(|_| CacheError::LockPoisoned)?;
            cache.insert(service_id, CacheEntry::new(weights.clone()));
        }
        Ok(weights)
    }

    /// Builds an [`OperatorWeights`] from chain state.
    ///
    /// Inactive operators are excluded; operators whose info query fails are
    /// logged and skipped rather than failing the whole fetch.
    async fn fetch_operator_weights(
        &self,
        client: &TangleClient,
        service_id: u64,
    ) -> Result<OperatorWeights, CacheError> {
        let operators = client
            .get_service_operators(service_id)
            .await
            .map_err(|e| CacheError::FetchError(format!("Failed to get operators: {}", e)))?;
        let mut weights = HashMap::new();
        let mut total_exposure: u64 = 0;
        for operator in operators {
            match client.get_service_operator(service_id, operator).await {
                Ok(op_info) => {
                    if op_info.active {
                        weights.insert(operator, op_info.exposureBps);
                        total_exposure += op_info.exposureBps as u64;
                    }
                }
                Err(e) => {
                    // Best-effort: a single bad operator should not poison
                    // the weight set for the whole service.
                    blueprint_core::warn!(
                        target: "service-config-cache",
                        "Failed to get operator info for {}: {}",
                        operator,
                        e
                    );
                }
            }
        }
        Ok(OperatorWeights {
            weights,
            total_exposure,
        })
    }

    /// Stores an operator weight set directly, resetting its TTL.
    ///
    /// # Errors
    /// [`CacheError::LockPoisoned`] if the cache lock is poisoned.
    pub fn set_operator_weights(
        &self,
        service_id: u64,
        weights: OperatorWeights,
    ) -> Result<(), CacheError> {
        let mut cache = self
            .operator_weights
            .write()
            .map_err(|_| CacheError::LockPoisoned)?;
        cache.insert(service_id, CacheEntry::new(weights));
        Ok(())
    }

    /// Returns the ordered operator set for `service_id`, fetching from
    /// chain and caching on a miss or expired entry.
    ///
    /// # Errors
    /// [`CacheError::LockPoisoned`] if a cache lock is poisoned, or
    /// [`CacheError::FetchError`] if the chain query fails.
    pub async fn get_service_operators(
        &self,
        client: &TangleClient,
        service_id: u64,
    ) -> Result<ServiceOperators, CacheError> {
        // Fast path under the read lock.
        {
            let cache = self
                .service_operators
                .read()
                .map_err(|_| CacheError::LockPoisoned)?;
            if let Some(entry) = cache.get(&service_id) {
                if !entry.is_expired(self.ttl) {
                    blueprint_core::trace!(
                        target: "service-config-cache",
                        "Cache hit for service operators: service={}",
                        service_id
                    );
                    return Ok(entry.value.clone());
                }
            }
        }
        blueprint_core::debug!(
            target: "service-config-cache",
            "Cache miss for service operators: service={}, fetching from chain",
            service_id
        );
        let operators_list = client
            .get_service_operators(service_id)
            .await
            .map_err(|e| CacheError::FetchError(e.to_string()))?;
        let operators = ServiceOperators::new(operators_list);
        {
            let mut cache = self
                .service_operators
                .write()
                .map_err(|_| CacheError::LockPoisoned)?;
            cache.insert(service_id, CacheEntry::new(operators.clone()));
        }
        Ok(operators)
    }

    /// Returns the metadata for `operator` under `blueprint_id`, fetching
    /// from chain and caching on a miss or expired entry.
    ///
    /// # Errors
    /// [`CacheError::LockPoisoned`] if a cache lock is poisoned, or
    /// [`CacheError::FetchError`] if the chain query fails.
    pub async fn get_operator_metadata(
        &self,
        client: &TangleClient,
        blueprint_id: u64,
        operator: Address,
    ) -> Result<OperatorMetadata, CacheError> {
        let key = (blueprint_id, operator);
        if let Some(entry) = self
            .operator_metadata
            .read()
            .map_err(|_| CacheError::LockPoisoned)?
            .get(&key)
            .cloned()
        {
            if !entry.is_expired(self.ttl) {
                return Ok(entry.value);
            }
        }
        let metadata = client
            .get_operator_metadata(blueprint_id, operator)
            .await
            .map_err(|e| CacheError::FetchError(e.to_string()))?;
        let mut guard = self
            .operator_metadata
            .write()
            .map_err(|_| CacheError::LockPoisoned)?;
        guard.insert(key, CacheEntry::new(metadata.clone()));
        Ok(metadata)
    }

    /// Returns the metadata of every operator in `service_id`, keyed by
    /// operator address.
    ///
    /// Fetches sequentially, one operator at a time; each lookup goes
    /// through the per-operator cache.
    ///
    /// # Errors
    /// Propagates the first [`CacheError`] from any underlying lookup.
    pub async fn get_service_operator_metadata(
        &self,
        client: &TangleClient,
        blueprint_id: u64,
        service_id: u64,
    ) -> Result<HashMap<Address, OperatorMetadata>, CacheError> {
        let operators = self.get_service_operators(client, service_id).await?;
        let mut result = HashMap::with_capacity(operators.len());
        for operator in operators.iter() {
            let metadata = self
                .get_operator_metadata(client, blueprint_id, *operator)
                .await?;
            result.insert(*operator, metadata);
        }
        Ok(result)
    }

    /// Drops every cached entry associated with `service_id`.
    ///
    /// Operator metadata is keyed by blueprint id rather than service id,
    /// so it cannot be targeted here; stale metadata ages out via the TTL.
    /// Invalidation is best-effort: a poisoned lock is skipped silently.
    pub fn invalidate_service(&self, service_id: u64) {
        blueprint_core::debug!(
            target: "service-config-cache",
            "Invalidating cache for service {}",
            service_id
        );
        if let Ok(mut cache) = self.aggregation_configs.write() {
            cache.retain(|(sid, _), _| *sid != service_id);
        }
        if let Ok(mut cache) = self.operator_weights.write() {
            cache.remove(&service_id);
        }
        if let Ok(mut cache) = self.service_operators.write() {
            cache.remove(&service_id);
        }
    }

    /// Drops the cached aggregation config for `(service_id, job_index)`.
    pub fn invalidate_aggregation_config(&self, service_id: u64, job_index: u8) {
        if let Ok(mut cache) = self.aggregation_configs.write() {
            cache.remove(&(service_id, job_index));
        }
    }

    /// Drops every cached entry in every category.
    pub fn clear(&self) {
        blueprint_core::debug!(
            target: "service-config-cache",
            "Clearing all cached service configs"
        );
        if let Ok(mut cache) = self.aggregation_configs.write() {
            cache.clear();
        }
        if let Ok(mut cache) = self.operator_weights.write() {
            cache.clear();
        }
        if let Ok(mut cache) = self.service_operators.write() {
            cache.clear();
        }
        // Previously missing: operator metadata was left behind by clear().
        if let Ok(mut cache) = self.operator_metadata.write() {
            cache.clear();
        }
    }

    /// Evicts entries older than the TTL from every category, bounding
    /// memory growth between accesses.
    pub fn cleanup_expired(&self) {
        let ttl = self.ttl;
        if let Ok(mut cache) = self.aggregation_configs.write() {
            cache.retain(|_, entry| !entry.is_expired(ttl));
        }
        if let Ok(mut cache) = self.operator_weights.write() {
            cache.retain(|_, entry| !entry.is_expired(ttl));
        }
        if let Ok(mut cache) = self.service_operators.write() {
            cache.retain(|_, entry| !entry.is_expired(ttl));
        }
        // Previously missing: expired metadata entries were never evicted.
        if let Ok(mut cache) = self.operator_metadata.write() {
            cache.retain(|_, entry| !entry.is_expired(ttl));
        }
    }

    /// Snapshot of current cache occupancy.
    ///
    /// A poisoned lock reports 0 for its category. Operator metadata is not
    /// reported because [`CacheStats`] is a public struct whose layout
    /// callers may depend on.
    pub fn stats(&self) -> CacheStats {
        let aggregation_count = self
            .aggregation_configs
            .read()
            .map(|c| c.len())
            .unwrap_or(0);
        let weights_count = self.operator_weights.read().map(|c| c.len()).unwrap_or(0);
        let operators_count = self.service_operators.read().map(|c| c.len()).unwrap_or(0);
        CacheStats {
            aggregation_configs: aggregation_count,
            operator_weights: weights_count,
            service_operators: operators_count,
            ttl: self.ttl,
        }
    }
}
impl Default for ServiceConfigCache {
fn default() -> Self {
Self::with_default_ttl()
}
}
/// Point-in-time snapshot of cache occupancy, as produced by
/// [`ServiceConfigCache::stats`].
#[derive(Clone, Debug)]
pub struct CacheStats {
    /// Number of cached aggregation configs.
    pub aggregation_configs: usize,
    /// Number of cached operator-weight sets.
    pub operator_weights: usize,
    /// Number of cached service operator sets.
    pub service_operators: usize,
    /// The TTL the cache was configured with.
    pub ttl: Duration,
}
impl fmt::Display for CacheStats {
    /// Renders the snapshot as a single human-readable line.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            aggregation_configs,
            operator_weights,
            service_operators,
            ttl,
        } = self;
        write!(
            out,
            "ServiceConfigCache {{ aggregation_configs: {}, operator_weights: {}, service_operators: {}, ttl: {:?} }}",
            aggregation_configs, operator_weights, service_operators, ttl
        )
    }
}
pub type SharedServiceConfigCache = Arc<ServiceConfigCache>;
pub fn shared_cache() -> SharedServiceConfigCache {
Arc::new(ServiceConfigCache::with_default_ttl())
}
pub fn shared_cache_with_ttl(ttl: Duration) -> SharedServiceConfigCache {
Arc::new(ServiceConfigCache::new(ttl))
}
/// Chain events that require cached service data to be refreshed.
#[derive(Debug, Clone)]
pub enum CacheInvalidationEvent {
    /// An operator joined the service's operator set.
    OperatorJoined { service_id: u64, operator: Address },
    /// An operator left the service's operator set.
    OperatorLeft { service_id: u64, operator: Address },
    /// The service was terminated; all of its cached data is stale.
    ServiceTerminated { service_id: u64 },
    /// The service became active; nothing is cached yet, so nothing is dropped.
    ServiceActivated { service_id: u64 },
}
impl fmt::Display for CacheInvalidationEvent {
    /// Renders a compact single-line description of the event.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::OperatorJoined { service_id, operator } => write!(
                f,
                "OperatorJoined(service={}, operator={})",
                service_id, operator
            ),
            Self::OperatorLeft { service_id, operator } => write!(
                f,
                "OperatorLeft(service={}, operator={})",
                service_id, operator
            ),
            Self::ServiceTerminated { service_id } => {
                write!(f, "ServiceTerminated(service={})", service_id)
            }
            Self::ServiceActivated { service_id } => {
                write!(f, "ServiceActivated(service={})", service_id)
            }
        }
    }
}
impl ServiceConfigCache {
    /// Applies a chain event to the cache, dropping whatever cached data the
    /// event has made stale.
    pub fn handle_event(&self, event: &CacheInvalidationEvent) {
        blueprint_core::info!(
            target: "service-config-cache",
            "⚡ Cache invalidation triggered by event: {}",
            event
        );
        match event {
            CacheInvalidationEvent::OperatorJoined { service_id, operator } => {
                blueprint_core::info!(
                    target: "service-config-cache",
                    "🔄 Invalidating cache: operator {} joined service {}",
                    operator,
                    service_id
                );
                self.invalidate_operator_data(*service_id);
            }
            CacheInvalidationEvent::OperatorLeft { service_id, operator } => {
                blueprint_core::info!(
                    target: "service-config-cache",
                    "🔄 Invalidating cache: operator {} left service {}",
                    operator,
                    service_id
                );
                self.invalidate_operator_data(*service_id);
            }
            CacheInvalidationEvent::ServiceTerminated { service_id } => {
                blueprint_core::info!(
                    target: "service-config-cache",
                    "🗑️ Clearing all cache for terminated service {}",
                    service_id
                );
                self.invalidate_service(*service_id);
            }
            CacheInvalidationEvent::ServiceActivated { service_id } => {
                // Activation never drops data — nothing was cached yet.
                blueprint_core::info!(
                    target: "service-config-cache",
                    "✨ Service {} activated (cache will be populated on first access)",
                    service_id
                );
            }
        }
    }

    /// Drops the operator-derived entries (weights and operator set) for a
    /// service; aggregation configs are unaffected by membership changes.
    fn invalidate_operator_data(&self, service_id: u64) {
        if let Ok(mut cache) = self.operator_weights.write() {
            cache.remove(&service_id);
        }
        if let Ok(mut cache) = self.service_operators.write() {
            cache.remove(&service_id);
        }
    }
}
/// Polls chain logs and translates membership/service lifecycle events into
/// invalidations on a shared [`ServiceConfigCache`].
pub struct CacheSyncService {
    /// Client used to query logs from the Tangle contract.
    client: Arc<TangleClient>,
    /// The cache that receives invalidation events.
    cache: SharedServiceConfigCache,
    /// When `Some`, only events for these service ids are applied.
    watched_services: Option<Vec<u64>>,
    /// Block number the next poll starts from.
    last_block: AtomicU64,
}
impl CacheSyncService {
    /// Creates a sync service that watches all services, polling from block 0.
    pub fn new(client: Arc<TangleClient>, cache: SharedServiceConfigCache) -> Self {
        Self {
            client,
            cache,
            watched_services: None,
            last_block: AtomicU64::new(0),
        }
    }

    /// Restricts event handling to the given service ids (builder style).
    pub fn with_services(mut self, services: Vec<u64>) -> Self {
        self.watched_services = Some(services);
        self
    }

    /// Sets the block number the next poll starts from (builder style).
    pub fn from_block(self, block: u64) -> Self {
        self.last_block.store(block, Ordering::Relaxed);
        self
    }

    /// Whether events for `service_id` should be applied to the cache.
    fn should_watch(&self, service_id: u64) -> bool {
        match &self.watched_services {
            Some(watched) => watched.contains(&service_id),
            None => true,
        }
    }

    /// Fetches logs since the last processed block, applies any invalidation
    /// events, and advances the block cursor past the newest log seen.
    ///
    /// Returns the number of events applied to the cache.
    pub async fn poll_and_sync(&self) -> Result<usize, CacheError> {
        use alloy_rpc_types::Filter;
        use blueprint_client_tangle::contracts::ITangle;
        let start_block = self.last_block.load(Ordering::Relaxed);
        let tangle_address = self.client.config.settings.tangle_contract;
        let filter = Filter::new()
            .address(tangle_address)
            .from_block(start_block)
            .events([
                <ITangle::OperatorJoinedService as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
                <ITangle::OperatorLeftService as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
                <ITangle::ServiceTerminated as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
                <ITangle::ServiceActivated as alloy_sol_types::SolEvent>::SIGNATURE_HASH,
            ]);
        let logs = self
            .client
            .get_logs(&filter)
            .await
            .map_err(|e| CacheError::FetchError(format!("Failed to fetch logs: {}", e)))?;
        let handled = self.process_logs(&logs);
        // Resume after the newest log seen; with no logs the cursor stays
        // put and the same range is re-scanned next poll.
        if let Some(block_num) = logs.last().and_then(|log| log.block_number) {
            self.last_block.store(block_num + 1, Ordering::Relaxed);
        }
        Ok(handled)
    }

    /// Decodes each log and applies the events that pass the service filter,
    /// returning how many were handled.
    pub fn process_logs(&self, logs: &[alloy_rpc_types::Log]) -> usize {
        logs.iter()
            .filter_map(|log| self.parse_log(log))
            .filter(|event| {
                let service_id = match event {
                    CacheInvalidationEvent::OperatorJoined { service_id, .. }
                    | CacheInvalidationEvent::OperatorLeft { service_id, .. }
                    | CacheInvalidationEvent::ServiceTerminated { service_id }
                    | CacheInvalidationEvent::ServiceActivated { service_id } => *service_id,
                };
                self.should_watch(service_id)
            })
            .map(|event| self.cache.handle_event(&event))
            .count()
    }

    /// Attempts to decode `log` as one of the four watched Tangle events.
    ///
    /// Returns `None` for any other log.
    pub fn parse_log(&self, log: &alloy_rpc_types::Log) -> Option<CacheInvalidationEvent> {
        use blueprint_client_tangle::contracts::ITangle;
        if let Ok(decoded) = log.log_decode::<ITangle::OperatorJoinedService>() {
            return Some(CacheInvalidationEvent::OperatorJoined {
                service_id: decoded.inner.serviceId,
                operator: decoded.inner.operator,
            });
        }
        if let Ok(decoded) = log.log_decode::<ITangle::OperatorLeftService>() {
            return Some(CacheInvalidationEvent::OperatorLeft {
                service_id: decoded.inner.serviceId,
                operator: decoded.inner.operator,
            });
        }
        if let Ok(decoded) = log.log_decode::<ITangle::ServiceTerminated>() {
            return Some(CacheInvalidationEvent::ServiceTerminated {
                service_id: decoded.inner.serviceId,
            });
        }
        if let Ok(decoded) = log.log_decode::<ITangle::ServiceActivated>() {
            return Some(CacheInvalidationEvent::ServiceActivated {
                service_id: decoded.inner.serviceId,
            });
        }
        None
    }

    /// Applies a single already-decoded event to the cache, bypassing the
    /// service filter.
    pub fn process_event(&self, event: CacheInvalidationEvent) {
        self.cache.handle_event(&event);
    }

    /// The block number the next poll will start from.
    pub fn last_block(&self) -> u64 {
        self.last_block.load(Ordering::Relaxed)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Seeds `cache` with a single-operator 5000-bps weight set for `service_id`.
    fn seed_weights(cache: &ServiceConfigCache, service_id: u64, operator: Address) {
        let mut weights = HashMap::new();
        weights.insert(operator, 5000u16);
        cache
            .set_operator_weights(
                service_id,
                OperatorWeights {
                    weights,
                    total_exposure: 5000,
                },
            )
            .unwrap();
    }

    #[test]
    fn test_cache_entry_expiration() {
        let entry = CacheEntry::new(42);
        assert!(!entry.is_expired(Duration::from_secs(1)));
        assert!(entry.is_expired(Duration::ZERO));
    }

    #[test]
    fn test_operator_weights_threshold_calculation() {
        let weights: HashMap<Address, u16> = [
            (Address::ZERO, 5000),
            (Address::repeat_byte(1), 3000),
            (Address::repeat_byte(2), 2000),
        ]
        .into_iter()
        .collect();
        let op_weights = OperatorWeights {
            weights,
            total_exposure: 10000,
        };
        assert_eq!(op_weights.calculate_threshold_signers(5000), 1);
        assert_eq!(op_weights.calculate_threshold_signers(6700), 2);
        assert_eq!(op_weights.calculate_threshold_signers(10000), 3);
    }

    #[test]
    fn test_service_operators_index() {
        let ops: Vec<Address> = (1..=3).map(Address::repeat_byte).collect();
        let service_ops = ServiceOperators::new(ops);
        assert_eq!(service_ops.index_of(&Address::repeat_byte(1)), Some(0));
        assert_eq!(service_ops.index_of(&Address::repeat_byte(2)), Some(1));
        assert_eq!(service_ops.index_of(&Address::repeat_byte(3)), Some(2));
        assert_eq!(service_ops.index_of(&Address::repeat_byte(4)), None);
    }

    #[test]
    fn test_cache_stats() {
        let stats = ServiceConfigCache::with_default_ttl().stats();
        assert_eq!(stats.aggregation_configs, 0);
        assert_eq!(stats.operator_weights, 0);
        assert_eq!(stats.service_operators, 0);
        assert_eq!(stats.ttl, DEFAULT_CACHE_TTL);
    }

    #[test]
    fn test_cache_invalidation_event_display() {
        let joined = CacheInvalidationEvent::OperatorJoined {
            service_id: 1,
            operator: Address::repeat_byte(0xAB),
        };
        let rendered = joined.to_string();
        assert!(rendered.contains("OperatorJoined"));
        assert!(rendered.contains("service=1"));
        let left = CacheInvalidationEvent::OperatorLeft {
            service_id: 2,
            operator: Address::repeat_byte(0xCD),
        };
        assert!(left.to_string().contains("OperatorLeft"));
        let terminated = CacheInvalidationEvent::ServiceTerminated { service_id: 3 };
        assert!(terminated.to_string().contains("ServiceTerminated"));
        let activated = CacheInvalidationEvent::ServiceActivated { service_id: 4 };
        assert!(activated.to_string().contains("ServiceActivated"));
    }

    #[test]
    fn test_handle_operator_joined_invalidates_cache() {
        let cache = ServiceConfigCache::with_default_ttl();
        seed_weights(&cache, 1, Address::ZERO);
        assert_eq!(cache.stats().operator_weights, 1);
        cache.handle_event(&CacheInvalidationEvent::OperatorJoined {
            service_id: 1,
            operator: Address::repeat_byte(1),
        });
        assert_eq!(cache.stats().operator_weights, 0);
    }

    #[test]
    fn test_handle_operator_left_invalidates_cache() {
        let cache = ServiceConfigCache::with_default_ttl();
        seed_weights(&cache, 1, Address::ZERO);
        assert_eq!(cache.stats().operator_weights, 1);
        cache.handle_event(&CacheInvalidationEvent::OperatorLeft {
            service_id: 1,
            operator: Address::ZERO,
        });
        assert_eq!(cache.stats().operator_weights, 0);
    }

    #[test]
    fn test_handle_service_terminated_clears_all() {
        let cache = ServiceConfigCache::with_default_ttl();
        seed_weights(&cache, 1, Address::ZERO);
        seed_weights(&cache, 2, Address::ZERO);
        assert_eq!(cache.stats().operator_weights, 2);
        cache.handle_event(&CacheInvalidationEvent::ServiceTerminated { service_id: 1 });
        assert_eq!(cache.stats().operator_weights, 1);
    }

    #[test]
    fn test_handle_service_activated_no_invalidation() {
        let cache = ServiceConfigCache::with_default_ttl();
        seed_weights(&cache, 1, Address::ZERO);
        assert_eq!(cache.stats().operator_weights, 1);
        cache.handle_event(&CacheInvalidationEvent::ServiceActivated { service_id: 1 });
        assert_eq!(cache.stats().operator_weights, 1);
    }

    #[test]
    fn test_invalidation_only_affects_target_service() {
        let cache = ServiceConfigCache::with_default_ttl();
        for service_id in 1..=3 {
            seed_weights(&cache, service_id, Address::repeat_byte(service_id as u8));
        }
        assert_eq!(cache.stats().operator_weights, 3);
        cache.handle_event(&CacheInvalidationEvent::OperatorJoined {
            service_id: 2,
            operator: Address::repeat_byte(0xFF),
        });
        assert_eq!(cache.stats().operator_weights, 2);
    }
}