use std::{sync::Arc, time::Duration};
use moka::future::Cache;
use redis::{aio::ConnectionManager, AsyncCommands, Client};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use crate::{
common::{retry_with_backoff, RetryConfig},
config::ipfs::IpfsConfig,
};
/// Errors associated with the IPFS cache service.
///
/// NOTE(review): within the code visible in this file only `ResponseTooLarge`
/// is constructed (by `read_bounded_response`); the remaining variants may be
/// produced elsewhere in the crate — confirm before removing any of them.
#[derive(Debug, thiserror::Error)]
pub enum IpfsCacheError {
/// Establishing a Redis connection failed; the payload is the error text.
#[error("Redis connection error: {0}")]
RedisConnection(String),
/// A Redis command failed; the payload is the error text.
#[error("Redis operation error: {0}")]
RedisOperation(String),
/// Fetching content over the network failed; the payload is the error text.
#[error("Network fetch error: {0}")]
NetworkFetch(String),
/// The fetched body (or its declared length) exceeded the configured limit.
/// The payload is the offending size in bytes.
#[error("Fetched IPFS content exceeds maximum allowed size: {0} bytes")]
ResponseTooLarge(usize),
/// The requested content was not present in the cache.
#[error("Cache miss")]
CacheMiss,
}
/// Two-level cache for IPFS content with a network fallback.
///
/// Lookups go through an in-memory cache (L1), then Redis (L2), and finally
/// the configured IPFS gateway(s) over HTTP.
pub struct IpfsCacheService {
// Settings for cache sizing/TTLs, gateway URLs and the fetch size limit.
config: IpfsConfig,
// L1 cache keyed by CID; `None` when `config.cache_enabled` is false.
memory_cache: Option<Cache<String, Arc<Vec<u8>>>>,
// L2 connection handle; starts as `None` and is filled in by `initialize`
// when the Redis connection succeeds.
redis_connection: RwLock<Option<ConnectionManager>>,
// Redis client built in `new`; `None` when L2 is disabled, no URL was
// supplied, or the client could not be created.
redis_client: Option<Client>,
// HTTP client used for all gateway requests.
http_client: reqwest::Client,
}
/// Manual `Debug` implementation that prints the configuration plus two
/// booleans describing which cache layers are configured, instead of dumping
/// the cache and client handles themselves.
impl std::fmt::Debug for IpfsCacheService {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let memory_cache_enabled = self.memory_cache.is_some();
        let redis_enabled = self.redis_client.is_some();

