use crate::nft::errors::{NftError, NftResult};
use crate::nft::types::*;
use crate::nft::valuation::ValuationResult;
use crate::nft::security::SecurityValidationResult;
use dashmap::DashMap;
use moka::future::Cache as MokaCache;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
/// Two-tier NFT cache: a TTL-bounded L1 (moka) in front of a larger
/// concurrent-map L2 (DashMap), with shared metrics, optional compression,
/// eviction-policy state and cache-warming configuration attached.
///
/// `Clone` is cheap: every field is either an `Arc` or the (small) config.
#[derive(Clone)]
pub struct CacheManager {
    // Hot tier: capacity- and TTL-bounded via the moka builder.
    l1_cache: Arc<MokaCache<CacheKey, CacheEntry>>,
    // Backing tier: unbounded DashMap; entries found here are promoted to L1.
    l2_cache: Arc<DashMap<CacheKey, CacheEntry>>,
    config: CacheConfig,
    metrics: Arc<CacheMetrics>,
    compression: Arc<CompressionEngine>,
    eviction_manager: Arc<EvictionManager>,
    cache_warmer: Arc<CacheWarmer>,
}
/// Tuning knobs for [`CacheManager`]; see the `Default` impl for defaults.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum number of entries in the L1 (moka) tier.
    pub l1_max_entries: u64,
    /// Intended maximum for the L2 tier. NOTE(review): not enforced by the
    /// visible code — the DashMap is unbounded.
    pub l2_max_entries: u64,
    /// L1 time-to-live in seconds; also stamped onto new entries as
    /// `ttl_seconds`.
    pub l1_ttl_seconds: u64,
    /// Intended L2 time-to-live in seconds. NOTE(review): not enforced by
    /// the visible code.
    pub l2_ttl_seconds: u64,
    /// Master switch for the compression engine.
    pub enable_compression: bool,
    /// Minimum payload size (bytes) before compression is attempted.
    pub compression_threshold_bytes: usize,
    /// Starts the eviction background task in `CacheManager::new`.
    pub enable_intelligent_eviction: bool,
    /// Starts the warming background task in `CacheManager::new`.
    pub enable_cache_warming: bool,
    /// Gates the per-entry-type metrics breakdown.
    pub enable_metrics: bool,
    pub cleanup_interval_seconds: u64,
    pub max_memory_mb: u64,
    /// Target hit ratio (0.0–1.0), for monitoring/alerting purposes.
    pub hit_ratio_target: f64,
}
impl Default for CacheConfig {
fn default() -> Self {
Self {
l1_max_entries: 10000,
l2_max_entries: 50000,
l1_ttl_seconds: 300, l2_ttl_seconds: 1800, enable_compression: true,
compression_threshold_bytes: 1024, enable_intelligent_eviction: true,
enable_cache_warming: true,
enable_metrics: true,
cleanup_interval_seconds: 60, max_memory_mb: 512, hit_ratio_target: 0.8,
}
}
}
/// Lock-free aggregate counters for the cache, shared via `Arc` clones.
///
/// All counters are updated with relaxed atomics, so concurrent readers
/// should treat the values as approximate snapshots.
#[derive(Debug, Default)]
pub struct CacheMetrics {
    pub total_requests: Arc<std::sync::atomic::AtomicU64>,
    pub l1_hits: Arc<std::sync::atomic::AtomicU64>,
    pub l2_hits: Arc<std::sync::atomic::AtomicU64>,
    pub misses: Arc<std::sync::atomic::AtomicU64>,
    pub evictions: Arc<std::sync::atomic::AtomicU64>,
    pub compressions: Arc<std::sync::atomic::AtomicU64>,
    pub decompressions: Arc<std::sync::atomic::AtomicU64>,
    pub memory_usage_bytes: Arc<std::sync::atomic::AtomicU64>,
    /// Cumulative sum of per-request access times in microseconds; divide by
    /// `total_requests` to obtain the mean.
    pub avg_access_time_us: Arc<std::sync::atomic::AtomicU64>,
    pub cache_size: Arc<std::sync::atomic::AtomicU64>,
    /// Hit ratio stored as the raw IEEE-754 bit pattern of an `f64`
    /// (write with `f64::to_bits`, read with `f64::from_bits`). The standard
    /// library has no `AtomicF64`, so the previous `AtomicF64` field did not
    /// compile.
    pub hit_ratio: Arc<std::sync::atomic::AtomicU64>,
    /// Per-entry-type request/hit/miss breakdown.
    pub metrics_by_type: Arc<DashMap<CacheEntryType, TypeMetrics>>,
}
/// Request/hit/miss counters for a single [`CacheEntryType`], kept inside
/// `CacheMetrics::metrics_by_type` (mutated under the DashMap entry guard,
/// hence plain integers rather than atomics).
#[derive(Debug, Default)]
pub struct TypeMetrics {
    pub requests: u64,
    pub hits: u64,
    pub misses: u64,
    /// NOTE(review): never updated by the visible code.
    pub avg_size_bytes: f64,
}
/// Composite cache key: a kind tag plus the identifying value (mint address,
/// collection id, image URI, …). `namespace` and `version` allow disjoint
/// key spaces and schema bumps without collisions.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct CacheKey {
    pub key_type: CacheKeyType,
    pub value: String,
    pub namespace: Option<String>,
    /// Schema version; the convenience constructors set `Some(1)`.
    pub version: Option<u32>,
}
/// Discriminates what kind of object a [`CacheKey`] refers to.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub enum CacheKeyType {
    NftMetadata,
    NftValuation,
    SecurityValidation,
    CollectionData,
    MarketData,
    ImageData,
    /// Escape hatch for callers with bespoke key spaces.
    Custom { type_name: String },
}
/// A cached value plus bookkeeping. The `Arc`-wrapped fields are shared
/// between the L1 and L2 copies of the same entry, so cloning an entry for
/// promotion keeps a single set of access statistics.
#[derive(Debug, Clone)]
pub struct CacheEntry {
    pub entry_type: CacheEntryType,
    pub data: CacheEntryData,
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Refreshed on every cache hit.
    pub last_accessed: Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
    /// Hit count; initialized to 1 on insert.
    pub access_count: Arc<std::sync::atomic::AtomicU64>,
    /// JSON-serialized size of the payload measured at insert time.
    pub size_bytes: usize,
    /// Whether `data` holds compressed bytes. NOTE(review): the visible
    /// write paths always set this to `false`.
    pub compressed: bool,
    /// Eviction priority: access frequency scaled down by age, capped at 100.
    pub priority_score: Arc<RwLock<f64>>,
    pub ttl_seconds: u64,
}
/// Tags the payload variant stored in a [`CacheEntry`]; also used as the key
/// for per-type metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CacheEntryType {
    NftInfo,
    ValuationResult,
    SecurityValidationResult,
    CollectionInfo,
    MarketData,
    ImageData,
    Custom { type_name: String },
}
/// The actual cached payload. Variants mirror [`CacheEntryType`], with a few
/// extra raw/generic forms for callers that bypass the typed accessors.
#[derive(Debug, Clone)]
pub enum CacheEntryData {
    NftInfo(NftInfo),
    ValuationResult(ValuationResult),
    SecurityValidationResult(SecurityValidationResult),
    CollectionInfo(CollectionInfo),
    MarketData(crate::nft::valuation::MarketData),
    /// Raw image bytes, keyed by URI.
    ImageData(Vec<u8>),
    /// Arbitrary JSON payloads.
    JsonData(serde_json::Value),
    /// Untyped bytes (e.g. compressed blobs).
    RawBytes(Vec<u8>),
    Custom { type_name: String, data: Vec<u8> },
}
/// Optional payload compression for large cache entries.
#[derive(Clone)]
pub struct CompressionEngine {
    /// Master switch; when false both compress and decompress pass bytes
    /// through unchanged.
    enabled: bool,
    /// Payloads smaller than this are stored uncompressed.
    threshold_bytes: usize,
    /// Selected codec; only Gzip is implemented by the visible code.
    algorithm: CompressionAlgorithm,
}
/// Supported compression codecs. NOTE(review): Lz4 and Zstd are placeholders
/// — the engine currently passes data through unchanged for both.
#[derive(Debug, Clone)]
pub enum CompressionAlgorithm {
    Gzip,
    Lz4,
    Zstd,
}
/// Holds the configured eviction policy and its statistics. NOTE(review):
/// the visible code only constructs this — no eviction logic is implemented
/// here yet (see `CacheManager::start_eviction_task`).
#[derive(Clone)]
pub struct EvictionManager {
    policy: EvictionPolicy,
    stats: Arc<EvictionStats>,
}
/// Strategy for choosing which entries to evict.
#[derive(Debug, Clone)]
pub enum EvictionPolicy {
    /// Least recently used.
    LRU,
    /// Least frequently used.
    LFU,
    /// Evict by age/TTL.
    TimeBased,
    /// Evict by entry size.
    SizeBased,
    /// Weighted blend of several factors; weights are per-factor multipliers.
    Adaptive { weights: HashMap<EvictionFactor, f64> },
}
/// Inputs that the adaptive eviction policy can weight.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum EvictionFactor {
    AccessFrequency,
    Recency,
    Size,
    Age,
    Priority,
}
/// Counters describing past evictions, broken down by reason and entry type.
#[derive(Debug, Default)]
pub struct EvictionStats {
    pub total_evictions: Arc<std::sync::atomic::AtomicU64>,
    pub evictions_by_reason: Arc<DashMap<EvictionReason, u64>>,
    pub evictions_by_type: Arc<DashMap<CacheEntryType, u64>>,
    pub memory_recovered_bytes: Arc<std::sync::atomic::AtomicU64>,
}
/// Why an entry was evicted (used as a stats key).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EvictionReason {
    /// TTL elapsed.
    Expired,
    /// Entry-count cap reached.
    SizeLimit,
    /// Memory budget exceeded.
    MemoryLimit,
    /// Low adaptive priority score.
    LowPriority,
    /// Explicit invalidation by a caller.
    Manual,
}
/// Pre-populates the cache according to configured strategies. NOTE(review):
/// only construction is implemented in the visible code (see
/// `CacheManager::start_warming_task`).
#[derive(Clone)]
pub struct CacheWarmer {
    config: CacheWarmingConfig,
    stats: Arc<WarmingStats>,
}
/// Settings for periodic cache warming.
#[derive(Debug, Clone)]
pub struct CacheWarmingConfig {
    pub enabled: bool,
    /// How often a warming pass runs.
    pub interval_seconds: u64,
    /// Upper bound on items refreshed per pass.
    pub max_items_per_interval: usize,
    /// Strategies evaluated to pick items worth warming.
    pub strategies: Vec<WarmingStrategy>,
}
/// Heuristics for selecting cache-warming candidates.
#[derive(Debug, Clone)]
pub enum WarmingStrategy {
    /// Items accessed at least `threshold` times.
    PopularItems { threshold: u64 },
    /// Items touched within the given window.
    RecentlyAccessed { time_window_minutes: u64 },
    /// NFTs valued at or above the given amount (lamports).
    HighValueItems { min_value_lamports: u64 },
    VerifiedCollections,
    /// Caller-defined strategy with opaque JSON configuration.
    Custom { strategy_name: String, config: serde_json::Value },
}
/// Counters describing cache-warming activity.
#[derive(Debug, Default)]
pub struct WarmingStats {
    pub items_warmed: Arc<std::sync::atomic::AtomicU64>,
    pub warming_successes: Arc<std::sync::atomic::AtomicU64>,
    pub warming_failures: Arc<std::sync::atomic::AtomicU64>,
    pub warming_time_ms: Arc<std::sync::atomic::AtomicU64>,
}
impl CacheManager {
pub fn new(config: CacheConfig) -> Self {
let metrics = Arc::new(CacheMetrics::default());
let compression = Arc::new(CompressionEngine::new(
config.enable_compression,
config.compression_threshold_bytes,
));
let eviction_manager = Arc::new(EvictionManager::new(EvictionPolicy::Adaptive {
weights: [
(EvictionFactor::AccessFrequency, 0.3),
(EvictionFactor::Recency, 0.3),
(EvictionFactor::Size, 0.2),
(EvictionFactor::Age, 0.1),
(EvictionFactor::Priority, 0.1),
].into_iter().collect(),
}));
let cache_warmer = Arc::new(CacheWarmer::new(CacheWarmingConfig::default()));
let l1_cache = Arc::new(
MokaCache::builder()
.max_capacity(config.l1_max_entries)
.time_to_live(Duration::from_secs(config.l1_ttl_seconds))
.build()
);
let l2_cache = Arc::new(DashMap::new());
let cache_manager = Self {
l1_cache,
l2_cache,
metrics,
compression,
eviction_manager,
cache_warmer,
config,
};
if cache_manager.config.enable_intelligent_eviction {
cache_manager.start_eviction_task();
}
if cache_manager.config.enable_cache_warming {
cache_manager.start_warming_task();
}
cache_manager
}
pub async fn get_nft(&self, key: &CacheKey) -> Option<NftInfo> {
let start_time = Instant::now();
self.metrics.total_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Some(entry) = self.l1_cache.get(key).await {
self.metrics.l1_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.update_access_stats(&entry).await;
let access_time_us = start_time.elapsed().as_micros() as u64;
self.metrics.avg_access_time_us.fetch_add(access_time_us, std::sync::atomic::Ordering::Relaxed);
match &entry.data {
CacheEntryData::NftInfo(nft_info) => return Some(nft_info.clone()),
_ => warn!("Cache entry type mismatch for key {:?}", key),
}
}
if let Some(entry) = self.l2_cache.get(key) {
self.metrics.l2_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.update_access_stats(&entry).await;
self.l1_cache.insert(key.clone(), entry.clone()).await;
let access_time_us = start_time.elapsed().as_micros() as u64;
self.metrics.avg_access_time_us.fetch_add(access_time_us, std::sync::atomic::Ordering::Relaxed);
match &entry.data {
CacheEntryData::NftInfo(nft_info) => return Some(nft_info.clone()),
_ => warn!("Cache entry type mismatch for key {:?}", key),
}
}
self.metrics.misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.update_type_metrics(CacheEntryType::NftInfo, false).await;
None
}
pub async fn set_nft(&self, key: &CacheKey, nft_info: &NftInfo) {
let serialized_size = serde_json::to_vec(nft_info).unwrap_or_default().len();
let entry = CacheEntry {
entry_type: CacheEntryType::NftInfo,
data: CacheEntryData::NftInfo(nft_info.clone()),
created_at: chrono::Utc::now(),
last_accessed: Arc::new(RwLock::new(chrono::Utc::now())),
access_count: Arc::new(std::sync::atomic::AtomicU64::new(1)),
size_bytes: serialized_size,
compressed: false,
priority_score: Arc::new(RwLock::new(1.0)),
ttl_seconds: self.config.l1_ttl_seconds,
};
self.l1_cache.insert(key.clone(), entry.clone()).await;
self.l2_cache.insert(key.clone(), entry);
self.metrics.cache_size.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.metrics.memory_usage_bytes.fetch_add(
serialized_size as u64,
std::sync::atomic::Ordering::Relaxed
);
self.update_type_metrics(CacheEntryType::NftInfo, true).await;
}
pub async fn get_valuation(&self, key: &CacheKey) -> Option<ValuationResult> {
let start_time = Instant::now();
self.metrics.total_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Some(entry) = self.l1_cache.get(key).await {
self.metrics.l1_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.update_access_stats(&entry).await;
let access_time_us = start_time.elapsed().as_micros() as u64;
self.metrics.avg_access_time_us.fetch_add(access_time_us, std::sync::atomic::Ordering::Relaxed);
match &entry.data {
CacheEntryData::ValuationResult(result) => return Some(result.clone()),
_ => warn!("Cache entry type mismatch for key {:?}", key),
}
}
if let Some(entry) = self.l2_cache.get(key) {
self.metrics.l2_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.update_access_stats(&entry).await;
self.l1_cache.insert(key.clone(), entry.clone()).await;
let access_time_us = start_time.elapsed().as_micros() as u64;
self.metrics.avg_access_time_us.fetch_add(access_time_us, std::sync::atomic::Ordering::Relaxed);
match &entry.data {
CacheEntryData::ValuationResult(result) => return Some(result.clone()),
_ => warn!("Cache entry type mismatch for key {:?}", key),
}
}
self.metrics.misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.update_type_metrics(CacheEntryType::ValuationResult, false).await;
None
}
pub async fn set_valuation(&self, key: &CacheKey, valuation: &ValuationResult) {
let serialized_size = serde_json::to_vec(valuation).unwrap_or_default().len();
let entry = CacheEntry {
entry_type: CacheEntryType::ValuationResult,
data: CacheEntryData::ValuationResult(valuation.clone()),
created_at: chrono::Utc::now(),
last_accessed: Arc::new(RwLock::new(chrono::Utc::now())),
access_count: Arc::new(std::sync::atomic::AtomicU64::new(1)),
size_bytes: serialized_size,
compressed: false,
priority_score: Arc::new(RwLock::new(1.0)),
ttl_seconds: self.config.l1_ttl_seconds,
};
self.l1_cache.insert(key.clone(), entry.clone()).await;
self.l2_cache.insert(key.clone(), entry);
self.metrics.cache_size.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.metrics.memory_usage_bytes.fetch_add(
serialized_size as u64,
std::sync::atomic::Ordering::Relaxed
);
self.update_type_metrics(CacheEntryType::ValuationResult, true).await;
}
pub async fn get_security_validation(&self, key: &CacheKey) -> Option<SecurityValidationResult> {
let start_time = Instant::now();
self.metrics.total_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Some(entry) = self.l1_cache.get(key).await {
self.metrics.l1_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.update_access_stats(&entry).await;
let access_time_us = start_time.elapsed().as_micros() as u64;
self.metrics.avg_access_time_us.fetch_add(access_time_us, std::sync::atomic::Ordering::Relaxed);
match &entry.data {
CacheEntryData::SecurityValidationResult(result) => return Some(result.clone()),
_ => warn!("Cache entry type mismatch for key {:?}", key),
}
}
if let Some(entry) = self.l2_cache.get(key) {
self.metrics.l2_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.update_access_stats(&entry).await;
self.l1_cache.insert(key.clone(), entry.clone()).await;
let access_time_us = start_time.elapsed().as_micros() as u64;
self.metrics.avg_access_time_us.fetch_add(access_time_us, std::sync::atomic::Ordering::Relaxed);
match &entry.data {
CacheEntryData::SecurityValidationResult(result) => return Some(result.clone()),
_ => warn!("Cache entry type mismatch for key {:?}", key),
}
}
self.metrics.misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.update_type_metrics(CacheEntryType::SecurityValidationResult, false).await;
None
}
pub async fn set_security_validation(&self, key: &CacheKey, validation: &SecurityValidationResult) {
let serialized_size = serde_json::to_vec(validation).unwrap_or_default().len();
let entry = CacheEntry {
entry_type: CacheEntryType::SecurityValidationResult,
data: CacheEntryData::SecurityValidationResult(validation.clone()),
created_at: chrono::Utc::now(),
last_accessed: Arc::new(RwLock::new(chrono::Utc::now())),
access_count: Arc::new(std::sync::atomic::AtomicU64::new(1)),
size_bytes: serialized_size,
compressed: false,
priority_score: Arc::new(RwLock::new(1.0)),
ttl_seconds: self.config.l1_ttl_seconds,
};
self.l1_cache.insert(key.clone(), entry.clone()).await;
self.l2_cache.insert(key.clone(), entry);
self.metrics.cache_size.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.metrics.memory_usage_bytes.fetch_add(
serialized_size as u64,
std::sync::atomic::Ordering::Relaxed
);
self.update_type_metrics(CacheEntryType::SecurityValidationResult, true).await;
}
pub async fn invalidate(&self, key: &CacheKey) {
self.l1_cache.remove(key).await;
self.l2_cache.remove(key);
self.metrics.cache_size.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
pub async fn clear(&self) {
self.l1_cache.invalidate_all();
self.l2_cache.clear();
self.metrics.cache_size.store(0, std::sync::atomic::Ordering::Relaxed);
self.metrics.memory_usage_bytes.store(0, std::sync::atomic::Ordering::Relaxed);
}
pub async fn get_stats(&self) -> CacheStats {
let total_requests = self.metrics.total_requests.load(std::sync::atomic::Ordering::Relaxed);
let total_hits = self.metrics.l1_hits.load(std::sync::atomic::Ordering::Relaxed) +
self.metrics.l2_hits.load(std::sync::atomic::Ordering::Relaxed);
let hit_ratio = if total_requests > 0 {
total_hits as f64 / total_requests as f64
} else {
0.0
};
CacheStats {
total_requests,
l1_hits: self.metrics.l1_hits.load(std::sync::atomic::Ordering::Relaxed),
l2_hits: self.metrics.l2_hits.load(std::sync::atomic::Ordering::Relaxed),
misses: self.metrics.misses.load(std::sync::atomic::Ordering::Relaxed),
hit_ratio,
cache_size: self.metrics.cache_size.load(std::sync::atomic::Ordering::Relaxed),
memory_usage_mb: self.metrics.memory_usage_bytes.load(std::sync::atomic::Ordering::Relaxed) / 1024 / 1024,
avg_access_time_us: self.metrics.avg_access_time_us.load(std::sync::atomic::Ordering::Relaxed),
evictions: self.metrics.evictions.load(std::sync::atomic::Ordering::Relaxed),
compressions: self.metrics.compressions.load(std::sync::atomic::Ordering::Relaxed),
decompressions: self.metrics.decompressions.load(std::sync::atomic::Ordering::Relaxed),
}
}
async fn update_access_stats(&self, entry: &CacheEntry) {
let now = chrono::Utc::now();
*entry.last_accessed.write().await = now;
entry.access_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let _current_score = *entry.priority_score.read().await;
let access_count = entry.access_count.load(std::sync::atomic::Ordering::Relaxed) as f64;
let age_hours = (now - entry.created_at).num_hours() as f64;
let new_score = (access_count / (age_hours + 1.0)).min(100.0);
*entry.priority_score.write().await = new_score;
}
async fn update_type_metrics(&self, entry_type: CacheEntryType, is_hit: bool) {
if !self.config.enable_metrics {
return;
}
let mut type_metrics = self.metrics.metrics_by_type
.entry(entry_type)
.or_insert_with(TypeMetrics::default);
type_metrics.requests += 1;
if is_hit {
type_metrics.hits += 1;
} else {
type_metrics.misses += 1;
}
}
fn start_eviction_task(&self) {
info!("Eviction task started");
}
fn start_warming_task(&self) {
info!("Cache warming task started");
}
pub fn get_metrics(&self) -> &CacheMetrics {
&self.metrics
}
}
/// Point-in-time snapshot of the cache counters, produced by
/// `CacheManager::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheStats {
    pub total_requests: u64,
    pub l1_hits: u64,
    pub l2_hits: u64,
    pub misses: u64,
    /// (l1_hits + l2_hits) / total_requests; 0.0 when no requests yet.
    pub hit_ratio: f64,
    /// Number of distinct entries tracked by the accounting counters.
    pub cache_size: u64,
    /// Tracked payload bytes, rounded down to whole MiB.
    pub memory_usage_mb: u64,
    /// Cumulative access time in microseconds (a running sum, not a mean).
    pub avg_access_time_us: u64,
    pub evictions: u64,
    pub compressions: u64,
    pub decompressions: u64,
}
impl CompressionEngine {
    /// Creates an engine. `threshold_bytes` is the minimum payload size that
    /// will actually be compressed; Gzip is the only implemented algorithm.
    pub fn new(enabled: bool, threshold_bytes: usize) -> Self {
        Self {
            enabled,
            threshold_bytes,
            algorithm: CompressionAlgorithm::Gzip,
        }
    }

