use alloy_chains::NamedChain;
use async_trait::async_trait;
use chrono::NaiveDate;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::File;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use super::{types::TimestampMillis, BlockWindowCache, CacheKey, CacheStats};
use crate::blocks::window::DailyBlockWindow;
use crate::errors::BlockWindowError;
/// On-disk format version; bump when the JSON layout changes.
/// Files written with a different version are discarded on load.
const CACHE_VERSION: u32 = 1;
/// A cached block window plus the wall-clock time it was written,
/// used for TTL-based expiry.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CacheEntry {
    window: DailyBlockWindow,
    // `default` lets older cache files without this field still parse.
    #[serde(default)]
    created_at: TimestampMillis,
}
impl CacheEntry {
    /// Wraps `window`, stamping it with the current time for TTL checks.
    fn new(window: DailyBlockWindow) -> Self {
        Self {
            window,
            created_at: TimestampMillis::now(),
        }
    }

    /// True when a TTL is configured and this entry predates it.
    /// With no TTL (`None`), entries never expire.
    fn is_expired(&self, ttl: Option<Duration>) -> bool {
        ttl.is_some_and(|ttl| self.created_at.is_older_than(ttl))
    }
}
/// The full cache payload as persisted on disk: a format version plus the
/// cached windows keyed by chain + date.
#[derive(Debug, Serialize, Deserialize)]
struct CacheData {
    // Compared against CACHE_VERSION on load; mismatched files are ignored.
    version: u32,
    // CacheKey is a struct, not a JSON-friendly map key, so entries
    // round-trip through "<chain_id>:<YYYY-MM-DD>" strings via the custom
    // serde helpers.
    #[serde(
        serialize_with = "serialize_cache_entries",
        deserialize_with = "deserialize_cache_entries"
    )]
    entries: HashMap<CacheKey, CacheEntry>,
}
/// Serializes the entry map with string keys ("<chain_id>:<YYYY-MM-DD>",
/// via `CacheKey`'s `Display`), so the JSON stays human-readable.
fn serialize_cache_entries<S>(
    entries: &HashMap<CacheKey, CacheEntry>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    // Stream the stringified pairs straight into the serializer instead of
    // building an intermediate string-keyed map.
    serializer.collect_map(entries.iter().map(|(key, entry)| (key.to_string(), entry)))
}
fn deserialize_cache_entries<'de, D>(
deserializer: D,
) -> Result<HashMap<CacheKey, CacheEntry>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::Deserialize;
let string_map: HashMap<String, CacheEntry> = HashMap::deserialize(deserializer)?;
string_map
.into_iter()
.map(|(k, v)| {
let parts: Vec<&str> = k.split(':').collect();
if parts.len() != 2 {
return Err(serde::de::Error::custom(format!(
"Invalid cache key format: {}",
k
)));
}
let chain_id: u64 = parts[0].parse().map_err(|e| {
serde::de::Error::custom(format!("Invalid chain ID in key '{}': {}", k, e))
})?;
let chain = NamedChain::try_from(chain_id)
.map_err(|_| serde::de::Error::custom(format!("Unknown chain ID: {}", chain_id)))?;
let date = NaiveDate::parse_from_str(parts[1], "%Y-%m-%d").map_err(|e| {
serde::de::Error::custom(format!("Invalid date in key '{}': {}", k, e))
})?;
Ok((CacheKey::new(chain, date), v))
})
.collect()
}
impl Default for CacheData {
fn default() -> Self {
Self {
version: CACHE_VERSION,
entries: HashMap::new(),
}
}
}
/// Optional limits for the cache; `None` means unbounded / never expires.
#[derive(Debug, Clone, Default)]
struct DiskCacheConfig {
    // Maximum number of entries kept on disk; oldest are evicted on insert.
    max_entries: Option<usize>,
    // Entries older than this are treated as misses on read.
    ttl: Option<Duration>,
}
/// Mutable runtime state, guarded by the mutex inside `DiskCache`.
#[derive(Debug, Default)]
struct DiskCacheState {
    // Hit/miss/eviction/expiration counters plus the current entry count.
    stats: CacheStats,
}
/// A block-window cache persisted as a single JSON file on disk.
///
/// Every operation re-reads the file, and writes go through a temp file +
/// rename, so separate instances (or processes) pointed at the same path
/// observe each other's updates.
#[derive(Debug)]
pub struct DiskCache {
    path: PathBuf,
    config: DiskCacheConfig,
    // tokio Mutex: held across `.await` points in the trait impl.
    state: Mutex<DiskCacheState>,
}
impl DiskCache {
pub fn new(path: impl Into<PathBuf>) -> Self {
Self {
path: path.into(),
config: DiskCacheConfig::default(),
state: Mutex::new(DiskCacheState::default()),
}
}
pub fn with_max_entries(mut self, max_entries: usize) -> Self {
self.config.max_entries = Some(max_entries);
self
}
pub fn with_ttl(mut self, ttl: Duration) -> Self {
self.config.ttl = Some(ttl);
self
}
pub fn validate(self) -> Result<Self, BlockWindowError> {
let parent = self.path.parent().ok_or_else(|| {
BlockWindowError::cache_io_error(
self.path.display().to_string(),
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Cache path has no parent directory",
),
)
})?;
if !parent.exists() {
std::fs::create_dir_all(parent).map_err(|e| {
BlockWindowError::cache_io_error(
format!(
"Failed to create cache directory '{}': {}",
parent.display(),
e
),
e,
)
})?;
debug!(path = %parent.display(), "Created cache directory");
}
let test_file = parent.join(".cache_write_test");
std::fs::write(&test_file, b"test").map_err(|e| {
BlockWindowError::cache_io_error(
format!(
"Cache directory '{}' is not writable: {}",
parent.display(),
e
),
e,
)
})?;
let _ = std::fs::remove_file(&test_file);
debug!(path = %self.path.display(), "Cache path validated successfully");
Ok(self)
}
async fn load(&self) -> Result<CacheData, BlockWindowError> {
if !self.path.exists() {
debug!(path = %self.path.display(), "Cache file does not exist, using empty cache");
return Ok(CacheData::default());
}
let file = File::open(&self.path).map_err(|e| {
BlockWindowError::cache_io_error(
format!(
"Failed to open cache file '{}': {}. Ensure the file is readable.",
self.path.display(),
e
),
e,
)
})?;
file.lock_shared().map_err(|e| {
BlockWindowError::cache_io_error(
format!(
"Failed to acquire read lock on cache file '{}': {}",
self.path.display(),
e
),
e,
)
})?;
let data: CacheData = serde_json::from_reader(&file).map_err(|e| {
warn!(
path = %self.path.display(),
error = %e,
"Failed to parse cache file, using empty cache"
);
BlockWindowError::serialization_error(e)
})?;
if data.version != CACHE_VERSION {
warn!(
path = %self.path.display(),
cached_version = data.version,
current_version = CACHE_VERSION,
"Cache version mismatch, ignoring cached data"
);
drop(file);
return Ok(CacheData::default());
}
drop(file);
info!(
path = %self.path.display(),
entries = data.entries.len(),
version = data.version,
"Loaded block window cache"
);
Ok(data)
}
async fn save(&self, data: &CacheData) -> Result<(), BlockWindowError> {
let json =
serde_json::to_vec_pretty(data).map_err(BlockWindowError::serialization_error)?;
if let Some(parent) = self.path.parent() {
if !parent.exists() {
tokio::fs::create_dir_all(parent).await.map_err(|e| {
BlockWindowError::cache_io_error(
format!(
"Failed to create cache directory '{}': {}. Ensure you have write permissions.",
parent.display(),
e
),
e,
)
})?;
}
}
let temp_path = self.path.with_extension("tmp");
tokio::fs::write(&temp_path, &json).await.map_err(|e| {
BlockWindowError::cache_io_error(
format!(
"Failed to write cache to '{}': {}. Ensure the parent directory is writable.",
temp_path.display(),
e
),
e,
)
})?;
let file = File::open(&temp_path).map_err(|e| {
BlockWindowError::cache_io_error(
format!(
"Failed to open temp cache file '{}': {}",
temp_path.display(),
e
),
e,
)
})?;
file.lock().map_err(|e| {
BlockWindowError::cache_io_error(
format!(
"Failed to acquire write lock on cache file '{}': {}",
temp_path.display(),
e
),
e,
)
})?;
tokio::fs::rename(&temp_path, &self.path)
.await
.map_err(|e| {
BlockWindowError::cache_io_error(
format!(
"Failed to rename cache file from '{}' to '{}': {}",
temp_path.display(),
self.path.display(),
e
),
e,
)
})?;
drop(file);
debug!(
path = %self.path.display(),
entries = data.entries.len(),
"Saved block window cache"
);
Ok(())
}
fn evict_oldest(data: &mut CacheData, max_entries: usize) -> usize {
let mut evicted = 0;
while data.entries.len() > max_entries {
let oldest_key = data
.entries
.iter()
.min_by(|(key_a, entry_a), (key_b, entry_b)| {
entry_a
.created_at
.cmp(&entry_b.created_at)
.then_with(|| key_a.to_string().cmp(&key_b.to_string()))
})
.map(|(key, _)| key.clone());
if let Some(key) = oldest_key {
debug!(key = %key, "Evicting oldest cache entry");
data.entries.remove(&key);
evicted += 1;
} else {
break;
}
}
evicted
}
}
#[async_trait]
impl BlockWindowCache for DiskCache {
    /// Looks up `key` on disk, honoring the configured TTL.
    /// Load failures and expired entries both count as misses.
    async fn get(&self, key: &CacheKey) -> Option<DailyBlockWindow> {
        let mut state = self.state.lock().await;

        let data = match self.load().await {
            Ok(data) => data,
            Err(e) => {
                warn!(error = %e, "Failed to load cache, treating as miss");
                state.stats.misses += 1;
                return None;
            }
        };

        let Some(entry) = data.entries.get(key) else {
            state.stats.misses += 1;
            debug!(key = %key, "Cache miss (disk)");
            return None;
        };

        if entry.is_expired(self.config.ttl) {
            debug!(key = %key, "Cache entry expired");
            state.stats.expirations += 1;
            state.stats.misses += 1;
            return None;
        }

        state.stats.hits += 1;
        debug!(key = %key, "Cache hit (disk)");
        Some(entry.window.clone())
    }