        let mut out = f.debug_struct("IpfsCacheService");
        out.field("config", &self.config);
        out.field("memory_cache_enabled", &memory_cache_enabled);
        out.field("redis_enabled", &redis_enabled);
        out.finish()
    }
}
impl IpfsCacheService {
/// Builds the service from `config` and an optional Redis URL.
///
/// * The in-memory (L1) cache is created only when `config.cache_enabled`
///   is set.
/// * A Redis (L2) client is created only when both `config.cache_enabled`
///   and `config.redis_cache_enabled` are set and `redis_url` is `Some`.
///   A failure to create the client is logged and L2 is left disabled.
///
/// No Redis connection is opened here; call `initialize` for that.
///
/// NOTE(review): the HTTP client is built with `reqwest::Client::new()`,
/// i.e. without any explicit timeout configured in this function — confirm
/// that is intended for gateway fetches.
pub fn new(config: IpfsConfig, redis_url: Option<&str>) -> Self {
    let memory_cache = if !config.cache_enabled {
        info!("IPFS L1 cache disabled");
        None
    } else {
        let l1 = Cache::builder()
            .max_capacity(config.memory_cache_size as u64)
            .time_to_live(Duration::from_secs(config.memory_cache_ttl_secs))
            .build();
        info!(
            "IPFS L1 cache initialized: max_entries={}, ttl={}s",
            config.memory_cache_size, config.memory_cache_ttl_secs
        );
        Some(l1)
    };

    let l2_requested = config.cache_enabled && config.redis_cache_enabled;
    let redis_client = match (l2_requested, redis_url) {
        (false, _) => None,
        (true, None) => {
            info!("No Redis URL provided, IPFS L2 cache disabled");
            None
        }
        (true, Some(url)) => match Client::open(url) {
            Ok(client) => {
                info!("IPFS L2 Redis client created: ttl={}s", config.redis_cache_ttl_secs);
                Some(client)
            }
            Err(e) => {
                warn!(error = %e, "Failed to create Redis client for IPFS cache, L2 disabled");
                None
            }
        },
    };

    Self {
        config,
        memory_cache,
        redis_connection: RwLock::new(None),
        redis_client,
        http_client: reqwest::Client::new(),
    }
}
pub async fn initialize(&self) -> Result<(), IpfsCacheError> {
if let Some(ref client) = self.redis_client {
info!("Initializing IPFS cache Redis connection...");
match tokio::time::timeout(Duration::from_secs(10), client.get_connection_manager()).await {
Ok(Ok(manager)) => {
*self.redis_connection.write().await = Some(manager);
info!("IPFS cache Redis connection established");
}
Ok(Err(e)) => {
warn!(error = %e, "Failed to connect to Redis for IPFS cache, continuing without L2");
}
Err(_) => {
warn!("Redis connection timeout for IPFS cache, continuing without L2");
}
}
}
Ok(())
}
pub async fn get(&self, cid: &str) -> eyre::Result<Vec<u8>> {
if let Some(ref cache) = self.memory_cache {
if let Some(data) = cache.get(cid).await {
debug!(cid = cid, "IPFS cache L1 hit");
return Ok((*data).clone());
}
}
if let Some(data) = self.get_from_redis(cid).await {
debug!(cid = cid, "IPFS cache L2 hit");
if let Some(ref cache) = self.memory_cache {
cache.insert(cid.to_string(), Arc::new(data.clone())).await;
}
return Ok(data);
}
debug!(cid = cid, "IPFS cache miss, fetching from network");
let data = self.fetch_from_network(cid).await?;
self.set(cid, data.clone()).await;
Ok(data)
}
/// Returns the content for `cid` decoded as UTF-8 text.
///
/// # Errors
/// Propagates any error from `get`, and fails with an
/// "Invalid UTF-8 content" error when the bytes are not valid UTF-8.
pub async fn get_text(&self, cid: &str) -> eyre::Result<String> {
    let raw = self.get(cid).await?;
    match String::from_utf8(raw) {
        Ok(text) => Ok(text),
        Err(e) => Err(eyre::eyre!("Invalid UTF-8 content: {}", e)),
    }
}
async fn set(&self, cid: &str, data: Vec<u8>) {
let data_arc = Arc::new(data);
if let Some(ref cache) = self.memory_cache {
cache.insert(cid.to_string(), data_arc.clone()).await;
}
self.set_in_redis(cid, &data_arc).await;
}
/// Looks up `cid` in Redis.
///
/// Returns `None` when no Redis connection has been established, when the
/// key is absent, or when the `GET` fails (the failure is logged as a
/// warning rather than propagated).
async fn get_from_redis(&self, cid: &str) -> Option<Vec<u8>> {
    // Clone the connection handle and release the read guard *before* the
    // network round-trip, so a slow Redis call never keeps the lock held
    // and cannot stall a writer (or the readers queued behind it).
    let handle = {
        let guard = self.redis_connection.read().await;
        Option::clone(&guard)
    };
    let mut conn = handle?;

    let key = Self::redis_key(cid);
    match conn.get::<_, Option<Vec<u8>>>(&key).await {
        Ok(found) => found,
        Err(e) => {
            warn!(error = %e, cid = cid, "Redis GET failed for IPFS content");
            None
        }
    }
}
/// Writes `data` under the Redis key for `cid` with the configured TTL
/// (`config.redis_cache_ttl_secs`).
///
/// Does nothing when no Redis connection has been established. A failed
/// write is logged as a warning and otherwise ignored.
async fn set_in_redis(&self, cid: &str, data: &[u8]) {
    // Clone the connection handle and release the read guard *before* the
    // network round-trip, so a slow Redis call never keeps the lock held
    // and cannot stall a writer (or the readers queued behind it).
    let handle = {
        let guard = self.redis_connection.read().await;
        Option::clone(&guard)
    };
    let Some(mut conn) = handle else { return };

    let key = Self::redis_key(cid);
    let ttl_secs = self.config.redis_cache_ttl_secs;
    if let Err(e) = conn.set_ex::<_, _, ()>(&key, data, ttl_secs).await {
        warn!(error = %e, cid = cid, "Redis SET failed for IPFS content");
    }
}
/// Builds the Redis key for a CID: the fixed prefix `ipfs:content:` followed
/// by the CID verbatim.
fn redis_key(cid: &str) -> String {
    const PREFIX: &str = "ipfs:content:";
    let mut key = String::with_capacity(PREFIX.len() + cid.len());
    key.push_str(PREFIX);
    key.push_str(cid);
    key
}
/// Fetches `cid` from the primary gateway, falling back to the fallback
/// gateway when the request fails or returns a non-success status.
///
/// The response body is read with the `config.max_fetch_size_bytes` limit.
///
/// # Errors
/// * A failure while reading a successful primary response is returned
///   directly (the fallback is not attempted in that case).
/// * Otherwise, whatever error the fallback fetch produces.
async fn fetch_from_network(&self, cid: &str) -> eyre::Result<Vec<u8>> {
    let primary_url = self.build_ipfs_url(cid);
    info!(url = %primary_url, "Fetching from IPFS network");

    let resp = match self.http_client.get(&primary_url).send().await {
        Ok(resp) => resp,
        Err(e) => {
            warn!(error = %e, cid = cid, "Primary IPFS gateway error, trying fallback");
            return self.fetch_from_fallback(cid).await;
        }
    };

    if !resp.status().is_success() {
        warn!(
            cid = cid,
            status = %resp.status(),
            "Primary IPFS gateway failed, trying fallback"
        );
        return self.fetch_from_fallback(cid).await;
    }

    let bytes = read_bounded_response(resp, self.config.max_fetch_size_bytes)
        .await
        .map_err(|e| eyre::eyre!("Failed to read IPFS response: {}", e))?;
    info!(cid = cid, bytes = bytes.len(), "IPFS fetch successful");
    Ok(bytes)
}
/// Fetches `cid` from the fallback gateway, retrying via
/// `retry_with_backoff` with the configuration from
/// `fallback_retry_config`.
///
/// Only errors classified as retryable by `is_retryable_fallback_error`
/// request another attempt.
///
/// # Errors
/// Returns the error produced by `retry_with_backoff` when the fetch does
/// not succeed.
async fn fetch_from_fallback(&self, cid: &str) -> eyre::Result<Vec<u8>> {
    retry_with_backoff(
        &fallback_retry_config(),
        "ipfs_gateway_fallback_fetch",
        || async { self.fetch_from_fallback_once(cid).await },
        |e| {
            // Count only failures for which a retry is actually requested;
            // bumping the "retry attempts" counter for non-retryable errors
            // (e.g. a 404) would overstate the number of retries.
            // Assumes this predicate is consulted once per failed attempt —
            // TODO confirm against `retry_with_backoff`.
            let retryable = is_retryable_fallback_error(e);
            if retryable {
                metrics::counter!("ipfs_cache_gateway_fallback_retry_attempts_total")
                    .increment(1);
            }
            retryable
        },
    )
    .await
}
/// Performs a single fetch of `cid` from the fallback gateway.
///
/// The URL is `config.fallback_gateway` followed by the CID; the query
/// parameters from `config.params` are not appended here.
///
/// # Errors
/// * `FallbackFetchError::Request` when sending the request fails.
/// * `FallbackFetchError::Status` when the response status is not a success.
/// * Whatever `read_bounded_response` returns when reading the body fails.
async fn fetch_from_fallback_once(&self, cid: &str) -> eyre::Result<Vec<u8>> {
    let fallback_url = format!("{}{}", self.config.fallback_gateway, cid);
    info!(url = %fallback_url, "Fetching from fallback IPFS gateway");

    let response = self
        .http_client
        .get(&fallback_url)
        .send()
        .await
        .map_err(FallbackFetchError::Request)?;

    let status = response.status();
    if !status.is_success() {
        return Err(FallbackFetchError::Status(status).into());
    }

    let bytes = read_bounded_response(response, self.config.max_fetch_size_bytes).await?;
    info!(cid = cid, bytes = bytes.len(), "Fallback IPFS fetch successful");
    Ok(bytes)
}
fn build_ipfs_url(&self, cid: &str) -> String {
let base = &self.config.gateway;
let params = &self.config.params;
if params.is_empty() {
format!("{}{}", base, cid)
} else {
format!("{}{}?{}", base, cid, params)
}
}
pub async fn is_redis_available(&self) -> bool {
let guard = self.redis_connection.read().await;
if let Some(ref conn) = *guard {
let mut conn = conn.clone();
conn.get::<&str, Option<String>>("__ipfs_cache_test__").await.is_ok()
} else {
false
}
}
}
fn fallback_retry_config() -> RetryConfig {
RetryConfig {
max_retries: 5,
..Default::default()
}
}
/// Failure modes of a single fallback-gateway fetch. These are wrapped in an
/// `eyre::Report` and inspected by `is_retryable_fallback_error` to decide
/// whether another attempt is worthwhile.
#[derive(Debug, thiserror::Error)]
enum FallbackFetchError {
/// Sending the HTTP request failed.
#[error("Fallback IPFS fetch failed: {0}")]
Request(reqwest::Error),
/// Reading the response body failed.
///
/// NOTE(review): nothing in the visible code constructs this variant —
/// body read failures come back from `read_bounded_response` as plain
/// `eyre` reports, which `is_retryable_fallback_error` treats as
/// non-retryable. Confirm whether body errors were meant to be retried.
#[error("Failed to read fallback IPFS response: {0}")]
Body(reqwest::Error),
/// The gateway answered with a non-success HTTP status.
#[error("Fallback IPFS fetch failed with status: {0}")]
Status(reqwest::StatusCode),
}
fn is_retryable_fallback_error(error: &eyre::Report) -> bool {
match error.downcast_ref::<FallbackFetchError>() {
Some(FallbackFetchError::Request(err) | FallbackFetchError::Body(err)) => {
err.is_timeout() || err.is_connect() || err.status().is_none()
}
Some(FallbackFetchError::Status(status)) => status.is_server_error(),
None => false,
}
}
/// Reads the body of `response` into memory, refusing to buffer more than
/// `max_bytes`.
///
/// The declared `Content-Length` (when present) is checked up front so an
/// oversized response is rejected before any of the body is read; the running
/// total is then re-checked for every chunk, since the header may be absent
/// or inaccurate.
///
/// # Errors
/// * `IpfsCacheError::ResponseTooLarge` when the declared or actual size
///   exceeds `max_bytes`.
/// * A "Failed to read response chunk" error when reading a chunk fails.
async fn read_bounded_response(mut response: reqwest::Response, max_bytes: usize) -> eyre::Result<Vec<u8>> {
    let capacity_hint = match response.content_length() {
        // Convert with `try_from` rather than `as`: on targets where `usize`
        // is narrower than 64 bits a truncating cast could turn a huge
        // declared length into a small one and slip past the limit.
        Some(declared) => match usize::try_from(declared) {
            Ok(len) if len <= max_bytes => len,
            Ok(len) => return Err(IpfsCacheError::ResponseTooLarge(len).into()),
            // Does not even fit in `usize`, so it necessarily exceeds the limit.
            Err(_) => return Err(IpfsCacheError::ResponseTooLarge(usize::MAX).into()),
        },
        None => 8192.min(max_bytes),
    };

    let mut bytes = Vec::with_capacity(capacity_hint);
    while let Some(chunk) = response
        .chunk()
        .await
        .map_err(|e| eyre::eyre!("Failed to read response chunk: {}", e))?
    {
        // Saturating add keeps the comparison sound even at the extremes.
        let total = bytes.len().saturating_add(chunk.len());
        if total > max_bytes {
            return Err(IpfsCacheError::ResponseTooLarge(total).into());
        }
        bytes.extend_from_slice(&chunk);
    }
    Ok(bytes)
}
/// Unit tests for the pure helpers and constructor wiring of the IPFS cache.
#[cfg(test)]
mod tests {
    use super::*;

