use do_memory_core::{ComplexityLevel, Episode, StorageBackend, TaskContext, TaskType};
use do_memory_storage_turso::{AdvancedQueryCache, CacheConfig, QueryKey, TursoStorage};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tempfile::tempdir;
use uuid::Uuid;
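
/// Builds a minimal `Episode` with fixed metadata, a `rust` code-generation
/// context, and the given `domain`, for reuse across the optimization tests.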
fn create_test_episode(id: Uuid, domain: &str) -> Episode {
let mut metadata = HashMap::new();
metadata.insert("test_key".to_string(), "test_value".to_string());
metadata.insert("domain".to_string(), domain.to_string());
Episode {
episode_id: id,
task_type: TaskType::CodeGeneration,
task_description: format!("Test task {}", id),
context: TaskContext {
language: Some("rust".to_string()),
framework: None,
complexity: ComplexityLevel::Moderate,
domain: domain.to_string(),
tags: Vec::new(),
},
start_time: chrono::Utc::now(),
end_time: Some(chrono::Utc::now()),
steps: vec![],
outcome: None,
reward: None,
reflection: None,
patterns: vec![],
heuristics: vec![],
applied_patterns: Vec::new(),
salient_features: None,
metadata,
tags: vec![],
checkpoints: vec![],
}
}
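
// Verifies the cache-first read path: a cold read pass populates the episode
// cache, a second pass should be served from memory and be measurably faster,
// and the cache statistics should record the hits.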
#[tokio::test]
#[ignore = "Memory corruption bug in libsql native library - malloc_consolidate() unaligned fastbin chunk in CI"]
async fn test_cache_first_read_strategy() {
let temp_dir = tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let db_url = format!("file:{}", db_path.display());
let storage = TursoStorage::new(&db_url, "").await.unwrap();
storage.initialize_schema().await.unwrap();
let cache_config = CacheConfig {
enable_episode_cache: true,
max_episodes: 1000,
..Default::default()
};
let cached_storage = storage.with_cache(cache_config);
let episodes: Vec<Episode> = (0..10)
.map(|_| create_test_episode(Uuid::new_v4(), "cache_test"))
.collect();
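    // Populate the database through the cache-enabled wrapper.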
for episode in &episodes {
cached_storage
.store_episode(episode)
.await
.expect("Failed to store episode");
}
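    // First read pass: every lookup should miss the cache and hit the database.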
let start = Instant::now();
for episode in &episodes {
let result = cached_storage
.get_episode(episode.episode_id)
.await
.expect("Failed to get episode");
assert!(result.is_some());
}
let miss_duration = start.elapsed();
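    // Second read pass: every lookup should now be served from the in-memory cache.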
let start = Instant::now();
for episode in &episodes {
let result = cached_storage
.get_episode(episode.episode_id)
.await
.expect("Failed to get episode");
assert!(result.is_some());
}
let hit_duration = start.elapsed();
println!("Cache miss duration: {:?}", miss_duration);
println!("Cache hit duration: {:?}", hit_duration);
assert!(
hit_duration < miss_duration / 2,
"Cache hits should be significantly faster than misses"
);
let stats = cached_storage.stats();
assert!(stats.episode_hits > 0, "Should have cache hits");
assert!(
stats.episode_hit_rate() > 0.0,
"Hit rate should be positive"
);
println!("Cache hit rate: {:.1}%", stats.episode_hit_rate() * 100.0);
}
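
// Compares storing episodes one call at a time against `store_episodes_batch`,
// then round-trips the batched episodes through `get_episodes_batch`.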
#[tokio::test]
#[ignore = "Memory corruption bug in libsql native library - malloc_consolidate() unaligned fastbin chunk in CI"]
async fn test_batch_operations() {
let temp_dir = tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let db_url = format!("file:{}", db_path.display());
let storage = TursoStorage::new(&db_url, "").await.unwrap();
storage.initialize_schema().await.unwrap();
let episodes: Vec<Episode> = (0..50)
.map(|_| create_test_episode(Uuid::new_v4(), "batch_test"))
.collect();
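    // Baseline: store the first 10 episodes one call at a time.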
let start = Instant::now();
for episode in &episodes[0..10] {
storage
.store_episode(episode)
.await
.expect("Failed to store episode");
}
let individual_duration = start.elapsed();
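    // Store the next 10 episodes in a single batch call.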
let start = Instant::now();
storage
.store_episodes_batch(episodes[10..20].to_vec())
.await
.expect("Failed to store batch");
let batch_duration = start.elapsed();
println!(
"Individual operations (10 episodes): {:?}",
individual_duration
);
println!("Batch operation (10 episodes): {:?}", batch_duration);
assert!(
batch_duration < individual_duration,
"Batch operations should be faster than individual operations"
);
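    // Read the batched episodes back and confirm they all round-trip.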
let ids: Vec<Uuid> = episodes[10..20].iter().map(|e| e.episode_id).collect();
let retrieved = storage
.get_episodes_batch(&ids)
.await
.expect("Failed to get batch");
assert_eq!(retrieved.len(), 10, "Should retrieve all batched episodes");
assert!(
retrieved.iter().all(|e| e.is_some()),
"All episodes should exist"
);
}
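
// Exercises the standalone query-result cache: identical SQL text hits the
// cached entry, different SQL text misses, and the stats reflect both.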
#[tokio::test]
#[ignore = "Memory corruption bug in libsql native library - malloc_consolidate() unaligned fastbin chunk in CI"]
async fn test_query_result_caching() {
let (query_cache, _rx) = AdvancedQueryCache::new_with_receiver();
let episodes: Vec<Episode> = (0..5)
.map(|_| create_test_episode(Uuid::new_v4(), "query_cache_test"))
.collect();
let serialized = serde_json::to_vec(&episodes).unwrap();
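    // Cache the serialized result under a key derived from the SQL text.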
let key1 = QueryKey::from_sql("SELECT * FROM episodes WHERE domain = 'test_domain'");
query_cache.put(key1.clone(), serialized.clone(), vec![]);
let cached = query_cache.get(&key1);
assert!(cached.is_some(), "Should get cached result");
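    // Different SQL text yields a different key, so this lookup must miss.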
let key2 = QueryKey::from_sql("SELECT * FROM episodes WHERE domain = 'other_domain'");
let not_cached = query_cache.get(&key2);
assert!(not_cached.is_none(), "Should be cache miss");
let stats = query_cache.stats();
assert_eq!(stats.hits, 1, "Should have 1 hit");
assert_eq!(stats.misses, 1, "Should have 1 miss");
}
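
// Inserts a query result and confirms it is retrievable immediately afterwards;
// TTL-based expiration itself is not exercised here.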
#[tokio::test]
#[ignore = "Memory corruption bug in libsql native library - malloc_consolidate() unaligned fastbin chunk in CI"]
async fn test_query_cache_expiration() {
let (query_cache, _rx) = AdvancedQueryCache::new_with_receiver();
let episodes: Vec<Episode> = (0..3)
.map(|_| create_test_episode(Uuid::new_v4(), "expiration_test"))
.collect();
let serialized = serde_json::to_vec(&episodes).unwrap();
let key = QueryKey::from_sql("SELECT * FROM episodes WHERE domain = 'test'");
query_cache.put(key.clone(), serialized, vec![]);
assert!(query_cache.get(&key).is_some());
}
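
// Stores 20 episodes with a rotating `priority` metadata key and checks that a
// metadata-filtered query returns the expected subset quickly.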
#[tokio::test]
#[ignore = "Memory corruption bug in libsql native library - malloc_consolidate() unaligned fastbin chunk in CI"]
async fn test_metadata_query_optimization() {
let temp_dir = tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let db_url = format!("file:{}", db_path.display());
let storage = TursoStorage::new(&db_url, "").await.unwrap();
storage.initialize_schema().await.unwrap();
let episodes: Vec<Episode> = (0..20)
.map(|i| {
let mut ep = create_test_episode(Uuid::new_v4(), "metadata_test");
ep.metadata
.insert("priority".to_string(), format!("p{}", i % 3));
ep
})
.collect();
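    // Persist all episodes, then time a single metadata-filtered query.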
for episode in &episodes {
storage.store_episode(episode).await.unwrap();
}
let start = Instant::now();
let results = storage
.query_episodes_by_metadata("priority", "p1", None)
.await
.expect("Failed to query by metadata");
let query_duration = start.elapsed();
println!("Metadata query duration: {:?}", query_duration);
println!("Found {} episodes with priority=p1", results.len());
    // With i % 3, exactly 7 of the 20 episodes (i = 1, 4, ..., 19) get priority=p1.
    assert_eq!(
        results.len(),
        7,
        "Exactly 7 episodes should have priority=p1"
    );
assert!(
query_duration < Duration::from_millis(50),
"Optimized query should be fast"
);
}
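
// End-to-end run of the optimizations together: batch insert, cold reads,
// cache-served warm reads, and a metadata query over the full data set.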
#[tokio::test]
#[ignore = "Memory corruption bug in libsql native library - malloc_consolidate() unaligned fastbin chunk in CI"]
async fn test_end_to_end_optimization() {
let temp_dir = tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let db_url = format!("file:{}", db_path.display());
let storage = TursoStorage::new(&db_url, "").await.unwrap();
storage.initialize_schema().await.unwrap();
let episodes: Vec<Episode> = (0..30)
.map(|_| create_test_episode(Uuid::new_v4(), "e2e_test"))
.collect();
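    // Phase 1: batch-insert all episodes and record how long it takes.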
let start = Instant::now();
storage
.store_episodes_batch(episodes.clone())
.await
.expect("Failed to batch store");
let batch_store_duration = start.elapsed();
println!("Batch store duration: {:?}", batch_store_duration);
let cache_config = CacheConfig::default();
let cached_storage = storage.with_cache(cache_config);
let start = Instant::now();
for episode in &episodes {
let result = cached_storage
.get_episode(episode.episode_id)
.await
.expect("Failed to get episode");
assert!(result.is_some());
}
let first_read_duration = start.elapsed();
println!(
"First read duration (cache miss): {:?}",
first_read_duration
);
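    // Phase 3: repeat the reads; these should now be served from the cache.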
let start = Instant::now();
for episode in &episodes {
let result = cached_storage
.get_episode(episode.episode_id)
.await
.expect("Failed to get episode");
assert!(result.is_some());
}
let second_read_duration = start.elapsed();
println!(
"Second read duration (cache hit): {:?}",
second_read_duration
);
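    // Phase 4: a metadata query on the shared domain should return every stored episode.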
let start = Instant::now();
let results = cached_storage
.query_episodes_by_metadata("domain", "e2e_test", Some(1000))
.await
.expect("Failed to query");
let query_duration = start.elapsed();
println!("Metadata query duration: {:?}", query_duration);
assert_eq!(results.len(), 30, "Should find all episodes");
assert!(
second_read_duration < first_read_duration,
"Cache hits should be faster than cache misses"
);
let stats = cached_storage.stats();
println!("Cache hit rate: {:.1}%", stats.episode_hit_rate() * 100.0);
assert!(stats.episode_hits > 0, "Should have cache hits");
}