use do_memory_storage_turso::pool::{AdaptiveConnectionPool, AdaptivePoolConfig, ConnectionId};
use do_memory_storage_turso::prepared::{PreparedCacheConfig, PreparedStatementCache};
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
/// Builds an [`AdaptiveConnectionPool`] backed by a fresh local libsql database.
///
/// The database file `test.db` is created inside a new temporary directory.
/// The [`TempDir`] handle is returned together with the pool; the tests in
/// this file bind it to `_dir` so it stays in scope for the whole test.
///
/// # Panics
///
/// Panics if the temporary directory, the database, or the pool cannot be
/// created.
async fn create_test_pool() -> (AdaptiveConnectionPool, TempDir) {
    // Pool sizing and scaling parameters shared by every test in this file.
    let pool_config = AdaptivePoolConfig {
        min_connections: 2,
        max_connections: 10,
        scale_up_threshold: 0.8,
        scale_down_threshold: 0.3,
        scale_up_cooldown: Duration::from_secs(1),
        scale_down_cooldown: Duration::from_secs(1),
        scale_up_increment: 2,
        scale_down_decrement: 2,
        check_interval: Duration::from_secs(5),
    };

    let temp_dir = TempDir::new().unwrap();
    let db_file = temp_dir.path().join("test.db");
    let database = libsql::Builder::new_local(&db_file)
        .build()
        .await
        .unwrap();

    let shared_db = Arc::new(database);
    let pool = AdaptiveConnectionPool::new_sync(shared_db, pool_config)
        .await
        .unwrap();

    (pool, temp_dir)
}
/// Checks that cached statements for a connection are cleared once the
/// connection handle is released.
///
/// A cleanup callback that forwards to `clear_connection` is registered on
/// the pool. After the handle is dropped (and a short wait), the test expects
/// the cache to hold no entries and no connections.
#[tokio::test]
async fn test_cache_cleanup_on_connection_return() {
    let (pool, _dir) = create_test_pool().await;
    let cache = Arc::new(PreparedStatementCache::new(10));

    // The callback owns its own handle to the shared cache.
    let callback_cache = Arc::clone(&cache);
    pool.set_cleanup_callback(Arc::new(move |conn_id: ConnectionId| {
        let cleared = callback_cache.clear_connection(conn_id);
        tracing::info!("Cleared {} statements for connection {}", cleared, conn_id);
    }));

    let conn = pool.get().await.unwrap();
    let conn_id = conn.connection_id();

    // Record five distinct statements against this connection.
    for sql in (0..5).map(|i| format!("SELECT {}", i)) {
        cache.record_miss(conn_id, &sql, 100);
    }
    assert_eq!(cache.connection_size(conn_id), 5);
    assert_eq!(cache.connection_count(), 1);

    // Release the connection handle; the test expects this to trigger cleanup.
    drop(conn);

    // Give the cleanup callback time to run before inspecting the cache.
    tokio::time::sleep(Duration::from_millis(100)).await;

    assert_eq!(cache.total_size(), 0);
    assert_eq!(cache.connection_count(), 0);
}
/// Checks that the cache tracks entries per connection and that all of them
/// are cleared after both connection handles are released.
///
/// Two connections are held at the same time, each with two recorded
/// statements. After both handles go out of scope (and a short wait), the
/// test expects the cache to be empty.
#[tokio::test]
async fn test_cache_tracks_multiple_connections() {
    let (pool, _dir) = create_test_pool().await;
    let cache = Arc::new(PreparedStatementCache::new(10));

    let cache_clone = Arc::clone(&cache);
    pool.set_cleanup_callback(Arc::new(move |conn_id: ConnectionId| {
        cache_clone.clear_connection(conn_id);
    }));

    {
        // `conn1` is still held while `conn2` is acquired, so both handles
        // are alive at the same time.
        let conn1 = pool.get().await.unwrap();
        let id1 = conn1.connection_id();
        cache.record_miss(id1, "SELECT 1", 100);
        cache.record_miss(id1, "SELECT 2", 100);

        let conn2 = pool.get().await.unwrap();
        let id2 = conn2.connection_id();
        cache.record_miss(id2, "SELECT 3", 100);
        cache.record_miss(id2, "SELECT 4", 100);

        assert_eq!(cache.connection_count(), 2);
        assert_eq!(cache.connection_size(id1), 2);
        assert_eq!(cache.connection_size(id2), 2);
    } // both connection handles are dropped here

    // Give the cleanup callbacks time to run before inspecting the cache.
    tokio::time::sleep(Duration::from_millis(100)).await;

    assert_eq!(cache.connection_count(), 0);
    assert_eq!(cache.total_size(), 0);
}
/// Checks the cache statistics after a connection's entries have been
/// cleaned up.
///
/// Two misses and two hits are recorded for one connection. After the handle
/// is released, the test expects the hit/miss counters to still read 2 each
/// while `active_connections` reads 0.
#[tokio::test]
async fn test_cache_statistics_with_cleanup() {
    let (pool, _dir) = create_test_pool().await;
    let cache = Arc::new(PreparedStatementCache::new(10));

    let callback_cache = Arc::clone(&cache);
    pool.set_cleanup_callback(Arc::new(move |conn_id: ConnectionId| {
        callback_cache.clear_connection(conn_id);
    }));

    let conn = pool.get().await.unwrap();
    let conn_id = conn.connection_id();

    // Two misses followed by a hit on each of the same statements.
    cache.record_miss(conn_id, "SELECT 1", 100);
    cache.record_miss(conn_id, "SELECT 2", 100);
    cache.record_hit(conn_id, "SELECT 1");
    cache.record_hit(conn_id, "SELECT 2");

    // Release the connection handle; the test expects this to trigger cleanup.
    drop(conn);

    // Give the cleanup callback time to run before reading the statistics.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let stats = cache.stats();
    assert_eq!(stats.hits, 2);
    assert_eq!(stats.misses, 2);
    assert_eq!(stats.active_connections, 0);
}
/// Checks that cache entries are left alone when no cleanup callback has
/// been registered on the pool.
///
/// One statement is recorded for a connection. After the handle is released
/// (and a short wait), the test expects the entry to still be present.
#[tokio::test]
async fn test_no_callback_registered() {
    let (pool, _dir) = create_test_pool().await;
    // No callback is registered, so the cache is not shared with the pool.
    let cache = PreparedStatementCache::new(10);

    let conn = pool.get().await.unwrap();
    let conn_id = conn.connection_id();
    cache.record_miss(conn_id, "SELECT 1", 100);

    // Release the connection handle while keeping its id for the check below.
    drop(conn);

    tokio::time::sleep(Duration::from_millis(100)).await;

    assert_eq!(cache.connection_size(conn_id), 1);
}
#[tokio::test]
async fn test_callback_removal_during_runtime() {
let (pool, _dir) = create_test_pool().await;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
let cleanup_count = Arc::new(AtomicU64::new(0));
let cleanup_count_clone = Arc::clone(&cleanup_count);
pool.set_cleanup_callback(Arc::new(move |conn_id| {
cleanup_count_clone.fetch_add(1, Ordering::Relaxed);
}));
{
let _conn = pool.get().await.unwrap();
}
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(cleanup_count.load(Ordering::Relaxed), 1);
pool.remove_cleanup_callback();
{
let _conn = pool.get().await.unwrap();
}
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(cleanup_count.load(Ordering::Relaxed), 1); }