    /// Stores `window` under `key`, enforces the entry cap, and persists
    /// the whole cache back to disk.
    async fn insert(
        &self,
        key: CacheKey,
        window: DailyBlockWindow,
    ) -> Result<(), BlockWindowError> {
        let mut state = self.state.lock().await;
        // Reload first so we merge with whatever is currently on disk; an
        // unreadable file simply starts from empty.
        let mut data = self.load().await.unwrap_or_default();
        debug!(key = %key, "Inserting entry into disk cache");
        data.entries.insert(key, CacheEntry::new(window));
        if let Some(cap) = self.config.max_entries {
            state.stats.evictions += Self::evict_oldest(&mut data, cap) as u64;
        }
        state.stats.entries = data.entries.len();
        self.save(&data).await
    }

    /// Deletes the backing file (if present) and zeroes the entry count.
    async fn clear(&self) -> Result<(), BlockWindowError> {
        let mut state = self.state.lock().await;
        debug!(path = %self.path.display(), "Clearing disk cache");
        if self.path.exists() {
            tokio::fs::remove_file(&self.path).await.map_err(|e| {
                BlockWindowError::cache_io_error(
                    format!(
                        "Failed to delete cache file '{}': {}",
                        self.path.display(),
                        e
                    ),
                    e,
                )
            })?;
        }
        state.stats.entries = 0;
        Ok(())
    }