    /// The Redis key is the fixed prefix followed by the CID.
    #[test]
    fn test_redis_key_format() {
        let key = IpfsCacheService::redis_key("QmTest123");
        assert_eq!(key, "ipfs:content:QmTest123");
    }

    /// With empty params the URL is just gateway + CID.
    #[test]
    fn test_build_ipfs_url_without_params() {
        let config = IpfsConfig {
            gateway: "https://ipfs.example.com/ipfs/".to_string(),
            params: String::new(),
            ..Default::default()
        };
        let service = IpfsCacheService::new(config, None);
        let url = service.build_ipfs_url("QmTest123");
        assert_eq!(url, "https://ipfs.example.com/ipfs/QmTest123");
    }

    /// Non-empty params are appended after a `?`.
    #[test]
    fn test_build_ipfs_url_with_params() {
        let config = IpfsConfig {
            gateway: "https://ipfs.example.com/ipfs/".to_string(),
            params: "format=dag-json".to_string(),
            ..Default::default()
        };
        let service = IpfsCacheService::new(config, None);
        let url = service.build_ipfs_url("QmTest123");
        assert_eq!(url, "https://ipfs.example.com/ipfs/QmTest123?format=dag-json");
    }

    /// Disabling caching leaves the in-memory cache unset.
    #[tokio::test]
    async fn test_memory_cache_disabled() {
        let config = IpfsConfig {
            cache_enabled: false,
            ..Default::default()
        };
        let service = IpfsCacheService::new(config, None);
        assert!(service.memory_cache.is_none());
    }

