#![cfg(feature = "compression")]
#![allow(clippy::expect_used)]
use do_memory_core::embeddings::EmbeddingStorageBackend;
use do_memory_storage_turso::TursoStorage;
use uuid::Uuid;
/// Creates a [`TursoStorage`] backed by a fresh local libsql database and makes
/// sure the `embeddings` table and its `(item_id, item_type)` index exist.
///
/// The database file is placed in the OS temp directory with a random UUID in
/// its name, so every call works on its own file. This helper does not remove
/// the file afterwards.
///
/// # Panics
///
/// Panics if the temp path is not valid UTF-8, if the database, storage or
/// connection cannot be created, if either DDL statement fails, or if the
/// follow-up lookup in `sqlite_master` fails or does not find the table.
async fn setup_storage_with_embeddings() -> TursoStorage {
    let db_path = std::env::temp_dir().join(format!("compression-test-{}.db", Uuid::new_v4()));
    let db_path_str = db_path.to_str().expect("Invalid path");
    let db = libsql::Builder::new_local(db_path_str)
        .build()
        .await
        .expect("Failed to create database");
    let storage = TursoStorage::from_database(db).expect("Failed to create storage");
    let conn = storage
        .get_connection()
        .await
        .expect("Failed to get connection");

    // Create the table the embedding tests read from and write to.
    if let Err(e) = conn
        .execute(
            r#"
CREATE TABLE IF NOT EXISTS embeddings (
embedding_id TEXT PRIMARY KEY NOT NULL,
item_id TEXT NOT NULL,
item_type TEXT NOT NULL,
embedding_data TEXT NOT NULL,
dimension INTEGER NOT NULL,
model TEXT NOT NULL,
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
)
"#,
            (),
        )
        .await
    {
        panic!("Failed to create embeddings table: {}", e);
    }

    if let Err(e) = conn
        .execute(
            r#"
CREATE INDEX IF NOT EXISTS idx_embeddings_item
ON embeddings(item_id, item_type)
"#,
            (),
        )
        .await
    {
        panic!("Failed to create embeddings index: {}", e);
    }

    // Sanity check: the table must now be visible in the schema catalogue.
    let mut rows = match conn
        .query(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='embeddings'",
            (),
        )
        .await
    {
        Ok(rows) => rows,
        Err(e) => panic!("Failed to verify table creation: {}", e),
    };

    // Distinguish "no row" from "fetching the row failed"; a fetch error must
    // not be mistaken for a successful verification.
    match rows.next().await {
        Ok(Some(_)) => {}
        Ok(None) => panic!("Table 'embeddings' was not created!"),
        Err(e) => panic!("Failed to verify table creation: {}", e),
    }

    storage
}
/// A storage opened over a brand-new temp-file database must report zero
/// original and zero compressed bytes before anything has been stored.
#[tokio::test]
#[ignore = "Memory corruption bug in libsql native library - malloc_consolidate() unaligned fastbin chunk in CI"]
async fn test_compression_enabled_by_default() {
    // Keep the handle bound for the whole test so the backing file stays around.
    let scratch_file = tempfile::NamedTempFile::new().expect("Failed to create temp file");
    let scratch_path = scratch_file.path().to_str().expect("Invalid path");

    let builder = libsql::Builder::new_local(scratch_path);
    let database = builder.build().await.expect("Failed to create database");
    let storage = TursoStorage::from_database(database).expect("Failed to create storage");

    let initial = storage.compression_statistics();
    assert_eq!(initial.total_original_bytes, 0);
    assert_eq!(initial.total_compressed_bytes, 0);
}
/// Stores a 384-element embedding, checks that exactly one store operation was
/// counted (as either compressed or skipped), and verifies that reading it
/// back yields the same vector.
#[tokio::test]
#[ignore = "Memory corruption bug in libsql native library - malloc_consolidate() unaligned fastbin chunk in CI"]
async fn test_small_embedding_not_compressed() {
    let storage = setup_storage_with_embeddings().await;
    let episode_id = Uuid::new_v4();

    // Values ramp linearly from 0.0 towards (but not reaching) 1.0.
    let mut expected: Vec<f32> = Vec::with_capacity(384);
    for step in 0..384 {
        expected.push(step as f32 / 384.0);
    }

    let store_outcome = storage
        .store_episode_embedding(episode_id, expected.clone())
        .await;
    store_outcome.expect("Failed to store small embedding");

    let stats = storage.compression_statistics();
    assert_eq!(stats.compression_count + stats.skipped_count, 1);

    let fetch_outcome = storage.get_episode_embedding(episode_id).await;
    let round_tripped = fetch_outcome.expect("Failed to retrieve small embedding");
    assert_eq!(round_tripped, Some(expected));
}
/// Stores a 1536-element embedding and expects it to be counted as compressed
/// (not skipped) with at least 30% bandwidth savings, then verifies that
/// reading it back yields the same vector.
#[tokio::test]
#[ignore = "Memory corruption bug in libsql native library - malloc_consolidate() unaligned fastbin chunk in CI"]
async fn test_large_embedding_compressed() {
    let storage = setup_storage_with_embeddings().await;
    let episode_id = Uuid::new_v4();

    // Values ramp linearly from 0.0 towards (but not reaching) 1.0.
    let mut expected: Vec<f32> = Vec::with_capacity(1536);
    for step in 0..1536 {
        expected.push(step as f32 / 1536.0);
    }

    let store_outcome = storage
        .store_episode_embedding(episode_id, expected.clone())
        .await;
    store_outcome.expect("Failed to store large embedding");

    let stats = storage.compression_statistics();
    assert_eq!(stats.compression_count, 1);
    assert_eq!(stats.skipped_count, 0);

    let savings_pct = stats.bandwidth_savings_percent();
    assert!(
        savings_pct >= 30.0,
        "Expected >= 30% bandwidth reduction, got {}%",
        savings_pct
    );
    println!("✅ Compression ratio: {:.2}", stats.compression_ratio());
    println!("✅ Bandwidth savings: {:.1}%", savings_pct);

    let fetch_outcome = storage.get_episode_embedding(episode_id).await;
    let round_tripped = fetch_outcome.expect("Failed to retrieve large embedding");
    assert_eq!(round_tripped, Some(expected));
}
/// Stores five distinct 1536-element embeddings and checks the aggregate
/// statistics: five compressions, a ratio strictly between 0 and 1, and
/// positive bandwidth savings.
#[tokio::test]
#[ignore = "Memory corruption bug in libsql native library - malloc_consolidate() unaligned fastbin chunk in CI"]
async fn test_compression_ratio_calculation() {
    let storage = setup_storage_with_embeddings().await;

    for batch in 0..5 {
        // Each batch covers its own contiguous run of 1536 integer values.
        let first = batch * 1536;
        let embedding: Vec<f32> = (first..first + 1536).map(|value| value as f32).collect();
        let episode_id = Uuid::new_v4();
        let store_outcome = storage.store_episode_embedding(episode_id, embedding).await;
        store_outcome.expect("Failed to store embedding");
    }

    let stats = storage.compression_statistics();
    assert_eq!(stats.compression_count, 5);
    assert!(stats.compression_ratio() < 1.0);
    assert!(stats.compression_ratio() > 0.0);
    assert!(stats.bandwidth_savings_percent() > 0.0);

    let ratio = stats.compression_ratio();
    let savings_pct = stats.bandwidth_savings_percent();
    println!("✅ Compression ratio: {:.2}", ratio);
    println!("✅ Bandwidth savings: {:.1}%", savings_pct);
}
/// After one embedding has been stored and counted, resetting the statistics
/// must bring the compression count and both byte totals back to zero.
#[tokio::test]
#[ignore = "Memory corruption bug in libsql native library - malloc_consolidate() unaligned fastbin chunk in CI"]
async fn test_reset_compression_statistics() {
    let storage = setup_storage_with_embeddings().await;
    let episode_id = Uuid::new_v4();

    let mut embedding: Vec<f32> = Vec::with_capacity(1536);
    for value in 0..1536 {
        embedding.push(value as f32);
    }

    let store_outcome = storage.store_episode_embedding(episode_id, embedding).await;
    store_outcome.expect("Failed to store embedding");

    // Something must have been recorded before the reset for the test to mean anything.
    let before_reset = storage.compression_statistics();
    assert!(before_reset.compression_count > 0);

    storage.reset_compression_statistics();

    let after_reset = storage.compression_statistics();
    assert_eq!(after_reset.compression_count, 0);
    assert_eq!(after_reset.total_original_bytes, 0);
    assert_eq!(after_reset.total_compressed_bytes, 0);
}