    /// Snapshot of the counters; the entry count is refreshed from disk
    /// when the file is readable.
    async fn stats(&self) -> CacheStats {
        let mut state = self.state.lock().await;
        if let Ok(data) = self.load().await {
            state.stats.entries = data.entries.len();
        }
        state.stats.clone()
    }

    fn name(&self) -> &'static str {
        "DiskCache"
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_chains::NamedChain;
    use chrono::NaiveDate;
    use tempfile::TempDir;

    // Builds a window with the given block range and fixed timestamps
    // exactly one day (86_400 s) apart.
    fn create_test_window(start_block: u64, end_block: u64) -> DailyBlockWindow {
        DailyBlockWindow {
            start_block,
            end_block,
            start_ts: crate::blocks::window::UnixTimestamp(1728518400),
            end_ts_exclusive: crate::blocks::window::UnixTimestamp(1728604800),
        }
    }

    // Keys differ only by day-of-month, so eviction order (and its
    // key-string tie-break) is predictable across tests.
    fn create_test_key(day: u32) -> CacheKey {
        CacheKey::new(
            NamedChain::Arbitrum,
            NaiveDate::from_ymd_opt(2025, 10, day).unwrap(),
        )
    }

    // Miss -> insert -> hit round-trip, and the hit/miss counters.
    #[tokio::test]
    async fn test_disk_cache_basic_operations() {
        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("cache.json");
        let cache = DiskCache::new(&cache_path).validate().unwrap();
        let key = create_test_key(15);
        let window = create_test_window(1000, 2000);
        assert!(cache.get(&key).await.is_none());
        assert!(cache.insert(key.clone(), window.clone()).await.is_ok());
        let retrieved = cache.get(&key).await;
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().start_block, 1000);
        let stats = cache.stats().await;
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
    }

    // Data written by one DiskCache instance is visible to a fresh
    // instance pointed at the same file.
    #[tokio::test]
    async fn test_disk_cache_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("cache.json");
        let key = create_test_key(15);
        let window = create_test_window(1000, 2000);
        {
            let cache = DiskCache::new(&cache_path).validate().unwrap();
            cache.insert(key.clone(), window).await.unwrap();
        }
        {
            let cache = DiskCache::new(&cache_path).validate().unwrap();
            let retrieved = cache.get(&key).await;
            assert!(retrieved.is_some());
            assert_eq!(retrieved.unwrap().start_block, 1000);
        }
    }

    // Inserting 4 entries into a 3-entry cache evicts exactly the oldest
    // (day 1) and keeps the rest.
    #[tokio::test]
    async fn test_disk_cache_size_limit() {
        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("cache.json");
        let cache = DiskCache::new(&cache_path)
            .with_max_entries(3)
            .validate()
            .unwrap();
        for day in 1..=4 {
            let key = create_test_key(day);
            let window = create_test_window(day as u64 * 1000, day as u64 * 2000);
            cache.insert(key, window).await.unwrap();
        }
        let stats = cache.stats().await;
        assert_eq!(stats.entries, 3);
        assert_eq!(stats.evictions, 1);
        assert!(cache.get(&create_test_key(1)).await.is_none());
        assert!(cache.get(&create_test_key(2)).await.is_some());
        assert!(cache.get(&create_test_key(3)).await.is_some());
        assert!(cache.get(&create_test_key(4)).await.is_some());
    }

    // Eviction must be deterministic even when created_at timestamps tie
    // (the key-string tie-break decides), so day 1 always goes first.
    #[tokio::test]
    async fn test_disk_cache_deterministic_eviction() {
        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("cache.json");
        let cache = DiskCache::new(&cache_path)
            .with_max_entries(2)
            .validate()
            .unwrap();
        for day in 1..=3 {
            let key = create_test_key(day);
            let window = create_test_window(day as u64 * 1000, day as u64 * 2000);
            cache.insert(key, window).await.unwrap();
        }
        assert!(cache.get(&create_test_key(1)).await.is_none());
        assert!(cache.get(&create_test_key(2)).await.is_some());
        assert!(cache.get(&create_test_key(3)).await.is_some());
    }

    // An entry readable within its TTL becomes a miss (and counts an
    // expiration) once the TTL elapses.
    #[tokio::test]
    async fn test_disk_cache_ttl() {
        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("cache.json");
        let cache = DiskCache::new(&cache_path)
            .with_ttl(Duration::from_millis(50))
            .validate()
            .unwrap();
        let key = create_test_key(15);
        let window = create_test_window(1000, 2000);
        cache.insert(key.clone(), window).await.unwrap();
        assert!(cache.get(&key).await.is_some());
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(cache.get(&key).await.is_none());
        let stats = cache.stats().await;
        assert_eq!(stats.expirations, 1);
    }

    // clear() deletes the backing file and resets the entry count.
    #[tokio::test]
    async fn test_disk_cache_clear() {
        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("cache.json");
        let cache = DiskCache::new(&cache_path).validate().unwrap();
        for day in 1..=5 {
            let key = create_test_key(day);
            let window = create_test_window(day as u64 * 1000, day as u64 * 2000);
            cache.insert(key, window).await.unwrap();
        }
        cache.clear().await.unwrap();
        assert!(!cache_path.exists());
        let stats = cache.stats().await;
        assert_eq!(stats.entries, 0);
    }

    // validate() creates a missing parent directory rather than failing.
    #[tokio::test]
    async fn test_disk_cache_validation() {
        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("subdir").join("cache.json");
        let cache = DiskCache::new(&cache_path).validate();
        assert!(cache.is_ok());
        assert!(cache_path.parent().unwrap().exists());
    }
}