    /// Enabling caching creates the in-memory cache.
    #[tokio::test]
    async fn test_memory_cache_enabled() {
        let config = IpfsConfig {
            cache_enabled: true,
            memory_cache_size: 50,
            memory_cache_ttl_secs: 600,
            ..Default::default()
        };
        let service = IpfsCacheService::new(config, None);
        assert!(service.memory_cache.is_some());
    }

    /// The fallback retry configuration overrides `max_retries` to 5.
    #[test]
    fn fallback_fetch_uses_five_retries() {
        assert_eq!(fallback_retry_config().max_retries, 5);
    }

    /// Client errors (4xx) are final; server errors (5xx) are retried.
    #[test]
    fn fallback_retry_classifies_http_statuses() {
        let not_found: eyre::Report =
            FallbackFetchError::Status(reqwest::StatusCode::NOT_FOUND).into();
        assert!(!is_retryable_fallback_error(&not_found));

        let service_unavailable: eyre::Report =
            FallbackFetchError::Status(reqwest::StatusCode::SERVICE_UNAVAILABLE).into();
        assert!(is_retryable_fallback_error(&service_unavailable));
    }

    /// Reports that do not wrap a `FallbackFetchError` are never retried.
    #[test]
    fn fallback_retry_rejects_unrelated_errors() {
        let unrelated = eyre::eyre!("some other failure");
        assert!(!is_retryable_fallback_error(&unrelated));
    }
}