use crate::mcp::client::error::{ClientError, Result};
use crate::mcp::client::resource::{ResourceContent, ResourceInfo};
use chrono::{DateTime, Utc};
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite_migration::{M, Migrations};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Mutex, OnceLock};
use std::time::Duration;
use uuid::Uuid;
// Process-wide registry of database files whose schema has already been
// initialized, keyed by normalized path. The `()` value makes this a set in
// disguise; it exists so concurrent `ResourceCache` instances skip redundant
// migration runs.
static INITIALIZED_DATABASES: OnceLock<Mutex<HashMap<String, ()>>> = OnceLock::new();
// Versioned schema migrations applied through `rusqlite_migration`.
// All timestamps are stored as integer milliseconds since the Unix epoch;
// the trigger opportunistically purges expired rows on every insert.
static MIGRATIONS: &[M] = &[
M::up(
r#"
CREATE TABLE IF NOT EXISTS resources (
id TEXT PRIMARY KEY,
uri TEXT UNIQUE NOT NULL,
content BLOB NOT NULL,
content_type TEXT,
metadata_json TEXT,
created_at INTEGER NOT NULL,
accessed_at INTEGER NOT NULL,
expires_at INTEGER,
access_count INTEGER DEFAULT 0,
size_bytes INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_resources_uri ON resources(uri);
CREATE INDEX IF NOT EXISTS idx_resources_expires ON resources(expires_at);
CREATE INDEX IF NOT EXISTS idx_resources_accessed ON resources(accessed_at);
CREATE TABLE IF NOT EXISTS cache_analytics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER NOT NULL,
hit_rate REAL,
total_requests INTEGER,
cache_size_mb REAL,
eviction_count INTEGER
);
CREATE TRIGGER IF NOT EXISTS cleanup_expired_resources
AFTER INSERT ON resources
BEGIN
DELETE FROM resources
WHERE expires_at IS NOT NULL
AND expires_at < strftime('%s', 'now') * 1000;
END;
"#,
)
.down(
r#"
DROP TABLE IF EXISTS cache_analytics;
DROP TABLE IF EXISTS resources;
"#,
),
];
/// Configuration for the SQLite-backed [`ResourceCache`].
#[derive(Debug, Clone)]
pub struct CacheConfig {
/// Filesystem path of the SQLite database file.
pub database_path: String,
/// TTL applied by `store_resource`; a zero duration means "never expires".
pub default_ttl: Duration,
/// Soft size budget in megabytes.
/// NOTE(review): not enforced anywhere in this module — confirm enforcement elsewhere.
pub max_size_mb: u64,
/// Whether periodic cleanup should run.
/// NOTE(review): not read by this module — presumably consumed by a caller; verify.
pub auto_cleanup: bool,
/// Interval between cleanup passes (see `auto_cleanup`).
pub cleanup_interval: Duration,
/// Minimum idle connections kept by the r2d2 pool (`None` = r2d2 default).
pub pool_min_connections: Option<u32>,
/// Maximum pool size (`None` = r2d2 default).
pub pool_max_connections: Option<u32>,
/// Max time to wait when checking out a pooled connection (`None` = r2d2 default).
pub pool_connection_timeout: Option<Duration>,
/// Max lifetime of a pooled connection before it is recycled (`None` = r2d2 default).
pub pool_max_lifetime: Option<Duration>,
}
impl Default for CacheConfig {
    /// Sensible defaults: 1h TTL, 100 MB budget, auto-cleanup every 5 min,
    /// and a 1..=10 connection pool with 30s checkout timeout.
    fn default() -> Self {
        // Prefer ~/.agenterra/cache.db; fall back to a file in the current
        // working directory when the home directory cannot be determined.
        let database_path = match dirs::home_dir() {
            Some(home) => home.join(".agenterra").join("cache.db"),
            None => std::path::PathBuf::from("./agenterra_cache.db"),
        };
        Self {
            database_path: database_path.to_string_lossy().to_string(),
            default_ttl: Duration::from_secs(3600),
            max_size_mb: 100,
            auto_cleanup: true,
            cleanup_interval: Duration::from_secs(300),
            pool_min_connections: Some(1),
            pool_max_connections: Some(10),
            pool_connection_timeout: Some(Duration::from_secs(30)),
            pool_max_lifetime: Some(Duration::from_secs(300)),
        }
    }
}
/// In-memory cache usage counters; point-in-time snapshots are persisted to
/// the `cache_analytics` table by `update_analytics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheAnalytics {
/// Lookups recorded by `get_resource` (hits + misses).
pub total_requests: u64,
/// Lookups that returned a non-expired entry.
pub cache_hits: u64,
/// Lookups that found nothing (or only expired entries).
pub cache_misses: u64,
/// `cache_hits / total_requests`, 0.0 before the first lookup.
pub hit_rate: f64,
/// Sum of stored payload sizes in bytes.
pub cache_size_bytes: u64,
/// Number of live entries.
pub resource_count: u64,
/// Entries removed by `cleanup_expired`.
pub eviction_count: u64,
/// When `cleanup_expired` last ran (initialized to construction time).
pub last_cleanup: DateTime<Utc>,
}
/// A fully materialized row of the `resources` table, as returned by
/// `list_cached_resources` and `search_resources`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedResource {
/// UUID assigned when the entry was stored.
pub id: String,
/// Unique resource URI (the lookup key).
pub uri: String,
/// Raw payload bytes.
pub content: Vec<u8>,
/// Optional MIME type.
pub content_type: Option<String>,
/// Deserialized `metadata_json` column (empty map if the JSON was invalid).
pub metadata: HashMap<String, serde_json::Value>,
/// Stored as epoch milliseconds in the database.
pub created_at: DateTime<Utc>,
/// Updated on every successful `get_resource`.
pub accessed_at: DateTime<Utc>,
/// `None` means the entry never expires.
pub expires_at: Option<DateTime<Utc>>,
/// Incremented on every successful `get_resource`.
pub access_count: u64,
/// Payload size in bytes at store time.
pub size_bytes: u64,
}
/// Snapshot of the r2d2 connection pool state (see `get_pool_stats`).
#[derive(Debug, Clone)]
pub struct PoolStats {
/// Configured maximum pool size.
pub max_connections: u32,
/// Connections currently checked out (total minus idle).
pub active_connections: u32,
/// Connections sitting idle in the pool.
pub idle_connections: u32,
}
/// SQLite-backed resource cache with TTL expiry and usage analytics.
///
/// All blocking database work is executed on the Tokio blocking thread pool
/// via `with_connection`; connections come from an r2d2 pool.
pub struct ResourceCache {
// Immutable after construction.
config: CacheConfig,
// In-memory counters; persisted periodically by `update_analytics`.
analytics: CacheAnalytics,
// Cheap to clone (handle to the shared pool).
pool: Pool<SqliteConnectionManager>,
}
impl ResourceCache {
/// Creates a cache backed by the configured SQLite file.
///
/// Validates the configuration, builds the r2d2 connection pool, and runs
/// schema initialization before returning.
///
/// # Errors
/// * `ClientError::Validation` — empty `database_path`, or pool min > max.
/// * `ClientError::Pool` — the connection pool could not be built.
pub async fn new(config: CacheConfig) -> Result<Self> {
    // Reject obviously invalid configuration before doing any work.
    if config.database_path.is_empty() {
        return Err(ClientError::Validation(
            "database_path cannot be empty".to_string(),
        ));
    }
    if let (Some(min), Some(max)) = (config.pool_min_connections, config.pool_max_connections) {
        if min > max {
            return Err(ClientError::Validation(format!(
                "pool_min_connections ({}) must be ≤ pool_max_connections ({})",
                min, max
            )));
        }
    }
    // Assemble the pool, applying only the limits the caller specified.
    let manager = SqliteConnectionManager::file(&config.database_path);
    let mut builder = Pool::builder();
    if let Some(min_idle) = config.pool_min_connections {
        builder = builder.min_idle(Some(min_idle));
    }
    if let Some(max_size) = config.pool_max_connections {
        builder = builder.max_size(max_size);
    }
    if let Some(timeout) = config.pool_connection_timeout {
        builder = builder.connection_timeout(timeout);
    }
    if let Some(lifetime) = config.pool_max_lifetime {
        builder = builder.max_lifetime(Some(lifetime));
    }
    let pool = builder
        .build(manager)
        .map_err(|e| ClientError::Pool(format!("Failed to create connection pool: {}", e)))?;
    // Start from zeroed analytics; counters are rebuilt as the cache is used.
    let cache = Self {
        config,
        analytics: CacheAnalytics {
            total_requests: 0,
            cache_hits: 0,
            cache_misses: 0,
            hit_rate: 0.0,
            cache_size_bytes: 0,
            resource_count: 0,
            eviction_count: 0,
            last_cleanup: Utc::now(),
        },
        pool,
    };
    cache.init_schema().await?;
    Ok(cache)
}
/// Runs a closure against a pooled SQLite connection on the Tokio blocking
/// thread pool.
///
/// Error mapping: pool checkout failure -> `ClientError::Pool`, database
/// failure -> `ClientError::Client`, task join failure -> `ClientError::Spawn`.
async fn with_connection<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(&mut rusqlite::Connection) -> rusqlite::Result<R> + Send + 'static,
R: Send + 'static,
{
// Clone the pool handle so the 'static closure can own it.
let pool = self.pool.clone();
tokio::task::spawn_blocking(move || {
let mut conn = pool.get().map_err(|e| {
ClientError::Pool(format!("Failed to get pooled connection: {}", e))
})?;
f(&mut conn)
.map_err(|e| ClientError::Client(format!("Database operation failed: {}", e)))
})
.await
.map_err(|e| ClientError::Spawn(format!("Task execution failed: {}", e)))?
}
/// Ensures the schema exists and per-connection pragmas are applied for the
/// configured database file.
///
/// Migrations run at most once per normalized path per process, guarded by
/// the global tracker with a double-check around the blocking section.
async fn init_schema(&self) -> Result<()> {
let db_path = normalize_db_path(&self.config.database_path);
{
// Fast path: another cache instance already migrated this file.
// NOTE(review): `.unwrap()` panics if the mutex was poisoned by a
// panicking thread — consider recovering via `PoisonError::into_inner`.
let tracker = get_db_tracker().lock().unwrap();
if tracker.contains_key(&db_path) {
tracing::debug!("Database schema already initialized for: {}", db_path);
return Ok(());
}
}
self.with_connection(move |conn| {
tracing::debug!(
"Entering critical section for database schema initialization: {}",
db_path
);
{
// Double-check inside the blocking task: another task may have
// completed initialization while we waited for a connection.
let tracker = get_db_tracker().lock().unwrap();
if tracker.contains_key(&db_path) {
tracing::debug!(
"Database schema was initialized by another thread: {}",
db_path
);
return Ok(());
}
}
// Create the parent directory so SQLite can create the file; surface
// the I/O error through rusqlite's error type to fit the closure.
if let Some(parent) = std::path::Path::new(&db_path).parent() {
std::fs::create_dir_all(parent).map_err(|e| {
rusqlite::Error::SqliteFailure(
rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CANTOPEN),
Some(format!("Failed to create directory: {}", e)),
)
})?;
}
// Connection tuning: WAL journal, NORMAL fsync, larger page cache,
// in-memory temp tables, and a 5s busy timeout for lock contention.
conn.pragma_update(None, "journal_mode", "WAL")?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
conn.pragma_update(None, "cache_size", 10000)?;
conn.pragma_update(None, "temp_store", "memory")?;
conn.busy_timeout(std::time::Duration::from_secs(5))?;
let migrations = Migrations::new(MIGRATIONS.to_vec());
match migrations.to_latest(conn) {
Ok(()) => {
// Record success only after migrations complete.
let mut tracker = get_db_tracker().lock().unwrap();
tracker.insert(db_path.clone(), ());
tracing::debug!(
"Database migrations completed successfully for: {}",
db_path
);
Ok(())
}
Err(e) => {
// Concurrent creators can race; treat "already exists"/"duplicate"
// as a success signal from the other racer.
let error_msg = e.to_string().to_lowercase();
if error_msg.contains("already exists") || error_msg.contains("duplicate") {
let mut tracker = get_db_tracker().lock().unwrap();
tracker.insert(db_path.clone(), ());
tracing::debug!("Schema already exists (concurrent creation), continuing");
Ok(())
} else {
tracing::error!("Database migration failed: {}", e);
Err(rusqlite::Error::SqliteFailure(
rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_ERROR),
Some(format!("Migration failed: {}", e)),
))
}
}
}
})
.await
}
/// Stores a resource using the configured default TTL.
/// Returns the UUID assigned to the stored entry.
pub async fn store_resource(&mut self, resource: &ResourceContent) -> Result<String> {
    let ttl = self.config.default_ttl;
    self.store_resource_with_ttl(resource, ttl).await
}
pub async fn store_resource_with_ttl(
&mut self,
resource: &ResourceContent,
ttl: Duration,
) -> Result<String> {
let id = Uuid::new_v4().to_string();
let now = Utc::now();
let expires_at = if ttl.is_zero() {
None
} else {
Some(
now + chrono::Duration::from_std(ttl)
.map_err(|_| ClientError::Validation("Invalid TTL duration".to_string()))?,
)
};
let mut metadata = resource.info.metadata.clone();
if let Some(ref encoding) = resource.encoding {
metadata.insert("encoding".to_string(), serde_json::json!(encoding));
}
let metadata_json = serde_json::to_string(&metadata)?;
let size_bytes = resource.data.len() as u64;
let id_clone = id.clone();
let uri = resource.info.uri.clone();
let content = resource.data.clone();
let content_type = resource.info.mime_type.clone();
let created_at = now.timestamp_millis();
let accessed_at = now.timestamp_millis();
let expires_at_millis = expires_at.map(|t| t.timestamp_millis());
self.with_connection(move |conn| {
let tx = conn.transaction()?;
tx.execute(
"INSERT OR REPLACE INTO resources (
id, uri, content, content_type, metadata_json,
created_at, accessed_at, expires_at, access_count, size_bytes
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
rusqlite::params![
id_clone,
uri,
content,
content_type,
metadata_json,
created_at,
accessed_at,
expires_at_millis,
1, size_bytes as i64,
],
)?;
tx.commit()?;
Ok(())
})
.await?;
self.analytics.resource_count += 1;
self.analytics.cache_size_bytes += size_bytes;
Ok(id)
}
/// Looks up a cached resource by URI, skipping expired entries.
///
/// On a hit the row's `accessed_at`/`access_count` are bumped and the stored
/// metadata is rehydrated into a `ResourceContent`. Hit/miss analytics are
/// updated on every successful lookup.
///
/// # Errors
/// Database failures, or `ClientError::Client` if the stored metadata JSON
/// cannot be parsed.
pub async fn get_resource(&mut self, uri: &str) -> Result<Option<ResourceContent>> {
    let uri = uri.to_string();
    let now = Utc::now().timestamp_millis();
    let result = self
        .with_connection(move |conn| {
            let mut stmt = conn.prepare(
                "SELECT id, uri, content, content_type, metadata_json,
created_at, accessed_at, expires_at, access_count, size_bytes
FROM resources
WHERE uri = ?1
AND (expires_at IS NULL OR expires_at > ?2)",
            )?;
            let row = match stmt.query_row(rusqlite::params![uri, now], |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, Vec<u8>>(2)?,
                    row.get::<_, Option<String>>(3)?,
                    row.get::<_, String>(4)?,
                    row.get::<_, i64>(5)?,
                    row.get::<_, i64>(6)?,
                    row.get::<_, Option<i64>>(7)?,
                    row.get::<_, i64>(8)?,
                    row.get::<_, i64>(9)?,
                ))
            }) {
                Ok(row) => row,
                Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
                Err(e) => return Err(e),
            };
            // Touch the row so access-based ordering and counts stay accurate.
            conn.execute(
                "UPDATE resources SET accessed_at = ?1, access_count = access_count + 1 WHERE uri = ?2",
                rusqlite::params![now, uri],
            )?;
            Ok(Some(row))
        })
        .await?;
    // Fix: record the lookup outcome exactly once. The previous version
    // duplicated this bookkeeping in both match arms and guarded the
    // division with `total_requests > 0`, which was dead code immediately
    // after the increment.
    self.analytics.total_requests += 1;
    if result.is_some() {
        self.analytics.cache_hits += 1;
    } else {
        self.analytics.cache_misses += 1;
    }
    self.analytics.hit_rate =
        self.analytics.cache_hits as f64 / self.analytics.total_requests as f64;
    let Some((_, uri, content, content_type, metadata_json, _, _, _, _, _)) = result else {
        return Ok(None);
    };
    let metadata: HashMap<String, serde_json::Value> = serde_json::from_str(&metadata_json)
        .map_err(|e| ClientError::Client(format!("Failed to parse metadata: {}", e)))?;
    let info = ResourceInfo {
        uri: uri.clone(),
        name: metadata
            .get("name")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string()),
        description: metadata
            .get("description")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string()),
        mime_type: content_type.clone(),
        metadata,
    };
    // Prefer the explicitly stored encoding; fall back to the charset
    // parameter of the MIME type.
    let encoding = info
        .metadata
        .get("encoding")
        .and_then(|v| v.as_str())
        .map(|s| s.to_string())
        .or_else(|| content_type.as_ref().and_then(|ct| parse_charset(ct)));
    Ok(Some(ResourceContent {
        info,
        data: content,
        encoding,
    }))
}
/// Lists all non-expired cached resources, most recently accessed first.
/// Rows with unparseable metadata JSON are kept with an empty metadata map.
pub async fn list_cached_resources(&self) -> Result<Vec<CachedResource>> {
    let now = Utc::now().timestamp_millis();
    self.with_connection(move |conn| {
        let mut stmt = conn.prepare(
            "SELECT id, uri, content, content_type, metadata_json,
created_at, accessed_at, expires_at, access_count, size_bytes
FROM resources
WHERE expires_at IS NULL OR expires_at > ?1
ORDER BY accessed_at DESC",
        )?;
        // Map every row into a CachedResource and fail fast on the first
        // bad row via the fallible collect.
        stmt.query_map(rusqlite::params![now], |row| {
            let raw_metadata: String = row.get(4)?;
            let metadata: HashMap<String, serde_json::Value> =
                serde_json::from_str(&raw_metadata).unwrap_or_else(|e| {
                    tracing::warn!("Failed to parse metadata JSON: {}", e);
                    HashMap::new()
                });
            Ok(CachedResource {
                id: row.get(0)?,
                uri: row.get(1)?,
                content: row.get(2)?,
                content_type: row.get(3)?,
                metadata,
                created_at: DateTime::from_timestamp_millis(row.get::<_, i64>(5)?)
                    .unwrap_or_default(),
                accessed_at: DateTime::from_timestamp_millis(row.get::<_, i64>(6)?)
                    .unwrap_or_default(),
                expires_at: row
                    .get::<_, Option<i64>>(7)?
                    .map(|ts| DateTime::from_timestamp_millis(ts).unwrap_or_default()),
                access_count: row.get::<_, i64>(8)? as u64,
                size_bytes: row.get::<_, i64>(9)? as u64,
            })
        })?
        .collect::<rusqlite::Result<Vec<_>>>()
    })
    .await
}
/// Returns `true` when a non-expired entry exists for `uri`.
pub async fn contains_resource(&self, uri: &str) -> Result<bool> {
    let target = uri.to_string();
    let now = Utc::now().timestamp_millis();
    self.with_connection(move |conn| {
        // COUNT(*) is at most 1 here because `uri` is UNIQUE.
        let matches: i64 = conn.query_row(
            "SELECT COUNT(*) FROM resources WHERE uri = ?1 AND (expires_at IS NULL OR expires_at > ?2)",
            rusqlite::params![target, now],
            |row| row.get(0),
        )?;
        Ok(matches > 0)
    })
    .await
}
/// Deletes the entry for `uri`, returning whether a row was removed.
/// The in-memory resource count is decremented on success.
pub async fn remove_resource(&mut self, uri: &str) -> Result<bool> {
    let target = uri.to_string();
    let removed = self
        .with_connection(move |conn| {
            conn.execute(
                "DELETE FROM resources WHERE uri = ?1",
                rusqlite::params![target],
            )
            .map(|changes| changes > 0)
        })
        .await?;
    if removed {
        self.analytics.resource_count = self.analytics.resource_count.saturating_sub(1);
    }
    Ok(removed)
}
/// Removes every cached resource and all persisted analytics samples,
/// then resets the in-memory counters to a pristine state.
pub async fn clear(&mut self) -> Result<()> {
    self.with_connection(|conn| {
        for statement in ["DELETE FROM resources", "DELETE FROM cache_analytics"] {
            conn.execute(statement, [])?;
        }
        Ok(())
    })
    .await?;
    self.analytics = CacheAnalytics {
        total_requests: 0,
        cache_hits: 0,
        cache_misses: 0,
        hit_rate: 0.0,
        cache_size_bytes: 0,
        resource_count: 0,
        eviction_count: 0,
        last_cleanup: Utc::now(),
    };
    Ok(())
}
/// Deletes every expired entry, updates eviction bookkeeping, and refreshes
/// the size/count analytics from the database. Returns how many rows were
/// removed.
pub async fn cleanup_expired(&mut self) -> Result<u64> {
    let cutoff = Utc::now().timestamp_millis();
    let evicted = self
        .with_connection(move |conn| {
            conn.execute(
                "DELETE FROM resources WHERE expires_at IS NOT NULL AND expires_at <= ?1",
                rusqlite::params![cutoff],
            )
            .map(|changes| changes as u64)
        })
        .await?;
    self.analytics.eviction_count += evicted;
    self.analytics.last_cleanup = Utc::now();
    self.analytics.resource_count = self.analytics.resource_count.saturating_sub(evicted);
    self.update_analytics().await?;
    Ok(evicted)
}
/// Returns the in-memory analytics counters. Size/count fields are refreshed
/// from the database only when `cleanup_expired` runs.
pub fn get_analytics(&self) -> &CacheAnalytics {
&self.analytics
}
/// Refreshes size/count analytics from the database and persists a
/// point-in-time sample to the `cache_analytics` table.
///
/// Fix: the previous version issued two separate queries and collapsed any
/// query failure to `0` via `unwrap_or(0)`, silently reporting an empty
/// cache on real database errors. Errors now propagate, and both aggregates
/// come from a single query.
async fn update_analytics(&mut self) -> Result<()> {
    let (total_size, resource_count) = self
        .with_connection(|conn| {
            conn.query_row(
                "SELECT COALESCE(SUM(size_bytes), 0), COUNT(*) FROM resources",
                [],
                |row| Ok((row.get::<_, i64>(0)? as u64, row.get::<_, i64>(1)? as u64)),
            )
        })
        .await?;
    self.analytics.cache_size_bytes = total_size;
    self.analytics.resource_count = resource_count;
    // Capture the sample values before moving them into the closure.
    let timestamp = Utc::now().timestamp_millis();
    let hit_rate = self.analytics.hit_rate;
    let total_requests = self.analytics.total_requests as i64;
    let cache_size_mb = (self.analytics.cache_size_bytes as f64) / (1024.0 * 1024.0);
    let eviction_count = self.analytics.eviction_count as i64;
    self.with_connection(move |conn| {
        conn.execute(
            "INSERT INTO cache_analytics (timestamp, hit_rate, total_requests, cache_size_mb, eviction_count)
             VALUES (?1, ?2, ?3, ?4, ?5)",
            rusqlite::params![
                timestamp,
                hit_rate,
                total_requests,
                cache_size_mb,
                eviction_count,
            ],
        )?;
        Ok(())
    })
    .await?;
    Ok(())
}
/// Substring search over URI, content type, and raw metadata JSON of all
/// non-expired entries, most recently accessed first.
///
/// Fix: LIKE wildcard injection — a literal `%` or `_` in `query` previously
/// acted as a pattern metacharacter. The query is now escaped and the SQL
/// uses `ESCAPE '\'` so those characters match themselves.
pub async fn search_resources(&self, query: &str) -> Result<Vec<CachedResource>> {
    // Escape backslash first so the subsequent escapes are not doubled.
    let escaped = query
        .replace('\\', "\\\\")
        .replace('%', "\\%")
        .replace('_', "\\_");
    let search_pattern = format!("%{}%", escaped);
    let now = Utc::now().timestamp_millis();
    self.with_connection(move |conn| {
        let mut stmt = conn.prepare(
            "SELECT id, uri, content, content_type, metadata_json,
created_at, accessed_at, expires_at, access_count, size_bytes
FROM resources
WHERE (expires_at IS NULL OR expires_at > ?2)
AND (uri LIKE ?1 ESCAPE '\\' OR content_type LIKE ?1 ESCAPE '\\' OR metadata_json LIKE ?1 ESCAPE '\\')
ORDER BY accessed_at DESC",
        )?;
        stmt.query_map(rusqlite::params![search_pattern, now], |row| {
            let metadata_json: String = row.get(4)?;
            let metadata: HashMap<String, serde_json::Value> =
                serde_json::from_str(&metadata_json).unwrap_or_else(|e| {
                    tracing::warn!("Failed to parse metadata JSON in search: {}", e);
                    HashMap::new()
                });
            Ok(CachedResource {
                id: row.get(0)?,
                uri: row.get(1)?,
                content: row.get(2)?,
                content_type: row.get(3)?,
                metadata,
                created_at: DateTime::from_timestamp_millis(row.get::<_, i64>(5)?)
                    .unwrap_or_default(),
                accessed_at: DateTime::from_timestamp_millis(row.get::<_, i64>(6)?)
                    .unwrap_or_default(),
                expires_at: row
                    .get::<_, Option<i64>>(7)?
                    .map(|ts| DateTime::from_timestamp_millis(ts).unwrap_or_default()),
                access_count: row.get::<_, i64>(8)? as u64,
                size_bytes: row.get::<_, i64>(9)? as u64,
            })
        })?
        .collect::<rusqlite::Result<Vec<_>>>()
    })
    .await
}
/// Returns the sum of stored payload sizes in bytes (0 for an empty cache,
/// via COALESCE).
pub async fn get_cache_size(&self) -> Result<u64> {
    self.with_connection(|conn| {
        conn.query_row(
            "SELECT COALESCE(SUM(size_bytes), 0) FROM resources",
            [],
            |row| row.get::<_, i64>(0),
        )
        .map(|total| total as u64)
    })
    .await
}
/// Reclaims free pages in the database file via SQLite `VACUUM`.
pub async fn compact(&mut self) -> Result<()> {
self.with_connection(|conn| {
conn.execute("VACUUM", [])?;
Ok(())
})
.await
}
/// Returns a snapshot of the connection pool's current state.
///
/// Fix: `connections - idle_connections` used plain u32 subtraction, which
/// panics in debug builds (and wraps in release) if the snapshot ever
/// reports more idle than total connections; `saturating_sub` makes the
/// computation total.
pub fn get_pool_stats(&self) -> PoolStats {
    let state = self.pool.state();
    PoolStats {
        max_connections: self.pool.max_size(),
        active_connections: state.connections.saturating_sub(state.idle_connections),
        idle_connections: state.idle_connections,
    }
}
}
/// Lazily initializes and returns the global registry of database paths
/// whose schema has already been set up in this process.
fn get_db_tracker() -> &'static Mutex<HashMap<String, ()>> {
INITIALIZED_DATABASES.get_or_init(|| Mutex::new(HashMap::new()))
}
/// Extracts the lowercased `charset` parameter from a MIME content type,
/// e.g. `"text/html; charset=UTF-8"` -> `Some("utf-8")`. Surrounding single
/// or double quotes around the value are stripped. Returns `None` when no
/// charset parameter is present.
fn parse_charset(content_type: &str) -> Option<String> {
    for segment in content_type.split(';') {
        let Some((name, raw_value)) = segment.trim().split_once('=') else {
            continue;
        };
        if name.trim().eq_ignore_ascii_case("charset") {
            let cleaned = raw_value.trim_matches(|c| c == '"' || c == '\'');
            return Some(cleaned.to_ascii_lowercase());
        }
    }
    None
}
/// Produces a canonical-ish string form of `db_path` for use as a key in the
/// initialization tracker.
///
/// Existing files are canonicalized by the OS; nonexistent paths are
/// normalized lexically, with relative paths anchored at the current working
/// directory. Falls back to the input unchanged if the working directory is
/// unavailable.
fn normalize_db_path(db_path: &str) -> String {
    let path = Path::new(db_path);
    // Best case: the file exists and the OS resolves symlinks for us.
    if let Ok(real) = path.canonicalize() {
        return real.to_string_lossy().to_string();
    }
    if path.is_relative() {
        if let Ok(cwd) = std::env::current_dir() {
            return normalize_path_components(&cwd.join(path));
        }
    } else if path.is_absolute() {
        return normalize_path_components(path);
    }
    db_path.to_string()
}
/// Lexically normalizes a path: `.` components are dropped and each `..`
/// pops the most recently retained component (or is discarded when there is
/// nothing left to pop). No filesystem access is performed.
fn normalize_path_components(path: &Path) -> String {
    let mut kept: Vec<std::path::Component> = Vec::new();
    for part in path.components() {
        match part {
            std::path::Component::CurDir => {}
            std::path::Component::ParentDir => {
                kept.pop();
            }
            other => kept.push(other),
        }
    }
    let rebuilt: std::path::PathBuf = kept.into_iter().collect();
    rebuilt.to_string_lossy().to_string()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mcp::client::resource::ResourceInfo;
use std::collections::HashMap;
use tempfile::NamedTempFile;
// Generous pool checkout timeout for CI machines under load.
const POOL_TIMEOUT: Duration = Duration::from_secs(30);
// An existing file should canonicalize to a non-empty absolute path.
#[test]
fn test_normalize_db_path_existing_file() {
let temp_file = NamedTempFile::new().unwrap();
let temp_path = temp_file.path().to_string_lossy().to_string();
let normalized = normalize_db_path(&temp_path);
assert!(!normalized.is_empty());
assert!(Path::new(&normalized).is_absolute());
}
// A relative path to a nonexistent file is anchored at the working directory.
#[test]
fn test_normalize_db_path_relative_nonexistent() {
let relative_path = "./test_db.sqlite";
let normalized = normalize_db_path(relative_path);
assert!(Path::new(&normalized).is_absolute());
assert!(normalized.ends_with("test_db.sqlite"));
assert_ne!(normalized, relative_path);
}
// An absolute path to a nonexistent file passes through unchanged.
#[test]
fn test_normalize_db_path_absolute_nonexistent() {
let current_dir = std::env::current_dir().unwrap();
let absolute_path = current_dir.join("nonexistent_db.sqlite");
let path_str = absolute_path.to_string_lossy().to_string();
let normalized = normalize_db_path(&path_str);
assert_eq!(normalized, path_str);
assert!(Path::new(&normalized).is_absolute());
}
// "./x" and "x" must normalize to the same absolute path.
#[test]
fn test_normalize_db_path_dot_prefix() {
let dot_path = "./db.sqlite";
let plain_path = "db.sqlite";
let normalized_dot = normalize_db_path(dot_path);
let normalized_plain = normalize_db_path(plain_path);
assert_eq!(normalized_dot, normalized_plain);
assert!(Path::new(&normalized_dot).is_absolute());
assert!(normalized_dot.ends_with("db.sqlite"));
let current_dir = std::env::current_dir().unwrap();
let expected = current_dir.join("db.sqlite").to_string_lossy().to_string();
assert_eq!(normalized_dot, expected);
assert_eq!(normalized_plain, expected);
}
// Normalization must be deterministic for repeated calls.
#[test]
fn test_normalize_db_path_consistency() {
let test_path = "./test.db";
let normalized1 = normalize_db_path(test_path);
let normalized2 = normalize_db_path(test_path);
assert_eq!(normalized1, normalized2);
}
// Edge cases: empty string and "."/".." resolve against the working directory.
#[test]
fn test_normalize_db_path_edge_cases() {
    let current_dir = std::env::current_dir().unwrap();
    let expected_current = current_dir.to_string_lossy().to_string();
    let normalized_empty = normalize_db_path("");
    assert_eq!(normalized_empty, expected_current);
    let normalized_dot = normalize_db_path(".");
    assert!(Path::new(&normalized_dot).is_absolute());
    assert_eq!(normalized_dot, expected_current);
    let normalized_double_dot = normalize_db_path("..");
    assert!(Path::new(&normalized_double_dot).is_absolute());
    // Fix: the original contained the mojibake token `¤t_dir` — a
    // mangled `&current_dir` (HTML entity `&curren;`) — which did not compile.
    let expected_parent = current_dir
        .parent()
        .unwrap_or(&current_dir)
        .to_string_lossy()
        .to_string();
    assert_eq!(normalized_double_dot, expected_parent);
}
// Builds a config pointing at a unique temp-directory database. The TempDir
// is returned so callers keep it alive for the test's duration.
fn create_test_cache_config() -> (CacheConfig, tempfile::TempDir) {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join(format!("test_{}.db", Uuid::new_v4()));
let config = CacheConfig {
database_path: db_path.to_string_lossy().to_string(),
..Default::default()
};
(config, temp_dir)
}
// Canonical 13-byte "Hello, World!" resource used across the tests below.
fn create_test_resource() -> ResourceContent {
let mut metadata = HashMap::new();
metadata.insert(
"size".to_string(),
serde_json::Value::Number(serde_json::Number::from(13)),
);
let info = ResourceInfo {
uri: "test://example.txt".to_string(),
name: Some("example.txt".to_string()),
description: Some("Test resource".to_string()),
mime_type: Some("text/plain".to_string()),
metadata,
};
ResourceContent {
info,
data: b"Hello, World!".to_vec(),
encoding: Some("utf-8".to_string()),
}
}
// A fresh cache starts with zeroed analytics.
#[tokio::test]
async fn test_cache_creation_with_temp_file() {
let (config, _temp_dir) = create_test_cache_config();
let result = ResourceCache::new(config).await;
assert!(result.is_ok());
let cache = result.unwrap();
assert_eq!(cache.get_analytics().resource_count, 0);
assert_eq!(cache.get_analytics().cache_size_bytes, 0);
}
// Data written by one cache instance is visible to a later instance using
// the same database file.
#[tokio::test]
async fn test_cache_persistence_across_sessions() {
let (config, _temp_dir) = create_test_cache_config();
let db_path = config.database_path.clone();
{
let mut cache = ResourceCache::new(config.clone()).await.unwrap();
let resource = create_test_resource();
cache.store_resource(&resource).await.unwrap();
}
{
let config = CacheConfig {
database_path: db_path,
..Default::default()
};
let mut cache = ResourceCache::new(config).await.unwrap();
let retrieved = cache.get_resource("test://example.txt").await.unwrap();
assert!(
retrieved.is_some(),
"Resource should persist across sessions"
);
assert_eq!(retrieved.unwrap().data, b"Hello, World!");
}
}
// Creation also works against a pre-existing (empty) file.
#[tokio::test]
async fn test_cache_creation_file_based() {
let temp_file = NamedTempFile::new().unwrap();
let config = CacheConfig {
database_path: temp_file.path().to_string_lossy().to_string(),
..Default::default()
};
let result = ResourceCache::new(config).await;
assert!(result.is_ok());
let cache = result.unwrap();
assert_eq!(cache.get_analytics().resource_count, 0);
}
// Round-trip: store then retrieve by URI.
#[tokio::test]
async fn test_store_and_retrieve_resource() {
let (config, _temp_dir) = create_test_cache_config();
let mut cache = ResourceCache::new(config).await.unwrap();
let resource = create_test_resource();
let result = cache.store_resource(&resource).await;
assert!(result.is_ok());
let resource_id = result.unwrap();
assert!(!resource_id.is_empty());
let result = cache.get_resource("test://example.txt").await;
assert!(result.is_ok());
let retrieved = result.unwrap();
assert!(retrieved.is_some());
let retrieved_resource = retrieved.unwrap();
assert_eq!(retrieved_resource.info.uri, "test://example.txt");
assert_eq!(retrieved_resource.data, b"Hello, World!");
}
// A 60s TTL keeps the entry retrievable immediately after storing.
#[tokio::test]
async fn test_store_resource_with_custom_ttl() {
let (config, _temp_dir) = create_test_cache_config();
let mut cache = ResourceCache::new(config).await.unwrap();
let resource = create_test_resource();
let ttl = Duration::from_secs(60);
let result = cache.store_resource_with_ttl(&resource, ttl).await;
assert!(result.is_ok());
let resource_id = result.unwrap();
assert!(!resource_id.is_empty());
let retrieved = cache.get_resource("test://example.txt").await.unwrap();
assert!(retrieved.is_some());
}
// Listing reflects stored entries, newest-accessed first.
#[tokio::test]
async fn test_list_cached_resources() {
let (config, _temp_dir) = create_test_cache_config();
let config = CacheConfig {
pool_connection_timeout: Some(POOL_TIMEOUT),
..config
};
let mut cache = ResourceCache::new(config).await.unwrap();
let result = cache.list_cached_resources().await;
if let Err(ref e) = result {
tracing::error!("Initial list_cached_resources failed: {:?}", e);
}
assert!(result.is_ok(), "Initial list should succeed");
let resources = result.unwrap();
assert_eq!(resources.len(), 0);
let resource = create_test_resource();
cache.store_resource(&resource).await.unwrap();
let result = cache.list_cached_resources().await;
if let Err(ref e) = result {
tracing::error!("Second list_cached_resources failed: {:?}", e);
}
assert!(
result.is_ok(),
"Second list should succeed after storing resource"
);
let resources = result.unwrap();
assert_eq!(resources.len(), 1);
assert_eq!(resources[0].uri, "test://example.txt");
}
// Membership check flips from false to true after a store.
#[tokio::test]
async fn test_contains_resource() {
let temp_file = NamedTempFile::new().unwrap();
let config = CacheConfig {
database_path: temp_file.path().to_string_lossy().to_string(),
pool_connection_timeout: Some(POOL_TIMEOUT),
..Default::default()
};
let mut cache = ResourceCache::new(config).await.unwrap();
let result = cache.contains_resource("test://example.txt").await;
assert!(result.is_ok(), "Initial contains_resource should succeed");
assert!(!result.unwrap());
let resource = create_test_resource();
cache.store_resource(&resource).await.unwrap();
let result = cache.contains_resource("test://example.txt").await;
assert!(result.is_ok(), "Second contains_resource should succeed");
assert!(result.unwrap());
}
// Removal reports true for an existing entry and makes it unreachable.
#[tokio::test]
async fn test_remove_resource() {
let (config, _temp_dir) = create_test_cache_config();
let mut cache = ResourceCache::new(config).await.unwrap();
let resource = create_test_resource();
cache.store_resource(&resource).await.unwrap();
assert!(cache.contains_resource("test://example.txt").await.unwrap());
let result = cache.remove_resource("test://example.txt").await;
assert!(result.is_ok());
assert!(result.unwrap());
assert!(!cache.contains_resource("test://example.txt").await.unwrap());
}
// clear() empties the table and resets the counters.
#[tokio::test]
async fn test_clear_cache() {
let (config, _temp_dir) = create_test_cache_config();
let mut cache = ResourceCache::new(config).await.unwrap();
let resource = create_test_resource();
cache.store_resource(&resource).await.unwrap();
let resources = cache.list_cached_resources().await.unwrap();
assert!(!resources.is_empty());
let result = cache.clear().await;
assert!(result.is_ok());
let resources = cache.list_cached_resources().await.unwrap();
assert!(resources.is_empty());
assert_eq!(cache.get_analytics().resource_count, 0);
}
// A 1ms TTL entry is gone after cleanup_expired().
#[tokio::test]
async fn test_cleanup_expired_resources() {
let (config, _temp_dir) = create_test_cache_config();
let mut cache = ResourceCache::new(config).await.unwrap();
let resource = create_test_resource();
cache
.store_resource_with_ttl(&resource, Duration::from_millis(1))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
let result = cache.cleanup_expired().await;
assert!(result.is_ok());
let removed_count = result.unwrap();
assert_eq!(removed_count, 1);
assert!(!cache.contains_resource("test://example.txt").await.unwrap());
}
// Analytics track store + hit: one request, one hit, 100% hit rate.
#[tokio::test]
async fn test_cache_analytics() {
let (config, _temp_dir) = create_test_cache_config();
let mut cache = ResourceCache::new(config).await.unwrap();
let analytics = cache.get_analytics();
assert_eq!(analytics.resource_count, 0);
assert_eq!(analytics.cache_size_bytes, 0);
assert_eq!(analytics.total_requests, 0);
assert_eq!(analytics.cache_hits, 0);
assert_eq!(analytics.cache_misses, 0);
let resource = create_test_resource();
cache.store_resource(&resource).await.unwrap();
let _retrieved = cache.get_resource("test://example.txt").await.unwrap();
let analytics = cache.get_analytics();
assert_eq!(analytics.resource_count, 1);
assert!(analytics.cache_size_bytes > 0);
assert_eq!(analytics.total_requests, 1);
assert_eq!(analytics.cache_hits, 1);
assert_eq!(analytics.cache_misses, 0);
assert_eq!(analytics.hit_rate, 1.0);
}
// Search matches on content type and on URI substrings.
#[tokio::test]
async fn test_search_resources() {
let (config, _temp_dir) = create_test_cache_config();
let mut cache = ResourceCache::new(config).await.unwrap();
let result = cache.search_resources("text/plain").await;
assert!(result.is_ok());
assert_eq!(result.unwrap().len(), 0);
let resource = create_test_resource();
cache.store_resource(&resource).await.unwrap();
let result = cache.search_resources("text/plain").await;
assert!(result.is_ok());
let resources = result.unwrap();
assert_eq!(resources.len(), 1);
assert_eq!(resources[0].uri, "test://example.txt");
let result = cache.search_resources("example").await;
assert!(result.is_ok());
let resources = result.unwrap();
assert_eq!(resources.len(), 1);
}
// Cache size equals the stored payload length (13 bytes).
#[tokio::test]
async fn test_get_cache_size() {
let (config, _temp_dir) = create_test_cache_config();
let mut cache = ResourceCache::new(config).await.unwrap();
let result = cache.get_cache_size().await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 0);
let resource = create_test_resource();
cache.store_resource(&resource).await.unwrap();
let result = cache.get_cache_size().await;
assert!(result.is_ok());
let size = result.unwrap();
assert!(size > 0);
assert_eq!(size, 13); }
// VACUUM succeeds after a store + delete cycle.
#[tokio::test]
async fn test_database_compaction() {
let (config, _temp_dir) = create_test_cache_config();
let mut cache = ResourceCache::new(config).await.unwrap();
let resource = create_test_resource();
cache.store_resource(&resource).await.unwrap();
cache.remove_resource("test://example.txt").await.unwrap();
let result = cache.compact().await;
assert!(result.is_ok());
}
// An entry stored with the (shortened) default TTL is gone after it elapses.
#[tokio::test]
async fn test_ttl_expiration() {
let (mut config, _temp_dir) = create_test_cache_config();
config.default_ttl = Duration::from_millis(100); let mut cache = ResourceCache::new(config).await.unwrap();
let resource = create_test_resource();
let _id = cache.store_resource(&resource).await.unwrap();
tokio::time::sleep(Duration::from_millis(150)).await;
let result = cache.get_resource("test://example.txt").await.unwrap();
assert!(result.is_none());
}
#[tokio::test]
async fn test_concurrent_access() {
let (config, temp_dir) = create_test_cache_config();
let cache = std::sync::Arc::new(tokio::sync::Mutex::new(
ResourceCache::new(config).await.unwrap(),
));
let _temp_dir = std::sync::Arc::new(temp_dir);
let resource = create_test_resource();
let tasks = (0..10).map(|i| {
let cache = cache.clone();
let mut resource = resource.clone();
resource.info.uri = format!("test://example{}.txt", i);
tokio::spawn(async move {
let mut cache = cache.lock().await;
cache.store_resource(&resource).await
})
});
let results = futures::future::join_all(tasks).await;
for result in results {
assert!(result.is_ok());
let store_result = result.unwrap();
assert!(store_result.is_ok());
}
let cache = cache.lock().await;
let resources = cache.list_cached_resources().await.unwrap();
assert_eq!(resources.len(), 10);
}
#[tokio::test]
async fn test_acid_transactions() {
    // Whatever the outcome of a store, the cache must stay consistent:
    // a successful store implies the resource is readable afterwards,
    // a failed store implies it is not (no partial writes).
    let (config, _temp_dir) = create_test_cache_config();
    let mut cache = ResourceCache::new(config).await.unwrap();
    let resource = create_test_resource();
    let stored = cache.store_resource(&resource).await.is_ok();
    let lookup = cache.get_resource("test://example.txt").await.unwrap();
    if stored {
        assert!(lookup.is_some());
    } else {
        assert!(lookup.is_none());
    }
}
#[test]
fn test_cache_config_defaults() {
    // Sanity-check the documented defaults of CacheConfig.
    let defaults = CacheConfig::default();
    // The database lives in a real file named cache.db, never in memory.
    assert!(defaults.database_path.ends_with("cache.db"));
    assert!(!defaults.database_path.contains(":memory:"));
    assert_eq!(defaults.default_ttl, Duration::from_secs(3600));
    assert_eq!(defaults.max_size_mb, 100);
    assert!(defaults.auto_cleanup);
    assert_eq!(defaults.cleanup_interval, Duration::from_secs(300));
}
#[tokio::test]
async fn test_empty_database_path_validation() {
    // An empty database_path must be rejected with a Validation error
    // before any database work is attempted.
    let config = CacheConfig {
        database_path: String::new(),
        ..Default::default()
    };
    match ResourceCache::new(config).await {
        Err(ClientError::Validation(msg)) => {
            assert!(msg.contains("database_path cannot be empty"));
        }
        _ => panic!("Expected Validation error for empty database path"),
    }
}
#[tokio::test]
async fn test_invalid_pool_configuration() {
    // A minimum connection count greater than the maximum is an invalid
    // pool configuration and must be rejected up front.
    let (mut config, _temp_dir) = create_test_cache_config();
    config.pool_min_connections = Some(10);
    config.pool_max_connections = Some(5);
    match ResourceCache::new(config).await {
        Err(ClientError::Validation(msg)) => {
            // The message should name both offending fields.
            assert!(msg.contains("pool_min_connections"));
            assert!(msg.contains("pool_max_connections"));
        }
        _ => panic!("Expected Validation error for invalid pool configuration"),
    }
}
#[test]
fn test_cached_resource_structure() {
    // Construct a CachedResource by hand and verify the fields hold the
    // values they were given.
    let now = Utc::now();
    let payload = b"Hello, World!".to_vec();
    let entry = CachedResource {
        id: Uuid::new_v4().to_string(),
        uri: "test://example.txt".to_string(),
        content: payload.clone(),
        content_type: Some("text/plain".to_string()),
        metadata: HashMap::new(),
        created_at: now,
        accessed_at: now,
        expires_at: Some(now + chrono::Duration::hours(1)),
        access_count: 1,
        size_bytes: 13,
    };
    assert_eq!(entry.uri, "test://example.txt");
    assert_eq!(entry.content, b"Hello, World!");
    assert_eq!(entry.size_bytes, 13);
    assert!(entry.expires_at.is_some());
}
#[tokio::test]
async fn test_concurrent_cache_creation_shared_database() {
    use tempfile::NamedTempFile;
    // Several ResourceCache instances opened on the same database file
    // must all observe each other's writes.
    let temp_file = NamedTempFile::new().unwrap();
    let db_path = temp_file.path().to_string_lossy().to_string();
    let mut caches = Vec::new();
    for _ in 0..5 {
        let config = CacheConfig {
            database_path: db_path.clone(),
            pool_connection_timeout: Some(POOL_TIMEOUT),
            ..Default::default()
        };
        caches.push(ResourceCache::new(config).await.unwrap());
    }
    // Each instance stores one uniquely-named resource and must see its
    // own write immediately.
    for (i, cache) in caches.iter_mut().enumerate() {
        let mut item = create_test_resource();
        item.info.uri = format!("test://shared-{}.txt", i);
        cache.store_resource(&item).await.unwrap();
        assert!(cache.contains_resource(&item.info.uri).await.unwrap());
    }
    // The first instance must see every write, regardless of which
    // instance performed it.
    let first = &caches[0];
    for i in 0..5 {
        let uri = format!("test://shared-{}.txt", i);
        assert!(
            first.contains_resource(&uri).await.unwrap(),
            "Resource {} should be accessible from any cache instance",
            i
        );
    }
}
#[tokio::test]
async fn test_connection_pool_configuration() {
    // Explicit pool bounds must be accepted and reflected in pool stats.
    let (mut config, _temp_dir) = create_test_cache_config();
    config.pool_min_connections = Some(2);
    config.pool_max_connections = Some(10);
    config.pool_connection_timeout = Some(POOL_TIMEOUT);
    let built = ResourceCache::new(config).await;
    assert!(built.is_ok());
    let stats = built.unwrap().get_pool_stats();
    assert_eq!(stats.max_connections, 10);
    // Active connections can never exceed the configured maximum.
    assert!(stats.active_connections <= 10);
}
#[tokio::test]
async fn test_concurrent_cache_operations_with_pool() {
    // Spawn ten tasks that store resources through a pooled cache and
    // verify that every store lands.
    //
    // NOTE: an earlier version asserted that the *average* store latency
    // stayed under 100ms of wall-clock time. That is a flaky assertion on
    // loaded CI machines and proves nothing functional, so the check is
    // now deterministic: every task must succeed and every resource must
    // be visible in the cache afterwards.
    let (mut config, temp_dir) = create_test_cache_config();
    config.pool_min_connections = Some(3);
    config.pool_max_connections = Some(5);
    // Keep the temp dir alive for the duration of all spawned tasks.
    let _temp_dir = std::sync::Arc::new(temp_dir);
    let cache = std::sync::Arc::new(tokio::sync::Mutex::new(
        ResourceCache::new(config).await.unwrap(),
    ));
    let mut tasks = Vec::new();
    for i in 0..10 {
        let cache = cache.clone();
        tasks.push(tokio::spawn(async move {
            let mut resource = create_test_resource();
            resource.info.uri = format!("test://concurrent{}.txt", i);
            cache.lock().await.store_resource(&resource).await
        }));
    }
    for task in futures::future::join_all(tasks).await {
        // Outer Result: the task did not panic; inner: the store succeeded.
        assert!(task.unwrap().is_ok());
    }
    // Every stored resource must be visible through the shared cache.
    let cache = cache.lock().await;
    for i in 0..10 {
        let uri = format!("test://concurrent{}.txt", i);
        assert!(cache.contains_resource(&uri).await.unwrap());
    }
}
#[tokio::test]
async fn test_pool_exhaustion_handling() {
    // A tiny pool with a short checkout timeout should still service a
    // single store without error.
    let (mut config, _temp_dir) = create_test_cache_config();
    config.pool_min_connections = Some(1);
    config.pool_max_connections = Some(2);
    config.pool_connection_timeout = Some(Duration::from_millis(100));
    let mut cache = ResourceCache::new(config).await.unwrap();
    assert!(cache.store_resource(&create_test_resource()).await.is_ok());
    // The configured ceiling is reflected in the pool statistics.
    assert_eq!(cache.get_pool_stats().max_connections, 2);
}
#[tokio::test]
async fn test_connection_reuse_in_pool() {
    // Back-to-back operations should reuse pooled connections rather than
    // checking out a fresh one each time.
    let (mut config, _temp_dir) = create_test_cache_config();
    config.pool_min_connections = Some(2);
    config.pool_max_connections = Some(3);
    let mut cache = ResourceCache::new(config).await.unwrap();
    let resource = create_test_resource();
    cache.store_resource(&resource).await.unwrap();
    let after_store = cache.get_pool_stats();
    cache.get_resource("test://example.txt").await.unwrap();
    let after_get = cache.get_pool_stats();
    // At most one additional connection may have been checked out.
    assert!(after_get.active_connections <= after_store.active_connections + 1);
}
#[tokio::test]
async fn test_pool_connection_lifecycle() {
    // Build a cache with explicit pool bounds inside an inner scope and
    // let it drop at the end; creation must honour the configured maximum.
    let temp_file = tempfile::NamedTempFile::new().unwrap();
    let config = CacheConfig {
        database_path: temp_file.path().to_string_lossy().to_string(),
        pool_min_connections: Some(1),
        pool_max_connections: Some(3),
        ..Default::default()
    };
    {
        let cache = ResourceCache::new(config).await.unwrap();
        let stats = cache.get_pool_stats();
        assert_eq!(stats.max_connections, 3);
        assert!(stats.active_connections <= stats.max_connections);
        // `cache` (and its connection pool) is dropped here.
    }
}
#[test]
fn test_parse_charset() {
    // Content-Type values that should yield a (lower-cased) charset.
    let detected: &[(&str, &str)] = &[
        ("text/html; charset=utf-8", "utf-8"),
        ("text/plain; charset=ISO-8859-1", "iso-8859-1"),
        ("text/html;charset=utf-8", "utf-8"), // no space after ';'
        ("text/html; charset=UTF-8", "utf-8"), // upper-case value
        ("text/html; charset=utf-8; boundary=something", "utf-8"),
        ("text/html; charset=\"utf-8\"", "utf-8"), // double-quoted
        ("text/html; charset='iso-8859-1'", "iso-8859-1"), // single-quoted
        ("text/html; Charset=UTF-8", "utf-8"), // mixed-case parameter name
        ("text/html; CHARSET=windows-1252", "windows-1252"),
        ("text/html; Charset=\"UTF-8\"", "utf-8"),
    ];
    for (input, expected) in detected {
        assert_eq!(
            parse_charset(input),
            Some((*expected).to_string()),
            "input: {:?}",
            input
        );
    }
    // Content-Type values with no charset parameter at all.
    for input in ["text/plain", "application/octet-stream", ""] {
        assert_eq!(parse_charset(input), None, "input: {:?}", input);
    }
}
#[tokio::test]
async fn test_get_resource_with_encoding_from_metadata() {
    // An explicit `encoding` entry in the resource metadata must survive
    // a store/retrieve round trip.
    let (config, _temp_dir) = create_test_cache_config();
    let mut cache = ResourceCache::new(config).await.unwrap();
    let metadata = HashMap::from([("encoding".to_string(), serde_json::json!("utf-16"))]);
    let resource = ResourceContent {
        info: ResourceInfo {
            uri: "test://encoded.txt".to_string(),
            name: Some("encoded.txt".to_string()),
            description: Some("Test resource with encoding".to_string()),
            mime_type: Some("text/plain".to_string()),
            metadata,
        },
        data: b"Hello, World!".to_vec(),
        encoding: Some("utf-16".to_string()),
    };
    cache.store_resource(&resource).await.unwrap();
    let retrieved = cache
        .get_resource("test://encoded.txt")
        .await
        .unwrap()
        .expect("stored resource should be retrievable");
    assert_eq!(retrieved.encoding, Some("utf-16".to_string()));
}
#[tokio::test]
async fn test_get_resource_with_encoding_from_content_type() {
    // When no explicit encoding is stored, the charset embedded in the
    // MIME type should be surfaced as the encoding on retrieval.
    let (config, _temp_dir) = create_test_cache_config();
    let mut cache = ResourceCache::new(config).await.unwrap();
    let resource = ResourceContent {
        info: ResourceInfo {
            uri: "test://charset.html".to_string(),
            name: Some("charset.html".to_string()),
            description: Some("Test resource with charset in content type".to_string()),
            mime_type: Some("text/html; charset=iso-8859-1".to_string()),
            metadata: HashMap::new(),
        },
        data: b"<html>Hello</html>".to_vec(),
        encoding: None,
    };
    cache.store_resource(&resource).await.unwrap();
    let retrieved = cache
        .get_resource("test://charset.html")
        .await
        .unwrap()
        .expect("stored resource should be retrievable");
    assert_eq!(retrieved.encoding, Some("iso-8859-1".to_string()));
}
#[tokio::test]
async fn test_store_and_retrieve_with_encoding() {
    // Multi-byte UTF-8 payloads must round-trip byte-for-byte along with
    // their declared encoding.
    let (config, _temp_dir) = create_test_cache_config();
    let mut cache = ResourceCache::new(config).await.unwrap();
    let text = "Hello, 世界! 🌍";
    let resource = ResourceContent {
        info: ResourceInfo {
            uri: "test://utf8.txt".to_string(),
            name: Some("utf8.txt".to_string()),
            description: Some("UTF-8 encoded text".to_string()),
            mime_type: Some("text/plain".to_string()),
            metadata: HashMap::new(),
        },
        data: text.as_bytes().to_vec(),
        encoding: Some("utf-8".to_string()),
    };
    cache.store_resource(&resource).await.unwrap();
    let retrieved = cache
        .get_resource("test://utf8.txt")
        .await
        .unwrap()
        .expect("stored resource should be retrievable");
    assert_eq!(retrieved.encoding, Some("utf-8".to_string()));
    assert_eq!(retrieved.data, text.as_bytes());
}
#[tokio::test]
async fn test_round_trip_encoding_with_quoted_charset() {
    // A charset wrapped in double quotes inside the MIME type must still
    // be detected (unquoted) on retrieval.
    let (config, _temp_dir) = create_test_cache_config();
    let mut cache = ResourceCache::new(config).await.unwrap();
    let resource = ResourceContent {
        info: ResourceInfo {
            uri: "test://quoted-charset.html".to_string(),
            name: Some("quoted-charset.html".to_string()),
            description: Some("HTML with quoted charset".to_string()),
            mime_type: Some("text/html; charset=\"windows-1252\"".to_string()),
            metadata: HashMap::new(),
        },
        data: b"<html>Content with special chars</html>".to_vec(),
        encoding: None,
    };
    cache.store_resource(&resource).await.unwrap();
    let retrieved = cache
        .get_resource("test://quoted-charset.html")
        .await
        .unwrap()
        .expect("stored resource should be retrievable");
    assert_eq!(retrieved.encoding, Some("windows-1252".to_string()));
}
#[tokio::test]
async fn test_round_trip_encoding_with_case_insensitive_charset() {
    // The `charset` parameter name and value are matched case-insensitively;
    // the detected encoding comes back lower-cased.
    let (config, _temp_dir) = create_test_cache_config();
    let mut cache = ResourceCache::new(config).await.unwrap();
    let resource = ResourceContent {
        info: ResourceInfo {
            uri: "test://uppercase-charset.xml".to_string(),
            name: Some("uppercase-charset.xml".to_string()),
            description: Some("XML with uppercase Charset".to_string()),
            mime_type: Some("application/xml; Charset=UTF-8".to_string()),
            metadata: HashMap::new(),
        },
        data: b"<?xml version=\"1.0\"?><root>data</root>".to_vec(),
        encoding: None,
    };
    cache.store_resource(&resource).await.unwrap();
    let retrieved = cache
        .get_resource("test://uppercase-charset.xml")
        .await
        .unwrap()
        .expect("stored resource should be retrievable");
    assert_eq!(retrieved.encoding, Some("utf-8".to_string()));
}
#[test]
fn test_analytics_hit_rate_calculation_safety() {
    // Guard against division by zero when deriving a hit rate from an
    // analytics snapshot that has seen no requests yet.
    // NOTE(review): this re-implements the guard locally rather than
    // exercising a cache method — it documents the intended formula.
    let analytics = CacheAnalytics {
        total_requests: 0,
        cache_hits: 0,
        cache_misses: 0,
        hit_rate: 0.0,
        cache_size_bytes: 0,
        resource_count: 0,
        eviction_count: 0,
        last_cleanup: Utc::now(),
    };
    let hit_rate = match analytics.total_requests {
        0 => 0.0,
        total => analytics.cache_hits as f64 / total as f64,
    };
    assert_eq!(hit_rate, 0.0);
}
#[tokio::test]
async fn test_migration_system_and_connection_pool() {
    use std::time::Duration;
    // Exercise the schema migrations and the connection pool together: a
    // fully-specified config, one store/retrieve round trip, and a run of
    // cache misses served through pooled connections.
    let temp_file = tempfile::NamedTempFile::new().unwrap();
    let config = CacheConfig {
        database_path: temp_file.path().to_string_lossy().to_string(),
        default_ttl: Duration::from_secs(60),
        max_size_mb: 100,
        auto_cleanup: true,
        cleanup_interval: Duration::from_secs(30),
        pool_min_connections: Some(2),
        pool_max_connections: Some(4),
        pool_connection_timeout: Some(Duration::from_secs(5)),
        pool_max_lifetime: Some(Duration::from_secs(300)),
    };
    let mut cache = ResourceCache::new(config).await.unwrap();
    let test_resource = ResourceContent {
        info: ResourceInfo {
            uri: "test://migration/verification".to_string(),
            name: Some("Migration Test".to_string()),
            description: Some("Verify migration + pool work together".to_string()),
            mime_type: Some("text/plain".to_string()),
            metadata: std::collections::HashMap::new(),
        },
        data: b"migration test data".to_vec(),
        encoding: None,
    };
    let _id = cache.store_resource(&test_resource).await.unwrap();
    let fetched = cache
        .get_resource(&test_resource.info.uri)
        .await
        .unwrap()
        .expect("stored resource should be retrievable");
    assert_eq!(fetched.data, test_resource.data);
    // Exactly one lookup has been recorded so far.
    assert_eq!(cache.get_analytics().total_requests, 1);
    // Cache misses must also be handled cleanly by pooled connections.
    for i in 0..5 {
        let uri = format!("test://pool/resource{}", i);
        assert!(cache.get_resource(&uri).await.is_ok());
    }
    println!("Migration system and connection pool integration test passed");
}
}