    /// Compresses `data` when the engine is enabled and the payload meets
    /// the size threshold; otherwise returns the bytes unchanged.
    ///
    /// # Errors
    /// Returns `NftError::Serialization` if the gzip encoder fails.
    pub fn compress(&self, data: &[u8]) -> NftResult<Vec<u8>> {
        if !self.enabled || data.len() < self.threshold_bytes {
            return Ok(data.to_vec());
        }
        match self.algorithm {
            CompressionAlgorithm::Gzip => {
                use flate2::write::GzEncoder;
                use flate2::Compression;
                use std::io::Write;
                let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
                encoder.write_all(data).map_err(|e| NftError::Serialization {
                    message: format!("Compression failed: {}", e),
                    format: Some("gzip".to_string()),
                    data_type: Some("bytes".to_string()),
                })?;
                encoder.finish().map_err(|e| NftError::Serialization {
                    message: format!("Compression finish failed: {}", e),
                    format: Some("gzip".to_string()),
                    data_type: Some("bytes".to_string()),
                })
            }
            // Lz4/Zstd are not implemented yet: pass-through, mirroring
            // decompress so round-trips stay consistent.
            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Zstd => Ok(data.to_vec()),
        }
    }

    /// Inverse of [`Self::compress`].
    ///
    /// Sub-threshold payloads are stored uncompressed by `compress`, so gzip
    /// inflation is only attempted when the data carries the gzip magic
    /// header (0x1f 0x8b); anything else is returned verbatim. This keeps
    /// `decompress(compress(x)) == x` for all inputs (previously small
    /// payloads failed to round-trip because inflation was attempted
    /// unconditionally).
    ///
    /// # Errors
    /// Returns `NftError::Serialization` if gzip-framed data fails to inflate.
    pub fn decompress(&self, compressed_data: &[u8]) -> NftResult<Vec<u8>> {
        if !self.enabled {
            return Ok(compressed_data.to_vec());
        }
        match self.algorithm {
            CompressionAlgorithm::Gzip => {
                // RFC 1952: every gzip stream begins with the bytes 1f 8b.
                if !compressed_data.starts_with(&[0x1f, 0x8b]) {
                    return Ok(compressed_data.to_vec());
                }
                use flate2::read::GzDecoder;
                use std::io::Read;
                let mut decoder = GzDecoder::new(compressed_data);
                let mut decompressed = Vec::new();
                decoder
                    .read_to_end(&mut decompressed)
                    .map_err(|e| NftError::Serialization {
                        message: format!("Decompression failed: {}", e),
                        format: Some("gzip".to_string()),
                        data_type: Some("bytes".to_string()),
                    })?;
                Ok(decompressed)
            }
            // Pass-through, matching compress.
            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Zstd => {
                Ok(compressed_data.to_vec())
            }
        }
    }
}
impl EvictionManager {
pub fn new(policy: EvictionPolicy) -> Self {
Self {
policy,
stats: Arc::new(EvictionStats::default()),
}
}
}
impl CacheWarmer {
pub fn new(config: CacheWarmingConfig) -> Self {
Self {
config,
stats: Arc::new(WarmingStats::default()),
}
}
}
impl Default for CacheWarmingConfig {
fn default() -> Self {
Self {
enabled: false,
interval_seconds: 300, max_items_per_interval: 100,
strategies: vec![
WarmingStrategy::PopularItems { threshold: 10 },
WarmingStrategy::RecentlyAccessed { time_window_minutes: 60 },
],
}
}
}
impl CacheKey {
pub fn metadata(mint_address: &str) -> Self {
Self {
key_type: CacheKeyType::NftMetadata,
value: mint_address.to_string(),
namespace: None,
version: Some(1),
}
}
pub fn valuation(mint_address: &str) -> Self {
Self {
key_type: CacheKeyType::NftValuation,
value: mint_address.to_string(),
namespace: None,
version: Some(1),
}
}
pub fn security(mint_address: &str) -> Self {
Self {
key_type: CacheKeyType::SecurityValidation,
value: mint_address.to_string(),
namespace: None,
version: Some(1),
}
}
pub fn collection(collection_id: &str) -> Self {
Self {
key_type: CacheKeyType::CollectionData,
value: collection_id.to_string(),
namespace: None,
version: Some(1),
}
}
pub fn market_data(collection_id: &str) -> Self {
Self {
key_type: CacheKeyType::MarketData,
value: collection_id.to_string(),
namespace: None,
version: Some(1),
}
}
pub fn image(image_uri: &str) -> Self {
Self {
key_type: CacheKeyType::ImageData,
value: image_uri.to_string(),
namespace: None,
version: Some(1),
}
}
pub fn custom(type_name: &str, value: &str) -> Self {
Self {
key_type: CacheKeyType::Custom { type_name: type_name.to_string() },
value: value.to_string(),
namespace: None,
version: Some(1),
}
}
}